Spark - Overview
What is new in Spark 3
- Upgrade to Scala 2.12, support JDK 11.
- Upgrade to Python 3. Deprecate Python 2.x.
- SQL Improvements:
  - Adaptive execution of Spark SQL
  - DPP: Dynamic Partition Pruning
- Support Deep Learning
- Kubernetes Integration
- SparkGraph
- ACID Transactions with Delta Lake
- Integrate with Apache Arrow (an in-memory columnar format)
- Data Source
  - Support binaryFile data source
  - Pluggable catalog
Getting Started
Create a SparkConf. local[2] means local mode with 2 cores.
val conf = new SparkConf().setAppName("myAppName").setMaster("local[2]")
Create SparkContext
val sc = new SparkContext(conf)
Create SQLContext
val sqlContext = new SQLContext(sc)
Load data file
val distFile = sc.textFile("src/main/resources/Titanic/train.csv")
print some info
println(distFile.count())
//892
println(distFile.first())
//PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
To print all the lines, use .foreach instead of .map, since .map is a transformation and will not be evaluated until an action is called.
distFile.foreach(println)
To join strings, use .mkString
records.foreach(row => println(row.mkString(",")))
Write to a file (using the Java API). Here records is assumed to have been collected to the driver (e.g. an Array), since mkString is a local operation.
import java.io.FileOutputStream
val writer = new FileOutputStream("path/to/file.csv")
writer.write(records.mkString("\n").getBytes("UTF-8"))
writer.close()
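If records is instead still an RDD, a sketch of the Spark-native alternative (the output path is a placeholder; Spark writes a directory of part files):
records.map(_.mkString(",")).saveAsTextFile("path/to/output")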
sbt: %% picks the Scala version automatically
libraryDependencies += "com.databricks" %% "spark-csv" % "1.0.3"
which is equivalent to
libraryDependencies += "com.databricks" % "spark-csv_2.11" % "1.0.3"
Spark RDD Cache vs Checkpoint
- cache: save in memory; may need to recompute upon worker failure
- checkpoint: save in external storage (disk); see the sketch below
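A minimal sketch of both, assuming an existing SparkContext sc, an RDD rdd, and a placeholder checkpoint directory:
// cache: keep partitions in memory; lineage is kept, so lost partitions are recomputed
rdd.cache()   // shorthand for persist(StorageLevel.MEMORY_ONLY)

// checkpoint: write to reliable storage and truncate the lineage
sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // placeholder path
rdd.checkpoint()
rdd.count()   // checkpointing happens when an action runs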
Set Serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
max(), min(), avg() undefined
When calling Spark DataFrame agg(), the IDE cannot find the definitions of max(), min(), avg(), etc.
Solution: add the import
import org.apache.spark.sql.functions._
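With the import in place, a small usage sketch (assuming a DataFrame df with numeric Age and Fare columns, as in the Titanic data above):
df.agg(max("Age"), min("Age"), avg("Age")).show()
df.groupBy("Pclass").agg(avg("Fare")).show()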
Estimator and Transformer
- Transformer: DataFrame -> [Transformer] -> DataFrame
  val newDf = transformer.transform(oldDf)
- Estimator: DataFrame -> [Estimator] -> Transformer (a model); see the sketch below
  val transformer = estimator.fit(oldDf)
  val newDf = transformer.transform(oldDf)
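A concrete sketch using StringIndexer from Spark ML (column names borrowed from the Titanic example above; oldDf stands for the input DataFrame):
import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}

val indexer = new StringIndexer()   // Estimator
  .setInputCol("Sex")
  .setOutputCol("SexIndex")

val model: StringIndexerModel = indexer.fit(oldDf)   // Estimator -> Transformer (model)
val newDf = model.transform(oldDf)                   // Transformer: DataFrame -> DataFrame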
RandomForest VariableImportance
val importances = model   // your trained PipelineModel
  .stages(2)              // position of the RandomForestClassificationModel in the pipeline
  .asInstanceOf[RandomForestClassificationModel]
  .featureImportances
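To attach names to the importances, pair them with the feature columns. featureCols is an assumption here; it should be the same array that was given to the VectorAssembler:
val named = featureCols.zip(importances.toArray).sortBy(-_._2)
named.foreach { case (name, imp) => println(f"$name%-30s $imp%.4f") }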
scala vector vs spark vector
scala:
val v = Vector(…)
spark:
val v = Vectors.dense(…)
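The two are unrelated types. A short sketch contrasting them (Spark's vector lives in org.apache.spark.ml.linalg; older code uses org.apache.spark.mllib.linalg):
// scala.collection.immutable.Vector: a general-purpose immutable sequence
val scalaVec: Vector[Double] = Vector(1.0, 2.0, 3.0)

// org.apache.spark.ml.linalg.Vector: a numeric vector for ML, dense or sparse
import org.apache.spark.ml.linalg.{Vector => MLVector, Vectors}
val denseVec: MLVector  = Vectors.dense(1.0, 2.0, 3.0)
val sparseVec: MLVector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))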
Serialization
columns requires access to schema, and schema depends on queryExecution, which is transient and hence won't be shipped to the workers.
An object has to be serialized to be shipped to the workers, and the @transient annotation explicitly excludes attributes from serialization.
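A minimal sketch of the same pattern in user code: mark the member @transient so it is excluded from the serialized closure, and rebuild it lazily on each executor:
class Helper(pattern: String) extends Serializable {
  // excluded from serialization; re-created on first use after deserialization
  @transient lazy val regex = pattern.r
  def matches(s: String): Boolean = regex.pattern.matcher(s).matches()
}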
How can you minimize data transfers when working with Spark?
Minimizing data transfers and avoiding shuffling helps write Spark programs that run fast and reliably. The main ways to minimize data transfers when working with Apache Spark are:
- Using broadcast variables – broadcast variables enhance the efficiency of joins between small and large RDDs.
- Using accumulators – accumulators help update the values of variables in parallel while executing.
- The most common way is to avoid ByKey operations, repartition, and any other operations that trigger shuffles.
Broadcast variables
These are read-only variables, cached in memory on every machine. Using broadcast variables eliminates the need to ship a copy of the variable with every task, so data can be processed faster. Broadcast variables are handy for storing a lookup table in memory, which is more efficient to query than an RDD lookup().
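A minimal sketch, assuming a SparkContext sc and a small, hypothetical lookup table:
val countryNames = Map("US" -> "United States", "DE" -> "Germany")   // small lookup table
val bc = sc.broadcast(countryNames)   // shipped once per executor, not once per task

val codes = sc.parallelize(Seq("US", "DE", "FR"))
val resolved = codes.map(code => bc.value.getOrElse(code, "Unknown"))
resolved.collect().foreach(println)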
ByteBuffer Limit
ByteBuffer is limited by Integer.MAX_VALUE (2 GB)!
val buf = ByteBuffer.allocate(length.toInt)
No Spark shuffle block can be greater than 2 GB
Skewed Data
Salting (see the sketch below)
- Normal Key: Foo
- Salted Key: Foo1, Foo2, ...
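A DataFrame sketch of the idea, assuming a skewed key column and a numeric value column (column names and the number of salt buckets are placeholders):
import org.apache.spark.sql.functions._

val salts = 10   // number of salt buckets, tune for your data

val salted = df
  .withColumn("salt", (rand() * salts).cast("int"))
  .withColumn("saltedKey", concat(col("key"), lit("_"), col("salt").cast("string")))

// aggregate per salted key first (spreads the hot key), then per original key
val partial = salted.groupBy("saltedKey", "key").agg(sum("value").as("partialSum"))
val result  = partial.groupBy("key").agg(sum("partialSum").as("total"))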
ReduceByKey over GroupByKey
reduceByKey can do almost anything that groupByKey can do.
reduceByKey has a fixed memory requirement (it takes two inputs and returns one), while groupByKey's memory usage is unbounded and depends on the data.
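A word-count sketch contrasting the two (assuming a SparkContext sc):
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(w => (w, 1))

// reduceByKey combines values map-side before the shuffle
val counted = words.reduceByKey(_ + _)

// groupByKey ships every value across the network, then sums on the reducer side
val grouped = words.groupByKey().mapValues(_.sum)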
Stats
import scala.util.Try
import java.nio.file.{Files, Paths, StandardOpenOption}
import org.apache.spark.sql.functions._

val df = sqlContext.read.parquet(config.inputPath)
import sqlContext.implicits._   // for toDF

// turn every row into (columnName, numericValue) pairs, keeping only values that parse as Double
val scattered = df.flatMap { r =>
  r.schema.map { field =>
    val s = r.getAs[String](field.name)
    if (Try(s.toDouble).isSuccess) (field.name, s.toDouble)
    else ("Invalid", 0.0)
  }
}.toDF("name", "value")

val result = scattered.groupBy("name").agg(count("value"), avg("value"), max("value"), min("value"))
val res = result.collect().map(r => r.mkString(",")).mkString("\n")

Files.write(Paths.get("output.path"), res.getBytes, StandardOpenOption.CREATE)
Timeout
Increase spark.rpc.askTimeout and spark.network.timeout (120s by default for both):
spark-submit \
--master yarn \
--conf spark.rpc.askTimeout=600s \
--conf spark.network.timeout=600s \
--driver-java-options -Dconfig.file=app.conf \
...
Memory Issue or Super Slow
spark-submit \
--master yarn \
--executor-memory 16g \
--driver-memory 16g \
...
The Queue Was Crowded
We are using a shared cluster with multiple queues. To see the status of the queues:
$ hadoop queue -list
Then specify the queue:
spark-submit \
--master yarn \
--queue foo_queue \
...
The Job Does Not Finish
A manual check shows the output is already there and the program should shut down, but somehow it hangs. If we explicitly call spark.stop(), the driver is shut down, but the AM may still be running, so some messages may be lost.
From what I found online, this may be an issue unique to Java 8.
To downgrade to Java 7: make sure you use Java 7 when calling sbt package; check with
$ which java
$ java -version
However, Typesafe Config 1.3.0 is compiled against Java 8, so use 1.2.1 instead; otherwise you would see this at runtime:
Unsupported major.minor version 52.0
Whether Java 7 solves the problem is left for tomorrow.
More
Read from file
scala.io.Source.fromInputStream(in).mkString
Read Header
scala> import org.apache.hadoop.fs.{FileSystem, Path}
scala> val header = scala.io.Source.fromInputStream(FileSystem.get(spark.sparkContext.hadoopConfiguration).open(new Path("/path/to/header"))).mkString.split("\u0007", -1)
Create Schema
scala> import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
scala> val schema = StructType(header.map(StructField(_, StringType)))
Create DF
scala> val df = spark.read.schema(schema).option("sep","\u0007").option("inferSchema", "false").csv("/path/to/file.csv")
or
scala> val df = spark.read.option("sep","\u0007").option("inferSchema", "true").csv("/path/to/file.csv")
Calculate approx quantile
scala> df.stat.approxQuantile("_c810", Array(0, 0.5, 1.0), 0.25)
Play with schema
scala> df.schema.map(field => field.name)
res3: Seq[String] = List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7, ...
scala> df.schema.map(field => field.dataType == DoubleType)
res10: Seq[Boolean] = List(false, false, false, false, false, ...
scala> df.columns
res0: Array[String] = Array(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7, _c8, _c9, _c10, _c11, _c12, ...
Calculate daily mean
scala> val mean = df.groupBy("date_column").mean()
Save
scala> mean.repartition(1).write.format("csv").option("header", "true").save("dailymean")