
Advanced Spark Programming with Python


Adv Spark Programming - Partitioning HandsOn (Python)

In this hands-on, we will learn what partitioning means in Apache Spark.

An RDD has partitions, and each partition holds records. Partitioning controls how records are placed into the partitions; in essence, it determines how keys are distributed across the RDD's partitions. Sometimes a project's requirements call for a particular partitioning, and sometimes it is done for optimization.

Let's take an example. At CloudxLab, you can start the Python Spark shell with the pyspark command. You can also run other versions of Spark from the /usr/ folder. Here, let's start the shell.

In the Python Spark shell, let's first create a sequence of numbers from 1 to 10:

nums = range(1, 11)

Now, let's convert it into an RDD using parallelize:

numsRdd = sc.parallelize(nums)

To find the number of partitions, we can use the getNumPartitions method:

numsRdd.getNumPartitions()

To look at the data in each partition, one option is to save the RDD to HDFS using saveAsTextFile and then inspect the files created:

numsRdd.saveAsTextFile("somedir")

Alternatively, if your data is really small, you can use glom to convert the partitions into an array of arrays and then call collect. Note that collect is generally impractical on huge data.

numsRdd.glom().collect()

Here you can see that it creates an array of arrays, one inner array per partition:

[[1], [2, 3], [4, 5], [6], [7, 8], [9, 10]]
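The grouping above is consistent with an even-slicing rule in which slice i covers positions from floor(i*n/k) up to floor((i+1)*n/k). The following is a minimal pure-Python sketch of that rule, not Spark's actual code. `slice_evenly` is a hypothetical helper, and the partition count of 6 is an assumption inferred from the output shown.

```python
def slice_evenly(items, num_slices):
    """Split items into num_slices contiguous chunks of near-equal size."""
    items = list(items)
    n = len(items)
    # Slice i covers positions [i*n // num_slices, (i+1)*n // num_slices).
    return [
        items[(i * n) // num_slices:((i + 1) * n) // num_slices]
        for i in range(num_slices)
    ]

# Assumption: 6 partitions, inferred from the glom() output above.
partitions = slice_evenly(range(1, 11), 6)
print(partitions)
# [[1], [2, 3], [4, 5], [6], [7, 8], [9, 10]]
```

Because 10 records do not divide evenly into 6 slices, some slices get one record and others get two.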

Since only a key-value RDD can be partitioned by key, let us create an RDD of key-value pairs with 1 as the value for every key:

kvrdd = numsRdd.map(lambda x: (x, 1))

Now, to partition, we need to specify a partitioner. Partitioners are the classes or pieces of logic that specify how to distribute data among the partitions of an RDD.

Let us first partition our key-value RDD using Python's built-in hash function as the partition function. Here we are creating 3 partitions:

prdd = kvrdd.partitionBy(3, partitionFunc=hash)

Let us check if the resulting RDD is well partitioned with:

prdd.glom().collect()

[[(3, 1), (6, 1), (9, 1)], [(1, 1), (4, 1), (7, 1), (10, 1)], [(2, 1), (5, 1), (8, 1)]]

You can see it has partitioned the RDD into three parts. Notice that the records appear to have been placed in a round-robin manner. This happens because the keys are consecutive integers: the hash partitioner first computes the hash code of the key and then takes the remainder after dividing that hash code by the number of partitions. This remainder specifies the partition in which the key is placed. Key 4, for example, lands in partition 4 % 3 = 1.
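The remainder rule described above can be sketched in plain Python, without Spark. `hash_partition` is a hypothetical helper, not a PySpark API; it relies on the fact that in CPython the built-in hash of a small non-negative integer is the integer itself.

```python
def hash_partition(pairs, num_partitions, partition_func=hash):
    """Place each (key, value) pair in partition partition_func(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        index = partition_func(key) % num_partitions
        partitions[index].append((key, value))
    return partitions

# Same data as in the walkthrough: keys 1..10, each with value 1.
kv_pairs = [(x, 1) for x in range(1, 11)]
result = hash_partition(kv_pairs, 3)
print(result)
# [[(3, 1), (6, 1), (9, 1)], [(1, 1), (4, 1), (7, 1), (10, 1)], [(2, 1), (5, 1), (8, 1)]]
```

With string keys the placement would not follow such a tidy pattern, and Python randomizes string hashes per process unless PYTHONHASHSEED is fixed.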

When we save this RDD to HDFS, one file per partition will be created, in the same order as shown here. Let us take a look:

prdd.saveAsTextFile("myoutput_dir")

If you look in HDFS, you will see a folder containing three part files, one per partition. Each file has one key-value pair per line.

This concludes our discussion around partitioning.

Please note that if we do not specify the number of partitions in the parallelize method, it by default creates as many partitions as there are CPU cores, which is 16 here. When such an RDD is saved, it tries to allocate 16 blocks on HDFS, that is 16 * 128 MB, which overflows the disk quota. So always specify a smaller number of partitions as the second argument of the parallelize method.


