Spark - Overview

Create a SparkConf. local[2] means run locally with 2 worker threads.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("myAppName").setMaster("local[2]")

Create SparkContext

val sc = new SparkContext(conf)

Create SQLContext

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

Load data file

val distFile = sc.textFile("src/main/resources/Titanic/train.csv")

Print some info

println(distFile.count())
// 892 (891 data rows + 1 header line)

println(distFile.first())
//PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked

Print all the lines. Use .foreach instead of .map here: .map is a transformation, so it is not evaluated until an action is called.

distFile.foreach(println)
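
For example, a .map by itself only builds a lazy plan; nothing runs until an action such as .count() is called (a minimal sketch):

// .map alone does no work yet
val lengths = distFile.map(_.length)
// .count() is an action, so the map is evaluated here
println(lengths.count())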

To join the fields of a row into one string, use .mkString.
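
records is not defined above; a hypothetical definition that splits each CSV line into an Array[String] of fields:

val records = distFile.map(_.split(",", -1))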

records.foreach(row => println(row.mkString(",")))

Write to a file (using the Java API). If records is an RDD, collect it to the driver first:

import java.io.FileOutputStream
val writer = new FileOutputStream("path/to/file.csv")
writer.write(records.collect().map(_.mkString(",")).mkString("\n").getBytes("UTF-8"))
writer.close()
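
Alternatively, Spark's own saveAsTextFile writes the RDD in parallel from the executors, one part file per partition (the output path is a placeholder):

records.map(_.mkString(",")).saveAsTextFile("path/to/output")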

sbt: %% automatically appends the Scala binary version to the artifact name.

libraryDependencies += "com.databricks" %% "spark-csv" % "1.0.3"

equivalent to the following when the build's scalaVersion is 2.11.x:

libraryDependencies += "com.databricks" % "spark-csv_2.11" % "1.0.3"
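
A sketch of loading the same CSV through spark-csv instead of parsing it by hand (this assumes the sqlContext.read API available from Spark 1.4 onward):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("src/main/resources/Titanic/train.csv")
df.printSchema()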

Spark RDD Cache vs Checkpoint

  • cache: keep the data in memory; the lineage is retained, so lost partitions may need to be recomputed upon worker failure
  • checkpoint: save the data to external storage (disk/HDFS) and truncate the lineage (see the sketch below)
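
A minimal sketch (the checkpoint directory is an assumed path):

sc.setCheckpointDir("/tmp/spark-checkpoints") // must be set before checkpointing
val cleaned = distFile.map(_.trim)
cleaned.cache()      // keep in memory, lineage retained
cleaned.checkpoint() // materialized to the checkpoint dir at the next action
cleaned.count()      // action triggers both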

Set Serialization

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
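
Kryo output is smaller when application classes are registered with it; MyRecord here is a hypothetical application class:

case class MyRecord(id: Int, name: String) // hypothetical application class
conf.registerKryoClasses(Array(classOf[MyRecord]))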

How to minimize data transfers in Spark?

  • Use a broadcast variable to make joins between a small and a large RDD more efficient (see the sketch after this list).
  • Use accumulators to update the values of variables in parallel while executing.
  • Avoid *ByKey operations, repartition, and any other operations that trigger shuffles.
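
A minimal sketch of a map-side join using a broadcast variable (the data is made up for illustration):

val small = Map(1 -> "first", 2 -> "second")  // small lookup table
val smallBc = sc.broadcast(small)             // shipped to each executor once
val large = sc.parallelize(Seq((1, 10.0), (2, 20.0), (3, 30.0)))
// look up the broadcast map inside flatMap: no shuffle of the large RDD
val joined = large.flatMap { case (k, v) => smallBc.value.get(k).map(name => (k, name, v)) }
joined.collect().foreach(println)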