Spark
    Overview
    Getting Started
    Configuration
    DataFrame
    Aggregation
    Utilities
    YARN
    IO

Spark - Overview

Updated: 2022-02-06

What is new in Spark 3

  • Upgrade to Scala 2.12, support JDK 11.
  • Upgrade to Python 3. Deprecate Python 2.x.
  • SQL Improvements:
    • Adaptive execution of SparkSQL
    • DPP: Dynamic Partition Pruning
  • Support Deep Learning
  • Kubernetes Integration
  • SparkGraph
  • ACID Transactions with Delta Lake
  • Integrate with Apache Arrow (an in-memory columnar format)
  • Data Source
    • Support binaryFile data source
    • Pluggable catalog

Getting Started

Create SparkConf. local[2] means local mode, 2 cores.

val conf = new SparkConf().setAppName("myAppName").setMaster("local[2]")

Create SparkContext

val sc = new SparkContext(conf)

Create SQLContext

val sqlContext = new SQLContext(sc)

Load data file

val distFile = sc.textFile("src/main/resources/Titanic/train.csv")

print some info

println(distFile.count())
//892

println(distFile.first())
//PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked

print all the lines, use .foreach instead of .map, since .map is a transformation, will not be evaluated until an action

distFile.foreach(println)

To join strings, use .mkString

records.foreach(row => println(row.mkString(",")))

Write to file(use Java API)

val writer = new FileOutputStream("path/to/file.csv")
writer.write(records.mkString("\n").getBytes("UTF-8"))

sbt: %% auto scala version

libraryDependencies += "com.databricks" %% "spark-csv" % "1.0.3"

equivalent to

libraryDependencies += "com.databricks" % "spark-csv_2.11" % "1.0.3"

Spark RDD Cache vs Checkpoint

  • cache: save in memory, may need to recompute upon worker failure
  • checkpoint: save in external storage(disk)

Set Serialization

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"

How to minimize data transfers in Spark?

  • Using Broadcast Variable to enhance the efficiency of joins between small and large RDDs.
  • Using Accumulators to update the values of variables in parallel while executing.
  • Avoid operations ByKey, repartition or any other operations which trigger shuffles.

max(), min(), avg() undefined

When calling Spark DataFrame agg(), IDE cannot find the definition of max(), min(), avg() etc.

Solution: add the import

import org.apache.spark.sql.functions._

Estimator and Transformer

  • DataFrame -> [Transformer] -> DataFrame

    val newDf = transformer.transform(oldDf)
  • DataFrame -> [Estimator] -> Transformer(model)

    val transformer = estimator.fit(oldDf)
    val newDf = transformer.transform(oldDf)

RandomForest VariableImportance

val importances = model // this would be your trained model
  .stage(2)
  .asInstanceOf[RandomForestClassificationModel]
  .featureImportances

scala vector vs spark vector

scala:

val v = Vector()

spark:

val v = Vectors.dense()

Serialization

columns requires an access to schema and schema depends on queryExecution which is transient hence won't be shipped to the workers.

Object has to serialized to be shipped to the workers and transient annotation explicitly excludes attributes from serialization

How can you minimize data transfers when working with Spark?

Minimizing data transfers and avoiding shuffling helps write spark programs that run in a fast and reliable manner. The various ways in which data transfers can be minimized when working with Apache Spark are:

  1. Using Broadcast Variable- Broadcast variable enhances the efficiency of joins between small and large RDDs.
  2. Using Accumulators – Accumulators help update the values of variables in parallel while executing.
  3. The most common way is to avoid operations ByKey, repartition or any other operations which trigger shuffles.

broad case variables

These are read only variables, present in-memory cache on every machine. When working with Spark, usage of broadcast variables eliminates the necessity to ship copies of a variable for every task, so data can be processed faster. Broadcast variables help in storing a lookup table inside the memory which enhances the retrieval efficiency when compared to an RDD lookup.

ByteBuffer Limit

ByteBuffer is limited by Integer.MAX_SIZE (2 GB)!

val buf = ByteBuffer.allocate(length.toInt)

No Spark shuffle block can be greater than 2 GB

Skewed Data

Salting

  • Normal Key: Foo
  • Salted Key: Foo1 Foo2 ...

ReduceByKey over GroupByKey

ReduceByKey can do almost anything that GroupByKey can do

  • ReduceByKey has a fixed limit of Memory requirements(take two inputs and return one)
  • GroupByKey is unbound and dependent of the data

Stats

val df = sqlContext.read.parquet(config.inputPath)

val scattered = df.flatMap { r =>
  r.schema.map { field =>
    val s = r.getAs[String](field.name)
    if (Try(s.toDouble).isSuccess) {
      (field.name, s.toDouble)
    } else {
      ("Invalid", 0.0)
    }
  }
}.toDF("name", "value")

val result = scattered.groupBy("name").agg(count("value"), avg("value"), max("value"), min("value"))

val res = result.collect().map(r => r.mkString(",")).mkString("\n")

Files.write(
  Paths.get("output.path"),
  res.getBytes,
  StandardOpenOption.CREATE)

Timeout

Increase spark.rpc.askTimeout and spark.network.timeout(120s by default for both)

spark-submit \
    --master yarn \
    --conf spark.rpc.askTimeout=600s \
    --conf spark.network.timeout=600s \
    --driver-java-options -Dconfig.file=app.conf \
    ...

Memory Issue or Super Slow

spark-submit \
    --master yarn \
    --executor-memory 16g \
    --driver-memory 16g \
    ...

The Queue Was Crowded

We are using a shared cluster, and multiple queues. To see the status of the queues:

$ hadoop queue -list

Then specify the queue:

spark-submit \
    --master yarn \
    --queue foo_queue \
    ...

The Job Does Not Finish

By manual check, the output is already there, the program should shut down, but somehow, it hangs. If we explicitly call spark.stop(), the driver is shut down, but AM may still be running, so some messages may be lost.

From the internet, this may be an unique issue to Java 8.

To Downgrade to Java 7: make sure you use Java 7 when calling sbt package, check by

$ which java
$ java -version

However Typesafe Config 1.3.0 is compiled against Java 8, so use 1.2.1 instead, otherwise you would see this at runtime:

Unsupported major.minor version 52.0

Whether Java 7 can solve the problem is left to tomorrow.

More

Read from file

scala.io.Source.fromInputStream(in).mkString

Read Header

scala> val header = scala.io.Source.fromInputStream(FileSystem.get(spark.sparkContext.hadoopConfiguration).open(new Path("/path/to/header"))).mkString.split("\u0007”, -1)

Create Schema

scala> import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

scala> val schema = StructType(header.map(StructField(_, StringType)))

Create DF

scala> val df = spark.read.schema(schema).option("sep","\u0007").option("inferSchema",false").csv("/path/to/file.csv")

or

scala> val df = spark.read.option("sep","\u0007").option("inferSchema", "true").csv("/path/to/file.csv")

Calculate approx quantile

scala> df.stat.approxQuantile("_c810", Array(0, 0.5, 1.0), 0.25)

Play with schema

scala> df.schema.map(field => field.name)
res3: Seq[String] = List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7,

scala> df.schema.map(field => field.dataType == DoubleType)
res10: Seq[Boolean] = List(false, false, false, false, false, ...

scala> df.columns
res0: Array[String] = Array(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7, _c8, _c9, _c10, _c11, _c12, _c13, _c14, _c15, _c16, _c17, _c18, _c19, _c20, _c21, _c22, _c23, _c24, _c25, _c26, _c27, _c28, _c29, _c30, _c31, _c32, _c33, _c34, _c35, _c36, _c37, _c38, _c39, _c40, _c41, _c42, _c43, _c44, _c45, _c46, _c47, _c48, _c49, _c50, _c51, _c52, _c53, _c54, _c55, _c56, _c57, _c58, _c59, _c60, _c61, _c62, _c63, _c64, _c65, _c66, _c67, _c68, _c69, _c70, _c71, _c72, _c73, _c74, _c75, _c76, _c77, _c78, _c79, _c80, _c81, _c82, _c83, _c84, _c85, _c86, _c87, _c88, _c89, _c90, _c91, _c92, _c93, _c94, _c95, _c96, _c97, _c98, _c99, _c100, _c101, _c102, _c103, _c104, _c105, _c106, _c107, _c108, _c109, _c110, _c111, _c112, _c113, _c114, _c115, _c116, _c117, _c118, _c119, _c120, _c121, _c122, _c123, _c124, _c12...

Calculate daily mean

scala> val mean = df.groupBy(“date_column").mean()

Save

scala> mean.repartition(1).write.format("csv").option("header", "true").save("dailymean")

Example

import scala.util.Try
import org.apache.spark.sql.types._
val df = sqlContext.read.parquet(config.inputPath)

val scattered = df.flatMap { r =>
  r.schema.map { field =>
    val s = r.getAs[String](field.name)
    if (Try(s.toDouble).isSuccess) {
      (field.name, s.toDouble)
    } else {
      ("Invalid", 0.0)
    }
  }
}.toDF("name", "value")

val result = scattered.groupBy("name").agg(count("value"), avg("value"), max("value"), min("value"))

val res = result.collect().map(r => r.mkString(",")).mkString("\n")

Files.write(
  Paths.get("output.path"),
  res.getBytes,
  StandardOpenOption.CREATE)