In this hands-on, we will learn what partitioning means in Apache Spark.
An RDD has partitions, and each partition holds records. Partitioning controls how records are placed into those partitions; in other words, it decides how keys are distributed across the partitions of the RDD. Sometimes a project requires a particular distribution, and sometimes it is needed for optimization.
Let's take an example. At CloudxLab, you use the pyspark command to start the Python Spark shell. You can also run other versions of Spark from the /usr/ folder. Here, let's start the pyspark shell.
In the pyspark shell, let's first create a sequence of numbers from 1 to 10:
nums = range(1, 11)
Now, let's convert it into an RDD using parallelize:
numsRdd = sc.parallelize(nums)
To find the number of partitions, we can use the getNumPartitions method:
numsRdd.getNumPartitions()
To look at the data itself, one option is to save the RDD to HDFS using saveAsTextFile and then inspect the files that are created:
numsRdd.saveAsTextFile("somedir")
If your data is really small, you can instead use glom to convert each partition into an array and then use collect to bring the resulting array of arrays back to the shell. collect is generally impractical on huge data.
numsRdd.glom().collect()
Here you can see that it creates an array of arrays, one inner array per partition:
[[1], [2, 3], [4, 5], [6], [7, 8], [9, 10]]
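The layout above is what you get when ten elements are cut into six contiguous, nearly equal slices. The following is a minimal plain-Python sketch of that idea, not Spark's own code. The helper name slice_evenly and the boundary rule (slice i starts at i * size // num_slices) are assumptions chosen because they reproduce the output shown above.

```python
def slice_evenly(seq, num_slices):
    """Split seq into num_slices contiguous chunks of near-equal size.

    Hypothetical helper: the boundary rule below is an assumption that
    happens to reproduce the glom() output shown in the text.
    """
    size = len(seq)
    # Boundary i sits at i * size // num_slices; chunk i spans boundaries i..i+1.
    bounds = [i * size // num_slices for i in range(num_slices + 1)]
    return [list(seq[bounds[i]:bounds[i + 1]]) for i in range(num_slices)]


nums = range(1, 11)
partitions = slice_evenly(nums, 6)
print(partitions)
# [[1], [2, 3], [4, 5], [6], [7, 8], [9, 10]]
```

Because 10 does not divide evenly by 6, some slices hold one element and others hold two.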
Since partitionBy works only on key-value RDDs, let us create an RDD of key-value pairs with 1 as the value for every key.
kvrdd = numsRdd.map(lambda x: (x, 1))
Now, to partition, we need to specify a partitioner. A partitioner is the class or piece of logic that specifies how to distribute records among the partitions of an RDD.
Let us partition our key-value RDD using a hash partitioner, passing Python's built-in hash as the partition function. Here we are creating 3 partitions:
prdd = kvrdd.partitionBy(3, partitionFunc=hash)
Let us check how the resulting RDD is partitioned:
prdd.glom().collect()
[[(3, 1), (6, 1), (9, 1)], [(1, 1), (4, 1), (7, 1), (10, 1)], [(2, 1), (5, 1), (8, 1)]]
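You can see it has partitioned the RDD into three parts. Notice that the records look as if they were placed in round-robin order. This is a consequence of how the hash partitioner works: it first computes the hash code of the key and then takes the remainder of dividing that hash code by the number of partitions. This remainder is the index of the partition that receives the key. In Python, the hash of a small integer is the integer itself, so key 3 goes to partition 3 % 3 = 0, key 4 goes to partition 4 % 3 = 1, and key 5 goes to partition 5 % 3 = 2.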
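The hash-then-remainder rule can be tried out without a cluster. Below is a small plain-Python sketch of it; the function name hash_partition is hypothetical and this is a model of the rule described above, not PySpark's implementation.

```python
def hash_partition(pairs, num_partitions, partition_func=hash):
    """Place each (key, value) pair in bucket partition_func(key) % num_partitions.

    Hypothetical helper that models the rule described in the text.
    """
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        index = partition_func(key) % num_partitions
        partitions[index].append((key, value))
    return partitions


kv = [(x, 1) for x in range(1, 11)]
layout = hash_partition(kv, 3)
print(layout)
# [[(3, 1), (6, 1), (9, 1)], [(1, 1), (4, 1), (7, 1), (10, 1)], [(2, 1), (5, 1), (8, 1)]]
```

With integer keys the result matches the glom output above. With string keys the layout would depend on how Python hashes strings, so it would no longer look round-robin.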
When we save this RDD to HDFS, one file per partition is created, with the records in the same order as shown here. Let us take a look:
prdd.saveAsTextFile("myoutput_dir")
If you go into HDFS, you will see a folder containing three part files, one per partition. Each file has one key-value pair per line.
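To make the one-file-per-partition idea concrete, here is a local-filesystem stand-in written in plain Python. It is only a sketch: it writes to a temporary local directory rather than HDFS, the helper name save_partitions_as_text is hypothetical, and the part-00000 style file names are used here to mirror the usual Hadoop naming convention.

```python
import os
import tempfile


def save_partitions_as_text(partitions, out_dir):
    """Write each partition to its own part file, one record per line.

    Hypothetical local stand-in for saving a partitioned dataset as text.
    """
    os.makedirs(out_dir)
    names = []
    for index, records in enumerate(partitions):
        name = "part-%05d" % index
        with open(os.path.join(out_dir, name), "w") as f:
            for record in records:
                f.write(str(record) + "\n")
        names.append(name)
    return names


# The three partitions produced by the hash partitioner in the text.
example_partitions = [
    [(3, 1), (6, 1), (9, 1)],
    [(1, 1), (4, 1), (7, 1), (10, 1)],
    [(2, 1), (5, 1), (8, 1)],
]
out_dir = os.path.join(tempfile.mkdtemp(), "myoutput_dir")
written = save_partitions_as_text(example_partitions, out_dir)
print(sorted(os.listdir(out_dir)))
```

Three partitions give three part files, and the first one contains the lines (3, 1), (6, 1) and (9, 1).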
This concludes our discussion around partitioning.
Please note that if we do not specify the number of partitions in the parallelize method, it by default creates as many partitions as there are CPU cores, which is 16 here. When such an RDD is saved, it tries to allocate 16 blocks on HDFS, that is 16 * 128 MB = 2048 MB, which overflows the disk quota. So always specify a smaller number of partitions as the second argument of parallelize.
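The arithmetic behind this note can be checked quickly. The sketch below assumes, as the text does, one 128 MB block per output file; the function name is hypothetical, and replication, which would multiply the figure further, is deliberately ignored.

```python
BLOCK_SIZE_MB = 128  # block size quoted in the text


def worst_case_reservation_mb(num_partitions, block_size_mb=BLOCK_SIZE_MB):
    """Space counted against the quota if every partition's file opens one block.

    Hypothetical helper; ignores the HDFS replication factor.
    """
    return num_partitions * block_size_mb


for n in (16, 3):
    print(n, "partitions ->", worst_case_reservation_mb(n), "MB")
# 16 partitions -> 2048 MB
# 3 partitions -> 384 MB
```

Dropping from 16 partitions to 3 cuts the worst-case reservation from 2048 MB to 384 MB, which is why passing a small second argument to parallelize avoids the quota error.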