
Spark - Utilities

Read Flat File Content From HDFS

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// open the file through the Hadoop FileSystem API and read it as one String
def read(path: String)(implicit sc: SparkContext): String = {
  val conf = sc.hadoopConfiguration
  val fileSystem = FileSystem.get(conf)
  val in = fileSystem.open(new Path(path))
  try scala.io.Source.fromInputStream(in).mkString
  finally in.close() // always close the HDFS stream
}
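
For example, with an implicit SparkContext in scope (the app name and path below are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

implicit val sc = new SparkContext(new SparkConf().setAppName("utils"))
val content = read("/user/me/data/summary.txt") // whole file as one String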

sc.textFile ignores files whose names start with a dot (.) or an underscore (_); to read a file like .pig_header, reuse read above and split the result:

// read a header file and split it into trimmed column names
def readHeader(path: String, delimiter: String = ",")(implicit sc: SparkContext): Array[String] = {
  val header = read(path).trim
  header.split(delimiter, -1).map(_.trim) // limit -1 keeps trailing empty columns
}
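
For example, reading a Pig-generated header (the path is hypothetical):

val columns = readHeader("/user/me/pig-output/.pig_header")
// e.g. Array(id, name, score)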

Application Config with Typesafe's ConfigFactory

Add the Typesafe config library as a dependency:

libraryDependencies += "com.typesafe" % "config" % "1.3.0"

in code:

import com.typesafe.config.ConfigFactory

def main(args: Array[String]): Unit = {

  // load() merges reference.conf/application.conf from the classpath,
  // JVM system properties, and the file given via -Dconfig.file
  val appConf = ConfigFactory.load()

  // print every resolved entry
  println(appConf.entrySet().toArray.mkString("\n"))
}

a conf file, test.conf:

test-conf {
    key-a = "b"
    key-c = 2.0
}

run:

spark-submit --master yarn --queue default --class xx.xx.ClassName --driver-java-options -Dconfig.file=/path/to/test.conf /path/to/xxx.jar

result:

akka.actor.deployment.default.mailbox=ConfigString("")
akka.io.udp.received-message-size-limit=ConfigString("unlimited")
akka.actor.default-dispatcher.thread-pool-executor.max-pool-size-max=ConfigInt(64)
akka.io.udp.direct-buffer-size=ConfigString("128 KiB")
akka.remote.netty.tcp.applied-adapters=SimpleConfigList([])
java.vm.version=ConfigString("25.11-b03")
hdp.version=ConfigString("2.2.9.0-3393")
test-conf.key-a=ConfigString("b")
...

It loads all the conf files found on the classpath, both system params and user-defined params, e.g. the defaults in $SPARK_HOME/conf/metrics.properties.
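
Individual values can then be read with the typed accessors, e.g. for the test.conf above:

val testConf = appConf.getConfig("test-conf")
val a = testConf.getString("key-a") // "b"
val c = testConf.getDouble("key-c") // 2.0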

Find out more about ConfigFactory: https://github.com/typesafehub/config

Remove Directory

import java.io.File
import org.apache.commons.io.FileUtils

// recursively delete a directory on the local filesystem
FileUtils.deleteDirectory(new File("/path/to/folder"))
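
Note FileUtils.deleteDirectory works on the local filesystem only; a sketch for removing an HDFS directory instead, reusing the SparkContext from above (the path is hypothetical):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path("/user/me/output"), true) // true = recursive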

Write to File (Java API)

import java.io.FileOutputStream

val writer = new FileOutputStream("path/to/file.csv")
try writer.write(records.mkString("\n").getBytes("UTF-8"))
finally writer.close() // flush and release the file handle
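
This writes to the driver's local filesystem, so records must be a local collection rather than an RDD, e.g. (input path hypothetical):

val records = sc.textFile("/user/me/input.csv").collect()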