Spark - IO
Read
Read DataFrame with schema:
val df = spark.read.schema(schema).option("sep","\u0007").option("inferSchema", "false").csv("/path/to/data")
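Here `schema` is expected to be a StructType defined beforehand; a minimal sketch, with assumed column names and types:

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

// Example schema; the columns below are placeholders, not from the original data
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true)
))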
Infer schema:
val df = spark.read.option("sep","\u0007").option("inferSchema", "true").csv("/path/to/data")
Read from HDFS
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// Read the full contents of an HDFS file into a String
def read(path: String)(implicit sc: SparkContext): String = {
  val conf = sc.hadoopConfiguration
  val fs = FileSystem.get(conf)
  val in = fs.open(new Path(path))
  try scala.io.Source.fromInputStream(in).mkString
  finally in.close()
}

// Split a header file into column names; assumes `path` points to a file containing only the header line
def readHeader(path: String, delimiter: String = ",")(implicit sc: SparkContext): Array[String] = {
  val header = read(path).trim
  header.split(delimiter, -1).map(_.trim)
}
Write
Write to local
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

// `lines` is a local collection of strings (not a DataFrame); see the sketch below
Files.write(
  Paths.get(path),
  lines.mkString("\n").getBytes(StandardCharsets.UTF_8),
  StandardOpenOption.CREATE)
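For a small DataFrame, `lines` could be built by collecting to the driver first, for example:

// Collect a small DataFrame to the driver and render each Row as a comma-separated line
val lines = df.collect().map(_.mkString(","))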
Write to HDFS
Save as a single file (use repartition):
// "com.databricks.spark.csv" is the legacy spark-csv name; on Spark 2.x+ the built-in "csv" format is equivalent
df.repartition(1).write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save(path)
Append
df.write.mode(SaveMode.Append).save(path)
Overwrite
df.write.mode(SaveMode.Overwrite).save("output/")
With partition columns
df.write.partitionBy("zipcode").format("json").save(path)
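The partition column then becomes part of the directory layout (e.g. zipcode=10001/), and filters on it prune which directories are read; a small read-back sketch with an assumed zipcode value:

import org.apache.spark.sql.functions.col

// Only partition directories matching the filter are scanned
val partitioned = spark.read.json(path)
val oneZip = partitioned.filter(col("zipcode") === "10001")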