Spark - Utilities
Last Updated: 2021-11-19
Read Flat File Content From HDFS
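import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

def read(path: String)(implicit sc: SparkContext): String = {
  val fileSystem = FileSystem.get(sc.hadoopConfiguration)
  val in = fileSystem.open(new Path(path))
  // Decode as UTF-8 and close the stream even if reading fails.
  try scala.io.Source.fromInputStream(in, "UTF-8").mkString
  finally in.close()
}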
sc.textFile ignores files whose names start with a dot (.). To read such a file, for example .pig_header, use the read function defined above:
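def readHeader(path: String, delimiter: String = ",")(implicit sc: SparkContext): Array[String] = {
  val header = read(path).trim
  // String.split treats the delimiter as a regular expression;
  // the limit of -1 keeps trailing empty fields.
  header.split(delimiter, -1).map(_.trim)
}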
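Because readHeader passes the delimiter to String.split as a regular expression, a delimiter such as | needs quoting to be matched literally. A minimal sketch of that idea, assuming a hypothetical helper splitHeader that works on an in-memory string instead of an HDFS path:

```scala
import java.util.regex.Pattern

// Hypothetical helper (not part of the original utilities): the same splitting
// logic as readHeader, with the delimiter quoted so it is matched literally.
def splitHeader(header: String, delimiter: String = ","): Array[String] =
  header.trim.split(Pattern.quote(delimiter), -1).map(_.trim)

// A trailing delimiter yields a trailing empty field because of the -1 limit.
val commaFields = splitHeader("id, name ,score,\n")

// "|" is a regex metacharacter; Pattern.quote makes it a plain separator.
val pipeFields = splitHeader("id|name|score", "|")
```

Whether to quote inside the helper or leave regex delimiters available to callers is a design choice; quoting is the safer default when delimiters come from configuration.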
Application Config by Typesafe's ConfigFactory
Add the Typesafe config library as a dependency:
libraryDependencies += "com.typesafe" % "config" % "1.3.0"
In code:
import com.typesafe.config.ConfigFactory

def main(args: Array[String]): Unit = {
  val appConf = ConfigFactory.load()
  // Print every resolved key/value pair, one per line.
  println(appConf.entrySet().toArray.mkString("\n"))
}
A config file, test.conf:
test-conf {
  key-a = "b"
  key-c = 2.0
}
Run:
spark-submit --master yarn --queue default --class xx.xx.ClassName --driver-java-options -Dconfig.file=/path/to/test.conf /path/to/xxx.jar
Result:
akka.actor.deployment.default.mailbox=ConfigString("")
akka.io.udp.received-message-size-limit=ConfigString("unlimited")
akka.actor.default-dispatcher.thread-pool-executor.max-pool-size-max=ConfigInt(64)
akka.io.udp.direct-buffer-size=ConfigString("128 KiB")
akka.remote.netty.tcp.applied-adapters=SimpleConfigList([])
java.vm.version=ConfigString("25.11-b03")
hdp.version=ConfigString("2.2.9.0-3393")
test-conf.key-a=ConfigString("b")
...
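ConfigFactory.load() merges JVM system properties, the file given by -Dconfig.file, and every reference.conf found on the classpath. The result therefore mixes system parameters (java.vm.version, hdp.version), library defaults (the akka.* keys) and user-defined parameters (test-conf.key-a).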
Find out more about ConfigFactory: https://github.com/typesafehub/config
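Printing the whole entry set is useful for debugging, but application code normally reads individual keys. A minimal sketch, assuming the contents of test.conf are parsed from an inline string (via ConfigFactory.parseString) so that it runs without -Dconfig.file; with the spark-submit command above, ConfigFactory.load() would take its place:

```scala
import com.typesafe.config.ConfigFactory

// Inline copy of test.conf, used here only to keep the sketch self-contained.
val appConf = ConfigFactory.parseString(
  """test-conf {
    |  key-a = "b"
    |  key-c = 2.0
    |}""".stripMargin)

// Narrow to the application's own section, then read typed values.
val testConf = appConf.getConfig("test-conf")
val keyA: String = testConf.getString("key-a")
val keyC: Double = testConf.getDouble("key-c")
```

The typed getters throw an exception when a key is missing or has the wrong type, so misconfiguration surfaces at startup rather than deep inside a job.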
Remove Directory
import java.io.File
import org.apache.commons.io.FileUtils
// Recursively deletes a directory on the local file system (not on HDFS).
FileUtils.deleteDirectory(new File("/path/to/folder"))
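FileUtils only works on the local file system. For a directory on HDFS, a comparable sketch can use Hadoop's FileSystem.delete with recursive = true. The helper name removeDir is hypothetical; in a Spark job the configuration would typically be sc.hadoopConfiguration:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: recursively deletes `path` on the file system that
// `conf` points to. Returns true if the delete succeeded.
def removeDir(path: String, conf: Configuration): Boolean = {
  val fileSystem = FileSystem.get(conf)
  fileSystem.delete(new Path(path), true) // true = recursive
}
```

With a default Configuration and no cluster settings on the classpath, FileSystem.get resolves to the local file system, which makes the helper easy to try out before pointing it at HDFS.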
Write to File (Using the Java API)
import java.io.FileOutputStream
val writer = new FileOutputStream("path/to/file.csv")
try writer.write(records.mkString("\n").getBytes("UTF-8"))
finally writer.close()
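An alternative sketch of the same write uses java.nio.file.Files.write, which opens and closes the file itself. The sample records and the temporary file are assumptions made to keep the example self-contained; in the snippet above, records comes from the application and the path is fixed:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical sample data standing in for the application's records.
val records = Seq("id,name", "1,a", "2,b")

// A temporary file keeps the sketch self-contained; use a real path in practice.
val target = Files.createTempFile("records", ".csv")
Files.write(target, records.mkString("\n").getBytes(StandardCharsets.UTF_8))

// Read the file back to confirm what was written, then clean up.
val written = new String(Files.readAllBytes(target), StandardCharsets.UTF_8)
Files.delete(target)
```

Both variants write on the driver's local file system and build the whole content in memory first, so they suit small result sets such as collected summaries.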