Understanding and avoiding unnecessary shuffling in Spark

Physical movement of data between partitions is called shuffling.

It occurs when data from multiple partitions needs to be combined in order to build partitions for a new RDD. When grouping elements by key, for example, Spark needs to examine all of the RDD’s partitions, find elements with the same key, and then physically group them, thus forming new partitions.
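
For example, the following sketch (for spark-shell, where a SparkContext is already available as sc) shows groupByKey triggering a shuffle; the ShuffledRDD entry in the debug output marks the shuffle boundary:

    // Build a small pair RDD spread over four partitions.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)), 4)

    // groupByKey must co-locate all values of each key in one partition,
    // which forces a shuffle.
    val grouped = pairs.groupByKey()

    // The lineage shows a ShuffledRDD entry: the shuffle boundary.
    println(grouped.toDebugString)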

During the shuffle, a transform function first puts all values of each key within a single map-side partition into a list. Spark then writes this data to intermediate files on each node. In the next phase, a merge function merges lists that come from different partitions but belong to the same key into a single list per key. The default partitioner (HashPartitioner) then kicks in and puts each key in its proper partition.
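
This two-phase structure is analogous to what you supply to aggregateByKey: its sequence function plays the role of the transform function (applied inside each map-side partition) and its combine function plays the role of the merge function. A minimal sketch for spark-shell:

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)), 4)

    // seqOp ("transform"): builds a partial list of values per key
    // inside a single partition.
    // combOp ("merge"): combines partial lists for the same key that
    // came from different partitions.
    val perKeyLists = pairs.aggregateByKey(List.empty[Int])(
      (list, v) => v :: list,
      (l1, l2) => l1 ::: l2
    )
    perKeyLists.collect().foreach(println)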

Tasks that immediately precede and follow the shuffle are called map and reduce tasks, respectively. The results of map tasks are written to intermediate files (often to the OS’s filesystem cache only) and read by reduce tasks. In addition to being written to disk, the data is sent over the network, so it’s important to try to minimize the number of shuffles during Spark jobs.

Shuffling can happen for two reasons:

  • explicitly changing the partitioner
  • removing the partitioner through a transformation

Shuffling also occurs if a HashPartitioner different from the previous one is used. Two HashPartitioners are equal if and only if they have the same number of partitions (in that case they always choose the same partition for the same object), so using a HashPartitioner with a different number of partitions than the previous one in a transformation will cause a shuffle.
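
A sketch of this rule for spark-shell:

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(1 to 100).map(n => (n % 10, n))

    // Explicit repartitioning with a new partitioner: shuffle.
    val byFour = pairs.partitionBy(new HashPartitioner(4))

    // Same number of partitions means an equal HashPartitioner, so
    // reduceByKey can reuse the existing partitioning: no shuffle.
    val sums = byFour.reduceByKey(_ + _)

    // A HashPartitioner with a different number of partitions is a
    // different partitioner: shuffle again.
    val byEight = byFour.partitionBy(new HashPartitioner(8))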

Tip: Because changing the partitioner provokes shuffles, the safest approach, performance-wise, is to use the default partitioner as much as possible and avoid inadvertently causing a shuffle.

Sometimes a transformation causes a shuffle even though you're using the default partitioner. The map and flatMap transformations remove the RDD's partitioner, which doesn't cause a shuffle per se. But if you then transform the resulting RDD with one of the shuffling transformations listed below, a shuffle will occur even with the default partitioner.
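
A sketch contrasting map, which drops the partitioner, with mapValues, which keeps it:

    import org.apache.spark.HashPartitioner

    val partitioned = sc.parallelize(1 to 100)
      .map(n => (n % 10, n))
      .partitionBy(new HashPartitioner(4))

    // map could change the keys, so Spark discards the partitioner;
    // the subsequent reduceByKey therefore causes a shuffle.
    val afterMap = partitioned.map { case (k, v) => (k, v * 2) }
    println(afterMap.partitioner)       // None
    val shuffledAgain = afterMap.reduceByKey(_ + _)

    // mapValues cannot change the keys, so the partitioner survives
    // and reduceByKey needs no shuffle.
    val afterMapValues = partitioned.mapValues(_ * 2)
    println(afterMapValues.partitioner) // Some(HashPartitioner)
    val noShuffle = afterMapValues.reduceByKey(_ + _)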

Transformations that cause shuffling

  • Pair RDD transformations that can change the RDD’s partitioner: aggregateByKey, foldByKey, reduceByKey, groupByKey, join, leftOuterJoin, rightOuterJoin, fullOuterJoin, and subtractByKey
  • RDD transformations: subtract, intersection, and groupWith
  • sortByKey transformation (which always causes a shuffle)
  • partitionBy or coalesce with shuffle=true
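
A quick way to check whether one of these transformations shuffled your data is to inspect the RDD's lineage; a sketch for spark-shell:

    // toDebugString marks shuffle boundaries with a ShuffledRDD entry.
    val counts = sc.parallelize(Seq("a", "b", "a", "c"))
      .map(w => (w, 1))
      .reduceByKey(_ + _)         // appears in the lineage as a ShuffledRDD

    println(counts.toDebugString)
    println(counts.partitioner)   // Some(HashPartitioner) after the shuffle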

External Shuffle Service

During shuffling, executors need to read files from one another (a shuffle is pull-based). If some executors get killed, other executors can no longer get shuffle data from them, and the data flow is interrupted. An external shuffle service is meant to optimize the exchange of shuffle data by providing a single point from which executors can read intermediate shuffle files. If an external shuffle service is enabled (by setting spark.shuffle.service.enabled to true), one external shuffle server is started per worker node.
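
A minimal configuration sketch (the port setting is optional and shown only for illustration):

    import org.apache.spark.SparkConf

    // Ask Spark to serve shuffle files through the external shuffle
    // service instead of the executors themselves.
    val conf = new SparkConf()
      .setAppName("shuffle-service-demo")
      .set("spark.shuffle.service.enabled", "true")
      // Optional; override only if the default port is taken.
      .set("spark.shuffle.service.port", "7337")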

Parameters that affect shuffling

Spark has two shuffle implementations: sort-based and hash-based. Sort-based shuffling is the default.

You can define which shuffle implementation to use by setting the value of the spark.shuffle.manager parameter to either hash or sort.

Shuffling can require a lot of memory for aggregation and co-grouping. The spark.shuffle.spill parameter specifies whether the amount of memory used for these tasks should be limited (the default is true). In that case, any excess data will spill over to disk. The memory limit is specified by the spark.shuffle.memoryFraction parameter (the default is 0.2). Furthermore, the spark.shuffle.spill.compress parameter tells Spark whether to use compression for the spilled data (the default is again true).

  • spark.shuffle.compress specifies whether to compress intermediate files (the default is true).
  • spark.shuffle.spill.batchSize specifies the number of objects that will be serialized or deserialized together when spilling to disk. The default is 10,000.
  • spark.shuffle.service.port specifies the port the server will listen on if an external shuffle service is enabled.
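
Put together, these parameters could be set like this (a sketch; the values shown are the defaults described above, not tuning advice):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.manager", "sort")          // or "hash"
      .set("spark.shuffle.spill", "true")            // limit memory; spill excess to disk
      .set("spark.shuffle.memoryFraction", "0.2")    // memory limit before spilling
      .set("spark.shuffle.spill.compress", "true")   // compress spilled data
      .set("spark.shuffle.compress", "true")         // compress intermediate files
      .set("spark.shuffle.spill.batchSize", "10000") // objects (de)serialized per spill batch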
