Spark 2.0

Updated: 2018-12-11

If you do not know where to find the jars: http://search.maven.org/

Missing Jersey

java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
 at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:45)
 at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:163)
 at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
 at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:150)

Spark 2.0 upgraded Jersey (it now lives under org.glassfish.jersey instead of com.sun.jersey), but the YARN timeline client still needs the old com.sun.jersey classes.

To fix this, download jersey-bundle-1.17.1.jar and copy it to $SPARK_HOME/jars

Missing Hadoop-LZO

Download hadoop-lzo-0.6.0.2.2.9.0-3393.jar and copy it to $SPARK_HOME/jars

Missing Typesafe Config

We are using Typesafe Config for user configurations. Download config-1.2.1.jar and copy it to $SPARK_HOME/jars
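
For reference, a minimal sketch of how the config is typically loaded in the driver; the key names below are hypothetical, and the later example accesses the values through an application-level config object (config.inputPath) rather than the raw Config:

import com.typesafe.config.{Config, ConfigFactory}

// ConfigFactory.load() honours -Dconfig.file=app.conf (see the spark-submit
// example below) and falls back to application.conf on the classpath.
val conf: Config = ConfigFactory.load()

// Hypothetical keys; use whatever app.conf actually defines.
val inputPath = conf.getString("input.path")
val outputPath = conf.getString("output.path")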

Timeout

Increase spark.rpc.askTimeout and spark.network.timeout (both default to 120s):

spark-submit \
    --master yarn \
    --conf spark.rpc.askTimeout=600s \
    --conf spark.network.timeout=600s \
    --driver-java-options -Dconfig.file=app.conf \
    ...

Memory Issue or Super Slow

spark-submit \
    --master yarn \
    --executor-memory 16g \
    --driver-memory 16g \
    ...

The Queue Was Crowded

We are using a shared cluster with multiple queues. To see the status of the queues:

$ hadoop queue -list

Then specify the queue:

spark-submit \
    --master yarn \
    --queue foo_queue \
    ...

The Job Does Not Finish

A manual check shows the output is already there and the program should shut down, but somehow it hangs. If we explicitly call spark.stop(), the driver is shut down, but the AM may still be running, so some messages may be lost.
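
For context, a rough sketch of where the explicit stop would sit in the driver (the structure is assumed here, not taken from the actual job):

import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("example").getOrCreate()
    try {
      // ... the actual job ...
    } finally {
      // Explicit stop: shuts the driver down even when the JVM would otherwise
      // hang, but the YARN AM may still be running at this point.
      spark.stop()
    }
  }
}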

According to reports online, this may be an issue unique to Java 8.

To downgrade to Java 7: make sure you use Java 7 when calling sbt package; check with

$ which java
$ java -version

However, Typesafe Config 1.3.0 is compiled against Java 8, so use 1.2.1 instead; otherwise you would see this at runtime:

Unsupported major.minor version 52.0

Whether Java 7 solves the problem is left for tomorrow.

More

Read from an InputStream (e.g. one returned by FileSystem.open, as below)

scala.io.Source.fromInputStream(in).mkString

Read Header

scala> import org.apache.hadoop.fs.{FileSystem, Path}

scala> val header = scala.io.Source.fromInputStream(FileSystem.get(spark.sparkContext.hadoopConfiguration).open(new Path("/path/to/header"))).mkString.split("\u0007", -1)

Create Schema

scala> import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

scala> val schema = StructType(header.map(StructField(_, StringType)))

Create DF

scala> val df = spark.read.schema(schema).option("sep", "\u0007").option("inferSchema", "false").csv("/path/to/file.csv")

or

scala> val df = spark.read.option("sep","\u0007").option("inferSchema", "true").csv("/path/to/file.csv")

Calculate approx quantile

scala> df.stat.approxQuantile("_c810", Array(0, 0.5, 1.0), 0.25)
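
The second argument is the list of target probabilities (0 = min, 0.5 = median, 1.0 = max) and the third is the relative error; 0.25 is fast but coarse, while 0.0 computes exact quantiles.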

Play with schema

scala> df.schema.map(field => field.name)
res3: Seq[String] = List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7,

scala> df.schema.map(field => field.dataType == DoubleType)
res10: Seq[Boolean] = List(false, false, false, false, false, ...

scala> df.columns
res0: Array[String] = Array(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7, _c8, _c9, _c10, _c11, _c12, _c13, _c14, _c15, _c16, _c17, _c18, _c19, _c20, _c21, _c22, _c23, _c24, _c25, _c26, _c27, _c28, _c29, _c30, _c31, _c32, _c33, _c34, _c35, _c36, _c37, _c38, _c39, _c40, _c41, _c42, _c43, _c44, _c45, _c46, _c47, _c48, _c49, _c50, _c51, _c52, _c53, _c54, _c55, _c56, _c57, _c58, _c59, _c60, _c61, _c62, _c63, _c64, _c65, _c66, _c67, _c68, _c69, _c70, _c71, _c72, _c73, _c74, _c75, _c76, _c77, _c78, _c79, _c80, _c81, _c82, _c83, _c84, _c85, _c86, _c87, _c88, _c89, _c90, _c91, _c92, _c93, _c94, _c95, _c96, _c97, _c98, _c99, _c100, _c101, _c102, _c103, _c104, _c105, _c106, _c107, _c108, _c109, _c110, _c111, _c112, _c113, _c114, _c115, _c116, _c117, _c118, _c119, _c120, _c121, _c122, _c123, _c124, _c12...

Calculate daily mean

scala> val mean = df.groupBy("date_column").mean()

Save

scala> mean.repartition(1).write.format("csv").option("header", "true").save("dailymean")

Example

import java.nio.file.{Files, Paths, StandardOpenOption}
import scala.util.Try
import org.apache.spark.sql.functions.{avg, count, max, min}
import org.apache.spark.sql.types._

val df = sqlContext.read.parquet(config.inputPath)
// Encoders for the (String, Double) tuples and toDF below.
import sqlContext.implicits._

// Flatten each row into (column name, numeric value) pairs; this assumes all
// columns are stored as strings. Cells that do not parse as Double are
// bucketed under "Invalid".
val scattered = df.flatMap { r =>
  r.schema.map { field =>
    val s = r.getAs[String](field.name)
    if (Try(s.toDouble).isSuccess) {
      (field.name, s.toDouble)
    } else {
      ("Invalid", 0.0)
    }
  }
}.toDF("name", "value")

// Per-column count, mean, max and min.
val result = scattered.groupBy("name").agg(count("value"), avg("value"), max("value"), min("value"))

// Collect to the driver and write one comma-separated line per column.
val res = result.collect().map(r => r.mkString(",")).mkString("\n")

Files.write(
  Paths.get("output.path"),
  res.getBytes,
  StandardOpenOption.CREATE)
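
Since the example reads its paths from the Typesafe config, it would be submitted along the lines of the earlier commands (the class name and jar below are placeholders):

spark-submit \
    --master yarn \
    --driver-java-options -Dconfig.file=app.conf \
    --class com.example.Stats \
    stats.jar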