logo

Spark - IO

Last Updated: 2022-02-06

Read

Read DataFrame with schema

// Read a delimited file using an explicit schema; inference is disabled because the schema is supplied.
// The separator is the BEL control character (\u0007).
val df = spark.read
  .schema(schema)
  .option("sep", "\u0007")
  .option("inferSchema", "false")
  .csv("/path/to/data")

Infer schema:

// Read a delimited file (separator: BEL, \u0007) and let Spark infer the column types.
val df = spark.read
  .option("sep", "\u0007")
  .option("inferSchema", "true")
  .csv("/path/to/data")

Read From HDFS

/**
 * Reads the entire file at `path` into a single string.
 *
 * The file is opened through the Hadoop `FileSystem` obtained from the
 * SparkContext's Hadoop configuration.
 *
 * @param path location of the file to read
 * @param sc   SparkContext supplying the Hadoop configuration
 * @return the full file content as one string
 */
def read(path: String)(implicit sc: SparkContext): String = {
  val conf = sc.hadoopConfiguration
  val fs = FileSystem.get(conf)
  val in = fs.open(new Path(path))
  // Always release the underlying stream, even if reading fails.
  try scala.io.Source.fromInputStream(in).mkString
  finally in.close()
}

/**
 * Reads the file at `path` via `read` and splits its trimmed content into fields.
 *
 * Note: `delimiter` is handed to `String.split`, which interprets it as a regular
 * expression; the `-1` limit keeps trailing empty fields.
 *
 * @param path      location of the header file
 * @param delimiter field separator (regex), defaults to ","
 * @param sc        SparkContext forwarded to `read`
 * @return the fields, each with surrounding whitespace removed
 */
def readHeader(path: String, delimiter: String = ",")(implicit sc: SparkContext): Array[String] =
  read(path).trim
    .split(delimiter, -1)
    .map(field => field.trim)

Write

Write to local

// Write the rows to a local file, one row per line, encoded as UTF-8.
// NOTE(review): assumes df is a Spark DataFrame as in the read examples; collect()
// brings every row to the driver, so only use this for small results.
// The charset belongs to getBytes: Files.write(Path, byte[], OpenOption...) takes no Charset.
Files.write(
  Paths.get(path),
  df.collect().mkString("\n").getBytes(StandardCharsets.UTF_8),
  StandardOpenOption.CREATE)

Write to HDFS

Save in one file (use repartition)

// Collapse the data into one partition first, then write it as CSV with a header row.
val singlePartition = df.repartition(1)
singlePartition.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save(path)

Append

// Append mode: write df to `path` using SaveMode.Append.
df.write
  .mode(SaveMode.Append)
  .save(path)

Overwrite

// Overwrite mode: write df to "output/" using SaveMode.Overwrite.
df.write
  .mode(SaveMode.Overwrite)
  .save("output/")

With partition

// Write df as JSON, partitioning the output by the "zipcode" column.
df.write
  .partitionBy("zipcode")
  .format("json")
  .save(path)