Caching data will in most cases improve your query performance and execution. The most common way to cache a table in Spark SQL is the in-memory columnar format, created by calling dataFrame.cache(). This tells Spark SQL to scan only the required columns and to automatically tune compression to minimize memory usage. To remove a table from the cache, call the dataFrame.unpersist() function.
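The same can also be done directly in SQL, as a sketch assuming a table named table1 is registered in the catalog:

```sql
-- cache the table in the in-memory columnar format
CACHE TABLE table1;

-- or cache lazily, so the table is cached on first use instead of eagerly
CACHE LAZY TABLE table1;

-- remove the table from the cache
UNCACHE TABLE table1;
```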
In-memory caching can be configured using the setConf method on SparkSession, or by running SET key=value commands in SQL. Some of the optimisation can be done by tuning a couple of selected parameters:
spark.sql.inMemoryColumnarStorage.compressed
spark.sql.inMemoryColumnarStorage.batchSize
spark.sql.files.minPartitionNum
spark.sql.shuffle.partitions
spark.sql.sources.parallelPartitionDiscovery.parallelism
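For example, a few of these parameters can be tuned with SET commands; the values below are illustrative, not recommendations:

```sql
-- compress the in-memory columnar storage (default: true)
SET spark.sql.inMemoryColumnarStorage.compressed=true;

-- rows per columnar batch; larger batches improve memory utilisation
-- and compression, but risk out-of-memory errors
SET spark.sql.inMemoryColumnarStorage.batchSize=10000;

-- number of partitions used when shuffling data for joins or aggregations
SET spark.sql.shuffle.partitions=200;
```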
Spark SQL can also be optimised with a couple of JOIN hints. These are:
BROADCAST
MERGE
SHUFFLE_HASH
and SHUFFLE_REPLICATE_NL
All of them can be used from different languages: Scala, R, Python, SQL and Java.
In R, a broadcast join hint will look like:
Data1 <- sql("SELECT * FROM table1")
Data2 <- sql("SELECT * FROM table2")
head(join(Data1, hint(Data2, "broadcast"), Data1$key == Data2$key))
Python will look like:
spark.table("Data1").join(spark.table("Data2").hint("broadcast"), "key").show()
SQL will look like:
SELECT /*+ BROADCAST(s) */ * FROM Data1 AS d JOIN Data2 AS s ON d.key = s.key;
-- or with the BROADCASTJOIN alias
SELECT /*+ BROADCASTJOIN(s) */ * FROM Data1 AS d JOIN Data2 AS s ON d.key = s.key;
This hint instructs Spark to use the hinted strategy on the specified relation when joining tables together. When the BROADCAST (or BROADCASTJOIN) hint is used on the Data2 table, it overrides the statistics-based behaviour controlled by the configuration spark.sql.autoBroadcastJoinThreshold. When different join strategy hints are specified on both sides of a join, Spark SQL prioritises them in the order BROADCAST over MERGE over SHUFFLE_HASH over SHUFFLE_REPLICATE_NL.
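As a sketch of this priority (the table and column names are illustrative), if both hints below are given, Spark will pick the broadcast hash join because BROADCAST outranks MERGE:

```sql
SELECT /*+ BROADCAST(s), MERGE(d, s) */ *
FROM Data1 AS d
JOIN Data2 AS s ON d.key = s.key;
```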
Repartitioning hints in Spark SQL are good for performance tuning and for reducing the number of output results (or files).
The “COALESCE” hint only has a partition number as a parameter.
The “REPARTITION” hint has a partition number, columns, or both/neither of them as parameters.
The “REPARTITION_BY_RANGE” hint must have column names and a partition number is optional.
These hints are only available in the SQL language. The syntax is as follows:
SELECT /*+ COALESCE(3) */ * FROM Data1;
SELECT /*+ REPARTITION(3) */ * FROM Data1;
SELECT /*+ REBALANCE */ * FROM Data1;
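The column-based variants can be sketched like this, assuming Data1 has a column named key:

```sql
-- repartition into 3 partitions using hash partitioning on key
SELECT /*+ REPARTITION(3, key) */ * FROM Data1;

-- repartition using range partitioning on key; the partition number is optional
SELECT /*+ REPARTITION_BY_RANGE(3, key) */ * FROM Data1;
```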
There are also some other features worth looking at for further optimisation.
Tomorrow we will make a gentle introduction into Spark Streaming.
Complete set of code, documents, notebooks, and all of the materials will be available at the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers
Happy Spark Advent of 2021! 🙂