Adv Spark Programming

11 / 52

Adv Spark Programming - Partitioning HandsOn

As part of this Hands-On we will learn what it means by partitioning in Apache Spark.

An RDD has partitions and each partitions has records. How to place the records in various partitions is controlled by partitioning. Partitioning is basically about choosing the distribution of keys in the RDD partitions. This is sometimes needed as per project objectives and sometimes it is needed for optimization.

Lets take an example. At cloudxLab, you use the spark-shell command. You can also run other versions of spark from /usr/ folder. Here, lets start the spark-shell.

On the scala spark shell, lets first create an array of numbers from 1 to 10:

var nums = 1 to 10;

Now, let convert in an RDD using parallelize:

var numsRdd = sc.parallelize(nums)

To know about the number of partitions we can use numsRdd.partitions.length function:

numsRdd.partitions.length

But to take a look at the data we can eithe save to HDFS using saveAsTextFile and then take a look at the actual data in the files created.

numsRdd.saveAsTextFile("somedir")

If your data is really small you can use glom to convert the partitions into arrary of arrays and then use collect. collect is generally impactical on huge data.

numsRdd.glom().collect()

Here you can see that it creates and array of arrays.

Since, only key-value RDD can be partitioned, let us create an RDD with key-value pairs with 1 as the value for every key.

var kvrdd = numsRdd.map((_, 1));

Now, to partition, we need to specify partitioners. Partitioner are the classes or pieces of logic which specify how to distribute data between the partitions of RDD. As part of core spark, we are provided with two partitioners: HashPartitioner and RangeParitioner

We also need to import the HashPartitioner:

import org.apache.spark.HashPartitioner

Let us first partition our kv-rdd using hashpartitioner as argument. Here we are creating 3 partitions:

var prdd = kvrdd.partitionBy(new HashPartitioner(3));

Let us check if the resulting RDD is well partitioned with:

prdd.glom().collect()

You can see it has partitioned the RDD into three parts. Now, please notice that the records have being put into partitioned in round robin manner. The hashPartitioner first computes the hashcode of the key and then computing the remainder of the hashcode after dividing by number of partitions. This remainder specifies the partition in which key needs to be put.

Sometimes, you may want to partition an RDD into non-overlapping partitions. This is where the range based partitioning comes into play.

So, let repartition the wordsCount using rangePartitioner:

import org.apache.spark.RangePartitioner
var words = sc.parallelize(Array("this", "is", "a", "cat", "there", "is", "a", "rat", "cat", "eats", "rat"), 2)
var wordsTups = words.map((_, 1))
var output = wordsTups.partitionBy(new RangePartitioner(2, wordsTups))
output.glom().collect()

Let us take a look at the result:

Array(Array((is,1), (a,1), (cat,1), (is,1), (a,1), (cat,1), (eats,1)), Array((this,1), (there,1), (rat,1), (rat,1)))

You can see that there are two partitions one having words less than or equal to is and second having keys bigger than "is".

The way it works is it samples the RDD and figures out what kind of range does the input RDD has and accordingly it splits the rdd. In order to first sample before partitioning, the rangepartitioner takes the rdd as argument too.

When we save this RDD using into an HDFS file, one file per partition will be created in HDFS with the same order as here. Let us take a look:

output.saveAsTextFile("myoutput_dir")

If you go into HDFS, you will be able to see that there is a folder having two files. Each files would have one key,value per line.

This concludes our discussion around partitioning.

Please note that if we do not specify the number of partitions in parallelize method, it by default creates the number of partitions in RDD equal to the number of CPU cores which is 16. Now, when saving such an RDD it tried to allocate 16 blocks on HDFS which is 16*128 MB which overflows the disk quota. So, always specify a smaller number of partitions in the second argument of parallelize method.

Slides - Adv Spark Programming (1)


No hints are availble for this assesment

Answer is not availble for this assesment

Loading comments...