Spark - DataFrame

Updated: 2018-12-11

Concepts

  • "A DataFrame is a distributed collection of data organized into named columns."
  • i.e. a 2-D table with a schema

Basic Operations

Show some samples:

df.show()

Show the schema:

df.printSchema()

Get a column:

df("foo")

Get a new DataFrame with that column:

df.select("foo")

Add a literal (constant) column (lit comes from org.apache.spark.sql.functions):

df.withColumn("name", lit(name))

.select() vs .col()

  • df.select(): returns another DataFrame
  • df.col(): returns a Column
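A minimal sketch of the difference, assuming a DataFrame df that has a column foo:

```scala
import org.apache.spark.sql.{Column, DataFrame}

// select returns a new (single-column) DataFrame
val projected: DataFrame = df.select("foo")

// col (or the shorthand df("foo")) returns a Column expression,
// which can be composed into larger expressions before selecting
val column: Column = df.col("foo")
val shifted: DataFrame = df.select(column + 1)
```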

Create From file

Example: load json

val df = sqlContext.read.json("examples/src/main/resources/people.json")

Example: load csv

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StringType, StructType}

// Define the column names
val header = Seq("col0", "col1", "col2", "col3", "col4", "col5", "col6", "col7")

val schema = StructType(header.map(name => StructField(name, StringType)))

// Load data as RDD
val datafile = sc.textFile("path/to/test.csv")

// Convert to Row RDD
val rdd = datafile.map(_.split(",")).map(arr => Row.fromSeq(arr))

// Create DataFrame from Row RDD and schema
val df = sqlContext.createDataFrame(rdd, schema)

Create by sqlContext.createDataFrame()

By RDD of Row + schema

Interface:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Example:

val schema =
  StructType(
    Array(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", LongType, true),
      StructField("gender", StringType, true),
      StructField("height", LongType, true),
      StructField("job_title", StringType, true)
    )
  )

val rowRDD = sc.parallelize(Array(
  Row(1L, "Name.1", 20L, "M", 6L, "dad"),
  Row(2L, "Name.2", 20L, "F", 5L, "mom"),
  Row(3L, "Name.3", 20L, "F", 5L, "mom"),
  Row(4L, "Name.4", 20L, "M", 5L, "mom"),
  Row(5L, "Name.5", 10L, "M", 4L, "kid"),
  Row(6L, "Name.6", 8L, "M", 3L, "kid")))

val df = sqlContext.createDataFrame(rowRDD, schema)

By RDD of Product

Interface:

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
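A sketch using a hypothetical Person case class; case classes extend Product, so they satisfy the type bound, and the column names are inferred from the field names:

```scala
case class Person(id: Long, name: String, age: Long)

val peopleRDD = sc.parallelize(Seq(
  Person(1L, "Name.1", 20L),
  Person(2L, "Name.2", 30L)))

// schema (id: long, name: string, age: long) is derived via reflection
val df = sqlContext.createDataFrame(peopleRDD)
```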

By Seq of Product

Interface:

def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

Example:

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")

toDF

Not every RDD can be converted to a DataFrame by toDF!

If toDF cannot be resolved, make sure that:

  1. this line is added:

    import sqlContext.implicits._
  2. the RDD is composed of Products (i.e., case classes or tuples)
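For example (the Person case class here is illustrative):

```scala
import sqlContext.implicits._

case class Person(id: Long, name: String)

// RDD of case-class instances: column names come from the fields
val df1 = sc.parallelize(Seq(Person(1L, "Name.1"))).toDF()

// RDD of tuples: toDF can supply the column names
val df2 = sc.parallelize(Seq((1L, "a"), (2L, "b"))).toDF("id", "label")
```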

UDF vs UDAF vs Window

  • UDFs and Built-in functions:

    • input: a single row
    • return: a single return value for every input row
    • e.g. substr or round
  • UDAF (aggregate functions)

    • input: a group of rows
    • return: a single return value for every group
    • e.g. sum or max
  • Window

    • input: a group of rows
    • return: a single value for every input row
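A sketch of a window function (available since Spark 1.4), reusing the people DataFrame from the createDataFrame example: every input row keeps its own output row, but the value is computed over the rows in its gender partition:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

// rank rows by height within each gender; one output value per input row
val w = Window.partitionBy("gender").orderBy(df("height").desc)
val ranked = df.withColumn("height_rank", rank().over(w))
```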

Split

val Array(train, test) = df.randomSplit(Array(0.7, 0.3))

In short:

DataFrame = Data + Schema
          = RDD[Row] + StructType

e.g.

val schema =
  StructType(
    Array(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", LongType, true),
      StructField("gender", StringType, true),
      StructField("height", LongType, true),
      StructField("job_title", StringType, true)
    )
  )

val rowRDD = sc.parallelize(Array(
  Row(1L, "Name.1", 20L, "M", 6L, "dad"),
  Row(2L, "Name.2", 20L, "F", 5L, "mom"),
  Row(3L, "Name.3", 20L, "F", 5L, "mom"),
  Row(4L, "Name.4", 20L, "M", 5L, "mom"),
  Row(5L, "Name.5", 10L, "M", 4L, "kid"),
  Row(6L, "Name.6", 8L, "M", 3L, "kid")))

val df = sqlContext.createDataFrame(rowRDD, schema)

Schema

/**
   * Returns the schema of this [[DataFrame]].
   * @group basic
   * @since 1.3.0
   */
  def schema: StructType = queryExecution.analyzed.schema

Data

Represents the content of the DataFrame as an RDD of Rows (RDD[Row]):

  /**
   * Represents the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s. Note that the RDD is
   * memoized. Once called, it won't change even if you change any query planning related Spark SQL
   * configurations (e.g. `spark.sql.shuffle.partitions`).
   * @group rdd
   * @since 1.3.0
   */
  lazy val rdd: RDD[Row] = {
    // use a local variable to make sure the map closure doesn't capture the whole DataFrame
    val schema = this.schema
    queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
  }

Some methods return RDDs rather than DataFrames:

/**
* Returns a new RDD by applying a function to all rows of this DataFrame.
* @group rdd
* @since 1.3.0
*/
def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)

/**
 * Returns a new RDD by applying a function to each partition of this DataFrame.
 * @group rdd
 * @since 1.3.0
 */
def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
  rdd.mapPartitions(f)
}

Filter Out Null

df.filter(df("columnname").isNotNull)  // isNotNull takes no parentheses in Scala
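Alternatively, the DataFrameNaFunctions accessed through df.na can drop rows containing nulls:

```scala
// drop rows that have a null in any column
val noNulls = df.na.drop()

// drop rows that have a null in the given columns only
val noNullCol = df.na.drop(Seq("columnname"))
```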

Select Multiple Columns

Because the Scala signature is select(col: String, cols: String*), a list of column names must be split into its head and the rest:

val cols = List("b", "c")
df.select(cols.head, cols.tail: _*)

Compare with Pandas (Python):

cols = ["a", "b"]
df = df[cols]

Map

Each input row maps to exactly one output element.

MapPartitions

Each partition (an iterator over many rows) maps to a new iterator of output elements; the function is called once per partition, not once per row.
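A sketch, assuming the people DataFrame from the createDataFrame example; the function receives one iterator per partition, so any per-partition setup runs once rather than once per row:

```scala
// compute the length of the name column (index 1) for every row;
// returns an RDD[Int], one element per input row
val nameLengths = df.mapPartitions { rows =>
  rows.map(row => row.getString(1).length)
}
```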