Spark - Overview

Updated: 2018-12-11

Create a SparkConf. local[2] means run in local mode with 2 worker threads.

import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("myAppName").setMaster("local[2]")

Create SparkContext

val sc = new SparkContext(conf)

Create SQLContext

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

Load data file

val distFile = sc.textFile("src/main/resources/Titanic/train.csv")

print some info

println(distFile.count())
//892

println(distFile.first())
//PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked

Print all the lines. Use .foreach instead of .map: .map is a transformation and will not be evaluated until an action is triggered.

distFile.foreach(println)

To join the elements of a collection into a single string, use .mkString

records.foreach(row => println(row.mkString(",")))

Write to a file (using the Java API)

import java.io.FileOutputStream

// assuming records is a local collection of CSV line strings
val writer = new FileOutputStream("path/to/file.csv")
writer.write(records.mkString("\n").getBytes("UTF-8"))
writer.close()

sbt: %% appends the Scala version to the artifact name automatically

libraryDependencies += "com.databricks" %% "spark-csv" % "1.0.3"

equivalent to (when building against Scala 2.11)

libraryDependencies += "com.databricks" % "spark-csv_2.11" % "1.0.3"

Spark RDD Cache vs Checkpoint

  • cache: keep the data in memory; it may need to be recomputed from its lineage upon worker failure
  • checkpoint: save to external storage (disk) and truncate the lineage; see the sketch below
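
A minimal sketch of both, assuming sc is the SparkContext from above and the checkpoint directory path is made up:

sc.setCheckpointDir("/tmp/spark-checkpoints") // assumed path; any reliable storage (e.g. HDFS) works

val rdd = sc.textFile("src/main/resources/Titanic/train.csv").map(_.toUpperCase)

rdd.cache()      // keep in memory; recomputed from lineage if an executor is lost
rdd.checkpoint() // written to the checkpoint dir on the next action; lineage is truncated

rdd.count()      // action that materializes both the cache and the checkpoint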

Set Serialization

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"

max(), min(), avg() undefined

When calling DataFrame agg(), the IDE cannot find the definitions of max(), min(), avg(), etc.

Solution: add the import

import org.apache.spark.sql.functions._
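
For example, with the import above in scope, a minimal aggregation over the Titanic CSV loaded with the spark-csv reader from the dependency above (the casts are needed because the columns come back as strings):

val titanic = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("src/main/resources/Titanic/train.csv")

titanic.agg(max(col("Age").cast("double")), min(col("Age").cast("double")), avg(col("Fare").cast("double"))).show()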

Estimator and Transformer

  • DataFrame -> [Transformer] -> DataFrame

    val newDf = transformer.transform(oldDf)
  • DataFrame -> [Estimator] -> Transformer(model)

    val transformer = estimator.fit(oldDf)
    val newDf = transformer.transform(oldDf)
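
A minimal sketch of the two patterns, using StandardScaler as the estimator (the column names are assumptions):

import org.apache.spark.ml.feature.StandardScaler

// Estimator: fit() learns the scaling statistics and returns a Transformer (the model)
val scaler = new StandardScaler()
  .setInputCol("features")        // assumed column of Vector type
  .setOutputCol("scaledFeatures")

val scalerModel = scaler.fit(oldDf)      // DataFrame -> [Estimator] -> Transformer
val newDf = scalerModel.transform(oldDf) // DataFrame -> [Transformer] -> DataFrame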

RandomForest Variable Importance

import org.apache.spark.ml.classification.RandomForestClassificationModel

val importances = model // your trained PipelineModel
  .stages(2)            // index of the RandomForestClassificationModel stage in the pipeline
  .asInstanceOf[RandomForestClassificationModel]
  .featureImportances

Scala Vector vs Spark Vector

scala:

val v = Vector()

spark:

val v = Vectors.dense()
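
The two live in different packages; a minimal sketch (the spark.ml linalg package is an assumption; older code uses org.apache.spark.mllib.linalg instead):

// Scala's general-purpose immutable Vector collection (available via Predef)
val scalaVec: Vector[Double] = Vector(1.0, 2.0, 3.0)

// Spark's dense numeric vector used by MLlib / spark.ml
import org.apache.spark.ml.linalg.Vectors
val sparkVec = Vectors.dense(1.0, 2.0, 3.0)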

Serialization

DataFrame.columns requires access to the schema, and the schema depends on queryExecution, which is marked @transient and hence won't be shipped to the workers.

An object has to be serialized before it can be shipped to the workers, and the @transient annotation explicitly excludes a field from serialization.
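
A minimal sketch of the @transient behaviour with plain Java serialization (the class and field names are hypothetical):

import java.io._

class Payload(k: String, d: String) extends Serializable {
  val kept: String = k
  @transient val dropped: String = d // excluded from serialization
}

val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(new Payload("shipped", "not shipped"))
oos.close()

val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  .readObject().asInstanceOf[Payload]

// restored.kept == "shipped"; restored.dropped == null, because transient fields are skipped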

How can you minimize data transfers when working with Spark?

Minimizing data transfers and avoiding shuffling helps write Spark programs that run fast and reliably. The main ways to minimize data transfers when working with Apache Spark are:

  1. Using broadcast variables – broadcast variables enhance the efficiency of joins between a small and a large RDD.
  2. Using accumulators – accumulators help update the values of variables in parallel while executing.
  3. Avoiding ByKey operations, repartition, or any other operation that triggers shuffles.

Broadcast variables

These are read-only variables cached in memory on every machine. When working with Spark, broadcast variables eliminate the need to ship a copy of a variable with every task, so data can be processed faster. Broadcast variables help store a lookup table in memory, which improves retrieval efficiency compared to an RDD lookup().
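
A minimal sketch of a map-side join against a small broadcast lookup table (the data is made up for illustration):

// small lookup table broadcast to every executor once
val pclassNames = sc.broadcast(Map("1" -> "First", "2" -> "Second", "3" -> "Third"))

// large RDD of (pclass, passengerName) pairs
val passengers = sc.parallelize(Seq(("1", "Alice"), ("3", "Bob")))

// join against the broadcast map without shuffling the large RDD
val labelled = passengers.map { case (pclass, name) =>
  (name, pclassNames.value.getOrElse(pclass, "Unknown"))
}
labelled.foreach(println)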

ByteBuffer Limit

ByteBuffer is limited by Integer.MAX_VALUE (about 2 GB)!

import java.nio.ByteBuffer
val buf = ByteBuffer.allocate(length.toInt) // length must fit in an Int

As a consequence, no Spark shuffle block can be greater than 2 GB.

Skewed Data

Salting

  • Normal Key: Foo
  • Salted Key: Foo1 Foo2 ...
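
A minimal sketch of salting before an aggregation, assuming pairs is an RDD[(String, Int)] with one heavily skewed key:

import scala.util.Random

val numSalts = 10 // <= 10 so the salt stays a single character

// spread each hot key across numSalts sub-keys, e.g. Foo -> Foo0 .. Foo9
val salted = pairs.map { case (key, value) =>
  (s"$key${Random.nextInt(numSalts)}", value)
}

// the first aggregation runs on the salted keys, so no single reducer gets all of "Foo"
val partial = salted.reduceByKey(_ + _)

// strip the one-character salt and combine the partial results
val result = partial
  .map { case (saltedKey, value) => (saltedKey.dropRight(1), value) }
  .reduceByKey(_ + _)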

ReduceByKey over GroupByKey

reduceByKey can do almost anything that groupByKey can do (see the sketch below)

  • reduceByKey has a fixed memory requirement per key: the combine function takes two values and returns one, so values are merged as they arrive
  • groupByKey's memory requirement is unbounded and depends on the data, since all values for a key are buffered together
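
A minimal word-count style sketch of the preferred form, assuming lines is an RDD[String]:

val counts = lines
  .flatMap(_.split(","))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // values are combined map-side before the shuffle

// the groupByKey equivalent shuffles every (word, 1) pair and buffers all values per key
val countsViaGroup = lines
  .flatMap(_.split(","))
  .map(word => (word, 1))
  .groupByKey()
  .mapValues(_.sum)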

Stats (per-column count/avg/max/min)

import java.nio.file.{Files, Paths, StandardOpenOption}
import scala.util.Try
import org.apache.spark.sql.functions._
import sqlContext.implicits._ // needed for .toDF on the flatMapped result

val df = sqlContext.read.parquet(config.inputPath)

// Flatten every row into (columnName, numericValue) pairs.
// Assumes the columns are stored as strings; non-numeric values are bucketed under "Invalid".
val scattered = df.flatMap { r =>
  r.schema.map { field =>
    val s = r.getAs[String](field.name)
    if (Try(s.toDouble).isSuccess) {
      (field.name, s.toDouble)
    } else {
      ("Invalid", 0.0)
    }
  }
}.toDF("name", "value")

// per-column statistics
val result = scattered.groupBy("name").agg(count("value"), avg("value"), max("value"), min("value"))

// collect the (small) aggregated result to the driver and write it out as CSV
val res = result.collect().map(r => r.mkString(",")).mkString("\n")

Files.write(
  Paths.get("output.path"),
  res.getBytes,
  StandardOpenOption.CREATE)