Spark - DataFrame
Concepts
- "A DataFrame is a distributed collection of data organized into named columns. "
- i.e. a 2-D table with schema
Basic Operations
Show some samples:
df.show()
Show the schema:
df.printSchema()
Get a column:
df("foo")
Get a new DataFrame with that column:
df.select("foo")
Add a literal (constant) column:
df.withColumn("name", lit(name))
.select() vs .col()
- df.select(): returns another DataFrame
- df.col(): returns a Column
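For example, a minimal sketch (assuming df has a numeric column "foo"):
import org.apache.spark.sql.{Column, DataFrame}

val asDF: DataFrame = df.select("foo")   // a new DataFrame containing only "foo"
val asCol: Column   = df.col("foo")      // a Column expression, not a DataFrame; df("foo") is shorthand for it
df.select(asCol + 1).show()              // Columns can be combined into expressions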
Create from a file
Example: load JSON
val df = sqlContext.read.json("examples/src/main/resources/people.json")
Example: load CSV
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StringType, StructType}
// Define the column names and build an all-string schema from them
val header = Seq("col0", "col1", "col2", "col3", "col4", "col5", "col6", "col7")
val schema = StructType(header.map(name => StructField(name, StringType)))
// Load data as RDD
val datafile = sc.textFile("path/to/test.csv")
// Convert to Row RDD
val rdd = datafile.map(_.split(",")).map(arr => Row.fromSeq(arr))
// Create DataFrame from Row RDD and schema
val df = sqlContext.createDataFrame(rdd, schema)
Create by sqlContext.createDataFrame()
By RDD of Row + schema
Interface:
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Example:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema =
  StructType(
    Array(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", LongType, true),
      StructField("gender", StringType, true),
      StructField("height", LongType, true),
      StructField("job_title", StringType, true)
    )
  )
val rowRDD = sc.parallelize(Array(
  Row(1L, "Name.1", 20L, "M", 6L, "dad"),
  Row(2L, "Name.2", 20L, "F", 5L, "mom"),
  Row(3L, "Name.3", 20L, "F", 5L, "mom"),
  Row(4L, "Name.4", 20L, "M", 5L, "mom"),
  Row(5L, "Name.5", 10L, "M", 4L, "kid"),
  Row(6L, "Name.6", 8L, "M", 3L, "kid")))
val df = sqlContext.createDataFrame(rowRDD, schema)
By RDD of Product
Interface:
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
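Example (a minimal sketch; the Person case class and values are made up for illustration):
case class Person(id: Long, name: String, age: Long)

val peopleRDD = sc.parallelize(Seq(
  Person(1L, "Name.1", 20L),
  Person(2L, "Name.2", 30L)))
val peopleDF = sqlContext.createDataFrame(peopleRDD)
peopleDF.printSchema()  // column names and types are inferred from the case class fields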
By Seq of Product
Interface:
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
Example:
val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
toDF
Not every RDD can be converted to a DataFrame by toDF!
If toDF cannot be resolved, make sure that:
- this line is added:
  import sqlContext.implicits._
- the RDD is composed of Products (i.e., case classes or tuples)
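Example (a minimal sketch; the case class and values are made up for illustration):
import sqlContext.implicits._

case class Person(name: String, age: Long)

// RDD of case class instances (Products): toDF() works, columns come from the field names
val df1 = sc.parallelize(Seq(Person("Name.1", 20L), Person("Name.2", 30L))).toDF()

// RDD of tuples (also Products): toDF can rename the default _1, _2 columns
val df2 = sc.parallelize(Seq(("Name.1", 20L), ("Name.2", 30L))).toDF("name", "age")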
UDF vs UDAF vs Window
- UDFs and built-in functions:
  - input: a single row
  - return: a single value for every input row
  - e.g. substr or round
- UDAFs and built-in aggregate functions:
  - input: a group of rows
  - return: a single value for every group
  - e.g. sum or max
- Window functions:
  - input: a group of rows
  - return: a single value for every input row (see the sketch below)
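A minimal sketch contrasting the three, assuming the people DataFrame from the example above (in Spark 1.x, window functions generally require a HiveContext):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// UDF / built-in function: one output value per input row
val shortName = udf((s: String) => s.take(4))
df.select(shortName(df("name")), round(df("height"))).show()

// Aggregate function: one output value per group
df.groupBy("gender").agg(max("age"), sum("height")).show()

// Window function: one output value per input row, computed over its group of rows
val byGender = Window.partitionBy("gender").orderBy(df("age").desc)
df.select(df("name"), df("gender"), rank().over(byGender).alias("rank_in_gender")).show()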
Split
val Array(train, test) = df.randomSplit(Array(0.7, 0.3))
In short:
DataFrame = Data + Schema
= RDD[Row] + StructType
e.g. in the createDataFrame(rowRDD, schema) example above, rowRDD supplies the data (RDD[Row]) and schema supplies the StructType.
Schema
/**
* Returns the schema of this [[DataFrame]].
* @group basic
* @since 1.3.0
*/
def schema: StructType = queryExecution.analyzed.schema
Data
Represents the content of the DataFrame as an RDD of Rows (RDD[Row]):
/**
* Represents the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s. Note that the RDD is
* memoized. Once called, it won't change even if you change any query planning related Spark SQL
* configurations (e.g. `spark.sql.shuffle.partitions`).
* @group rdd
* @since 1.3.0
*/
lazy val rdd: RDD[Row] = {
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
  val schema = this.schema
  queryExecution.toRdd.mapPartitions { rows =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    rows.map(converter(_).asInstanceOf[Row])
  }
}
The return values of some methods are RDDs:
/**
* Returns a new RDD by applying a function to all rows of this DataFrame.
* @group rdd
* @since 1.3.0
*/
def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
/**
* Returns a new RDD by applying a function to each partition of this DataFrame.
* @group rdd
* @since 1.3.0
*/
def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
  rdd.mapPartitions(f)
}
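For example, a minimal sketch using the people DataFrame above (in Spark 1.x these methods live on DataFrame itself and return plain RDDs):
// map: Row => R, one output element per row
val names = df.map(row => row.getAs[String]("name"))

// mapPartitions: Iterator[Row] => Iterator[R], one output iterator per partition
val partitionSizes = df.mapPartitions(rows => Iterator(rows.size))

names.collect()           // Array("Name.1", "Name.2", ...)
partitionSizes.collect()  // one count per partition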
Filter Out Null
df.filter( df("columnname").isNotNull() )
Select Multiple Columns
To select columns from a list of names, the list must be split into the head and the rest (select takes a String plus varargs):
val cols = List("b", "c")
df.select(cols.head, cols.tail: _*)
Compare with pandas (Python):
cols = ["a", "b"]
df = df[cols]
Map
Each row is transformed into one output element.
MapPartitions
Each partition (an iterator of rows) is transformed into an iterator of output elements, so per-partition setup runs only once.
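A minimal sketch of when mapPartitions pays off (the DecimalFormat here is just an illustrative stand-in for any expensive per-partition setup):
val formattedHeights = df.mapPartitions { rows =>
  val fmt = new java.text.DecimalFormat("#.00")  // created once per partition, not once per row
  rows.map(row => fmt.format(row.getAs[Long]("height")))
}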