Using Unravel to tune Spark data skew and partitioning
In Spark, it is important that the number of RDD partitions is aligned with the number of available cores: Spark assigns one task per partition, and each core can process one task at a time.
By default, the number of partitions is set to the total number of cores on all the nodes hosting executors.
Too few partitions lead to reduced concurrency, processing skew, and poor resource utilization.
Too many partitions lead to low throughput and high task-scheduling overhead.
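As a quick illustration, the sketch below compares an RDD's partition count with the cluster's default parallelism and repartitions when the two are far apart. It is only a minimal example; the input path is a placeholder.

import org.apache.spark.sql.SparkSession

object PartitionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PartitionCheck").getOrCreate()
    val sc = spark.sparkContext

    // Default parallelism typically equals the total cores across all executors.
    val cores = sc.defaultParallelism

    // Hypothetical input path; replace with a real data source.
    val rdd = sc.textFile("hdfs:///data/orders")

    println(s"partitions = ${rdd.getNumPartitions}, default parallelism = $cores")

    // If the partition count is far below the available cores, repartition
    // so that every core has work to do.
    val aligned = if (rdd.getNumPartitions < cores) rdd.repartition(cores) else rdd
    println(s"aligned partitions = ${aligned.getNumPartitions}")
  }
}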
Tuning a Spark app
You need to identify the stages that are bottlenecks during execution. This can easily be done using Unravel's Spark Application Performance Manager (APM).
Bring up your app in the Spark APM and select the Gantt Chart tab. Locate the job with the longest duration and click it to open it in the Spark Job APM. Then locate and click the stage with the longest duration.
The Spark job above took 3 hours and 53 minutes to complete, with the longest stage taking 43 minutes. This is the stage to examine.
Clicking the stage brings up its information. Click the Timeline tab to show the duration and I/O of each task. The histogram charts let you quickly identify outlier tasks. In this case, there were 200 tasks; 199 took approximately five minutes to complete and one took approximately 35 to 40 minutes. That one task accounts for over 82% of the time the stage took to complete.
To see the timelines for the first bucket (199 tasks), select it and then select the Timeline tab under the histogram. This view makes the skew easy to see: in this example, many executors sit idle while they wait for the outlier task to complete.
Select the outlier bucket and the Timeline tab updates to display the information for this task. In this case the duration of the associated executor is almost equal to the duration of the entire app.
The Spark APM's Graphs > Containers graph shows the bursting of containers when the long-running executor started. Adding more partitions via repartition() can help distribute the data set among the executors and decrease the skew. Unravel may also provide optimization recommendations when the join key or group-by key is skewed; in this case, Unravel had three recommendations to improve the app's efficiency.
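For instance, a minimal sketch of this approach is shown below; the input path, column names, and the partition count of 400 are illustrative and not taken from the app above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()

    // Hypothetical skewed input; replace with the real source.
    val orders = spark.read.parquet("hdfs:///data/orders")

    // Round-robin repartitioning spreads rows evenly across 400 partitions
    // (the count is illustrative), so no single task carries most of the data.
    val evenlySpread = orders.repartition(400)

    // Repartitioning on a composite key can also break up a hot key while
    // keeping related rows together.
    val byCompositeKey = orders.repartition(400, col("cust_id"), col("order_no"))

    evenlySpread.groupBy(col("cust_id")).count().show()
  }
}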
Example
In this Spark SQL example, two dummy data sources are used, both of which are partitioned.
The join between the customer and orders tables is on the cust_id column, which is heavily skewed. Examining the code shows that the key 1000 has the most entries in the orders table, so one of the reduce partitions receives all of the entries for that key. In such cases, several techniques can help avoid the skewed processing.
Technique #1: Increase the spark.sql.autoBroadcastJoinThreshold value so that the smaller table, customer, is broadcast to the executors. Ensure the driver has sufficient memory to hold the broadcast table.
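A minimal sketch of this technique, assuming an active SparkSession named spark and orders and customer DataFrames corresponding to the tables in the example below; the 100 MB threshold is illustrative.

// Raise the threshold (in bytes) under which Spark broadcasts a table; the
// small customer table then ships to every executor and the large orders
// table no longer needs to be shuffled for the join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)

// Or force the broadcast explicitly with a hint.
import org.apache.spark.sql.functions.broadcast
val joined = orders.join(broadcast(customer), "cust_id")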
Technique #2: If executor memory is sufficient, decrease spark.sql.shuffle.partitions so that each reduce partition holds more data. This helps all the reduce tasks finish in approximately the same time.
Technique #3: If possible, identify the skewed keys and process them separately by using filters.
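A minimal sketch of this technique, assuming the skewed key is known in advance (1000, as in the example below) and that orders and customer DataFrames are already loaded:

import org.apache.spark.sql.functions.{broadcast, col}

val skewedKey = 1000

// Split the input on the skewed key.
val skewedOrders = orders.filter(col("cust_id") === skewedKey)
val otherOrders  = orders.filter(col("cust_id") =!= skewedKey)

// The skewed slice joins against a single customer row, so broadcast it;
// the remaining keys go through a normal shuffle join.
val joinedSkewed = skewedOrders.join(broadcast(customer.filter(col("cust_id") === skewedKey)), "cust_id")
val joinedOther  = otherOrders.join(customer, "cust_id")

// Recombine the two results.
val joined = joinedSkewed.union(joinedOther)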
Example: Decrease spark.sql.shuffle.partitions to accommodate more data per reduce partition (Technique #2).
In this example, the spark.sql.shuffle.partitions default is 200.
Here, a lone task takes much longer during the shuffle, which means the next stage can't start and the executors sit idle.
Now change spark.sql.shuffle.partitions to ten (10). Because the shuffle input/output is well within the executor memory sizes, this change is safe to make.
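The change can be made in the Spark session before the query runs, for example as below; the same setting can also be supplied at submit time.

// Assumes an active SparkSession named spark.
// Ten reduce partitions instead of the default 200; each reduce task now
// processes roughly 20x more data, which still fits in executor memory here.
spark.conf.set("spark.sql.shuffle.partitions", "10")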
import org.apache.spark.sql.SparkSession

object SkewedDataframeTest {

  val INVALID_USAGE =
    s"""
       |Please enter all cmd line parameters!
       |
       |<numElements> valid numbers are 1 to any positive integer.
       |<skewPercent> valid numbers between 1 to 99
     """.stripMargin

  def main(args: Array[String]) = {
    if (args.length < 2) {
      println(INVALID_USAGE)
      System.exit(-1)
    }

    val numElements = args(0).toInt
    val skewPercent = args(1).toFloat

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    import spark.implicits._

    val numSkewedKeys = (numElements * (skewPercent / 100)).toInt
    val numCustomers = numElements / 100

    // Dummy data source which is partitioned.
    // Every order below numSkewedKeys maps to cust_id 1000, which creates the skew.
    val df3 = spark.sparkContext.parallelize(0 to numElements, 16).map(i => {
      val join_column = if (i < numSkewedKeys) 1000 else i % 999
      (i, join_column)
    }).toDF("order_no", "cust_id").
      repartition(16, $"order_no")
    df3.createOrReplaceGlobalTempView("orders")

    // Dummy data source which is partitioned.
    val df4 = spark.sparkContext.parallelize(0 to numCustomers, 32).
      map(i => (i, i.toString)).
      toDF("cust_id", "name").
      repartition(32, $"cust_id")
    df4.createOrReplaceGlobalTempView("customer")

    // Join orders to customers on the skewed cust_id key and aggregate.
    spark.sql("select t1.name, COUNT( t2.order_no) " +
      "from global_temp.customer t1, global_temp.orders t2" +
      " where t1.cust_id=t2.cust_id " +
      " group by t1.name " +
      " having COUNT( t2.order_no) >= 50000").show
  }
}
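For reference, a hypothetical submit command for this class; the jar name, resource sizes, and argument values are placeholders.

spark-submit \
  --class SkewedDataframeTest \
  --conf spark.sql.shuffle.partitions=10 \
  --executor-memory 4g \
  skewed-dataframe-test.jar \
  10000000 60

The two trailing arguments are numElements and skewPercent.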
Real-life deployments
In production deployments, not all skew problems can be solved with configuration changes and repartitioning. If the data source itself is skewed, the tasks that read from it can't be optimized; modifying the underlying data layout can help. At the enterprise level, however, such modification sometimes isn't possible because the data source is shared by different tools and pipelines.