Data Partitioning in Spark

Data partitioning is Spark’s mechanism for dividing data among the multiple nodes of a cluster. It’s a fundamental aspect of RDDs, and one that can have a big effect on performance and resource consumption.

Each part (piece or slice) of an RDD is called a partition.

When you load a text file from your local filesystem into Spark, for example, the file’s contents are split into partitions, which are evenly distributed to nodes in a cluster. More than one partition may end up on the same node. The sum of all those partitions forms your RDD.

Each RDD maintains a list of its partitions and an optional list of preferred locations for computing the partitions.
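To make this concrete, here’s a minimal sketch you could run in the Spark shell (the file path and the minimum-partition hint are assumptions made up for this example):

val lines = sc.textFile("file:///tmp/data.txt", 4)    // ask for at least 4 partitions
lines.partitions.size                                 // how many partitions the RDD actually has
lines.preferredLocations(lines.partitions(0))         // preferred locations for the first partition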

The number of RDD partitions is important because, in addition to influencing data distribution throughout the cluster, it also directly determines the number of tasks that will be running RDD transformations. If this number is too small, the cluster will be underutilized. Furthermore, memory problems could result, because working sets might get too big to fit into the memory of executors. We recommend using three to four times more partitions than there are cores in your cluster. Moderately larger values shouldn’t pose a problem, so feel free to experiment. But don’t get too crazy, because management of a large number of tasks could create a bottleneck.
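Continuing with the lines RDD from the sketch above, the rule of thumb works out like this on a hypothetical cluster with 8 cores (the core count is an assumption):

val coresInCluster = 8                                // assumption for this sketch
val numPartitions = 4 * coresInCluster                // three to four partitions per core
val repartitioned = lines.repartition(numPartitions)  // shuffles the data into 32 partitions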

Spark Data Partitioners

Partitioning of RDDs is performed by Partitioner objects, which assign a partition index to each RDD element. Spark provides two implementations: HashPartitioner and RangePartitioner. Pair RDDs also accept custom partitioners.

HashPartitioner is the default partitioner in Spark. It calculates a partition index from an element’s Java hash code (or a key’s hash code in pair RDDs), according to this simple formula: partitionIndex = hashCode % numberOfPartitions (Spark additionally forces the result to be non-negative, because Java’s % operator can return negative values). The partition index is determined quasi-randomly; consequently, the partitions most likely won’t all be exactly the same size. In large datasets with a relatively small number of partitions, though, the algorithm is likely to distribute data evenly among them. The default number of partitions when using HashPartitioner is determined by the Spark configuration parameter spark.default.parallelism. If that parameter isn’t specified, it defaults to the number of cores in the cluster.
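As an illustration, you can ask a HashPartitioner directly which index it would assign to a key, or hand it to partitionBy (the keys and partition count here are arbitrary):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(100)
partitioner.getPartition("spark")             // some index in [0, 100); same key, same index, every time

val pairs = sc.parallelize(1 to 10000).map(n => (n, n * n))
val hashed = pairs.partitionBy(partitioner)   // pair RDD redistributed into 100 partitions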

RangePartitioner partitions data of sorted RDDs into roughly equal ranges. It samples the contents of the RDD passed to it and determines the range boundaries according to the sampled data. You aren’t likely to use RangePartitioner directly.
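You will use it indirectly, though: sortByKey, for example, builds a RangePartitioner behind the scenes. A small sketch (the data is made up):

val pairs = sc.parallelize(Seq((30, "c"), (10, "a"), (20, "b")))
val sorted = pairs.sortByKey(ascending = true, numPartitions = 2)  // RangePartitioner at work
sorted.collect()                              // Array((10,a), (20,b), (30,c))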

Custom partitioners can be used only on pair RDDs, by passing them to pair RDD transformations. Most pair RDD transformations have two additional overloaded versions: one that takes an extra Int argument (the desired number of partitions) and another that takes an extra argument of the (custom) Partitioner type. The version that takes the number of partitions uses the default HashPartitioner. For example, assuming afunction is the folding function and 0 the zero value, these two lines of code are equivalent, because they both apply HashPartitioner with 100 partitions:

rdd.foldByKey(0, 100)(afunction)
rdd.foldByKey(0, new HashPartitioner(100))(afunction)
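If neither built-in partitioner fits, you can subclass Partitioner yourself. The following is only a sketch (the class name and the routing rule are invented for illustration): it sends keys whose string form starts with "a" to partition 0 and hashes everything else across the remaining partitions.

import org.apache.spark.Partitioner

class InitialAPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val k = key.toString
    if (k.startsWith("a")) 0                               // all "a..." keys land together
    else 1 + math.abs(k.hashCode) % (numPartitions - 1)    // sketch assumes numPartitions > 1
  }
}

rdd.foldByKey(0, new InitialAPartitioner(100))(afunction)

In practice you’d also override equals (and hashCode), because Spark compares partitioners to decide whether a repartitioning step can be skipped.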
