
Spark Troubleshooting - Out of Memory

The "java out of memory" error is coming because spark uses its spark.default.parallelismproperty while determining number of splits, which by default is number of cores available.

// From CoarseGrainedSchedulerBackend.scala
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
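If the default is too coarse for your data, you can raise it when the context is created. The sketch below assumes a standalone application (in spark-shell an sc already exists); the value "200" and the app name are only illustrative and should be tuned to your data size and memory.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: raise the default split count so each partition holds less data.
// "200" is an illustrative value; tune it to your input size and memory.
val conf = new SparkConf()
  .setAppName("oom-troubleshooting")
  .set("spark.default.parallelism", "200")
val sc = new SparkContext(conf)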

When the input becomes large and you have limited memory, you should increase the number of splits.

You can do something as follows:

val input = List("a,b,c,d", "b,c,e", "a,c,d", "e,g")
val splitSize = 10000 // specify some number of elements that fit in memory.

val numSplits = (input.size / splitSize) + 1 // has to be > 0.
val groups = sc.parallelize(input, numSplits) // specify the # of splits.

val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap

def isHeavy(inp: String) = inp.split(",").map(weights(_)).sum > 12
val result = groups.filter(isHeavy)
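Note that result is still a lazy RDD; nothing is computed until you call an action, for example result.collect() (only reasonable when the filtered output comfortably fits on the driver) or result.saveAsTextFile(...) to write it out.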

You may also consider increasing the executor memory size via spark.executor.memory.
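As a sketch, this setting can be applied on the SparkConf before the SparkContext is created (the "4g" below is only an illustrative value and must fit within what your cluster manager can allocate per executor), or passed to spark-submit via its --executor-memory option.

import org.apache.spark.SparkConf

// Sketch: give each executor more heap. Must be set before the SparkContext
// is created; "4g" is illustrative, not a recommendation.
val conf = new SparkConf()
  .setAppName("oom-troubleshooting")
  .set("spark.executor.memory", "4g")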