
Adv Spark Programming - Partitioning Hands-On

As part of this hands-on, we will learn what partitioning means in Apache Spark.

An RDD consists of partitions, and each partition holds records. How records are placed into the various partitions is controlled by partitioning. Partitioning is essentially about choosing how keys are distributed across the partitions of an RDD. Sometimes this is needed to meet project objectives, and sometimes it is needed for optimization.

Let's take an example. At CloudxLab, you use the spark-shell command. You can also run other versions of Spark from the /usr/ folder. Here, let's start spark-shell.

On the Scala Spark shell, let's first create a sequence of numbers from 1 to 10:

var nums = 1 to 10

Now, let's convert it into an RDD using parallelize:

var numsRdd = sc.parallelize(nums)

To find the number of partitions, we can use numsRdd.partitions.length:

numsRdd.partitions.length

To take a look at the data, we can save it to HDFS using saveAsTextFile and then inspect the actual data in the files created:

numsRdd.saveAsTextFile("somedir")

If your data is really small, you can use glom to convert the partitions into an array of arrays and then use collect. Note that collect is generally impractical on huge data.

numsRdd.glom().collect()

Here you can see that it creates an array of arrays, one inner array per partition.

Since only key-value RDDs can be partitioned, let us create an RDD of key-value pairs, with 1 as the value for every key:

var kvrdd = numsRdd.map((_, 1))

Now, to partition, we need to specify a partitioner. Partitioners are the classes or pieces of logic that specify how to distribute data between the partitions of an RDD. As part of core Spark, we are provided with two partitioners: HashPartitioner and RangePartitioner.

We also need to import the HashPartitioner:

import org.apache.spark.HashPartitioner

Let us first partition our key-value RDD using partitionBy with a HashPartitioner as the argument. Here we are creating 3 partitions:

var prdd = kvrdd.partitionBy(new HashPartitioner(3))

Let us check whether the resulting RDD is well partitioned:

prdd.glom().collect()

You can see it has partitioned the RDD into three parts. Notice that the records appear to have been placed into the partitions in a round-robin manner. The HashPartitioner first computes the hash code of the key, and then takes the remainder of that hash code after dividing by the number of partitions. This remainder specifies the partition into which the key is put.
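As a rough sketch, the placement rule described above can be imitated as follows (illustrative only, not part of the original hands-on; Spark's real HashPartitioner additionally shifts negative remainders into range):

val numPartitions = 3
(1 to 10).foreach { k =>
  val p = k.hashCode % numPartitions // for a non-negative Int key, hashCode is the value itself
  println(s"key $k -> partition $p")
}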

Sometimes, you may want to partition an RDD into non-overlapping ranges of keys. This is where range-based partitioning comes into play.

So, let's partition an RDD of words using RangePartitioner:

import org.apache.spark.RangePartitioner
var words = sc.parallelize(Array("this", "is", "a", "cat", "there", "is", "a", "rat", "cat", "eats", "rat"), 2)
var wordsTups = words.map((_, 1))
var output = wordsTups.partitionBy(new RangePartitioner(2, wordsTups))
output.glom().collect()

Let us take a look at the result:

Array(Array((is,1), (a,1), (cat,1), (is,1), (a,1), (cat,1), (eats,1)), Array((this,1), (there,1), (rat,1), (rat,1)))

You can see that there are two partitions: one holding the words less than or equal to "is", and the second holding the keys greater than "is".

The way it works is that it samples the RDD, figures out the range of keys the input RDD has, and splits the RDD accordingly. Since it needs to sample the data before partitioning, the RangePartitioner takes the RDD as an argument too.
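If you want to confirm which partitioner an RDD carries, every RDD exposes a partitioner field that returns an Option (a small check added here for illustration):

wordsTups.partitioner // None, since no partitioner has been applied yet
output.partitioner    // Some(org.apache.spark.RangePartitioner@...)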

When we save this RDD to HDFS, one file per partition will be created, in the same order as here. Let us take a look:

output.saveAsTextFile("myoutput_dir")

If you go into HDFS, you will see a folder containing two files. Each file will have one key-value pair per line.

This concludes our discussion around partitioning.

Please note that if we do not specify the number of partitions in the parallelize method, it by default creates as many partitions as there are CPU cores, which on the lab is 16. When saving such an RDD, it tries to allocate 16 blocks on HDFS, i.e. 16 * 128 MB, which overflows the disk quota. So, always specify a smaller number of partitions in the second argument of the parallelize method.
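For example, here is a minimal way to stay within the quota (the directory name below is just an example):

var smallRdd = sc.parallelize(1 to 10, 4) // explicitly request 4 partitions
smallRdd.partitions.length                // Int = 4
smallRdd.saveAsTextFile("small_dir")      // writes only 4 part files to HDFS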




59 Comments

Hi team, I don't have access to Hue. How do I trace my saved file as per the tutorial?

How do I run PySpark code on the CloudxLab-provided Jupyter?

Hello,

Even after deleting files, I am still getting the disk space error. I have taken the action suggested to others, yet the issue is not resolved.

Hi,

Please retry after closing all console tabs and the browser. Then reopen them and retry.

Thanks.

Hi, I have tried again. Still the same issue.

Please take a look at this one. It is because the total number of partitions is too high.

https://discuss.cloudxlab.com/t/error-quota-exceeded-while-running-spark-job-but-havent-used-much-of-disk-space-solved/3472/3

Hello,

I'm unable to view Hue under services, please help.

Hi,

HUE is deprecated on the lab. Please use console.

Thanks.


Okay, thank you.

 

var nums = 1 to 10;
var numsRdd = sc.parallelize(nums)
numsRdd.partitions.length
nums = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
numsRdd = ParallelCollectionRDD[14] at parallelize at <console>:32

Out[19]:

16

 

Why is the partition value coming to 16 and not 4? Is it because of this that it's not getting saved? Is there any setting we need to revert?

Please respond, since many have posted this query but there's no answer from your end.

Because there are 16 cores on the machine.


I meant that there are 16 CPU Cores on the machine.


Do we have HashPartitioner and RangePartitioner in PySpark?

Yes.


I can't log in to my Hue account. It says incorrect username or password. What are my username and password for the Hue login?

Hi Zaid,

Hue has been removed. Where are you trying to log in? Can you post a screenshot?

Could you please help here?

scala> 20/10/28 17:27:56 ERROR DFSClient: Failed to close inode 19828781
org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The DiskSpace quota of /user/devsanga5438 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4359043970 B = 4.06 GB
        at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(Directory

numsRdd.saveAsTextFile("somedir")

20/10/20 12:16:49 WARN DFSClient: DataStreamer Exception
org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The DiskSpace quota of /user/shallenderparihar4221079 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4358468086 B = 4.06 GB

I still don't have access to Hue. This is becoming very difficult. Please see to it.

Hi,

Hue is no longer supported on the lab for the following reasons: https://discuss.cloudxlab.com/t/should-we-be-using-hue/5821/2

Please use console.

Thanks.


When I tried checking the number of partitions using the command below, it showed 16. How is that possible? I saved to HDFS and checked the data in the partitions, and I found some empty partitions had been created. Please explain: where am I going wrong?

numsRdd.partitions.length

I think the number of partitions depends on the number of CPU cores. On the lab we have a 16-core CPU.
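As a side note (not from the original thread), you can check this default on the shell: sc.defaultParallelism reports the default number of partitions, which typically tracks the total cores available to Spark.

sc.defaultParallelism // Int = 16 on the lab machines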

I don't have the link for Hue. Kindly look into this.


Hi,

I didn't understand this line: "The HashPartitioner first computes the hash code of the key, and then takes the remainder of that hash code after dividing by the number of partitions. This remainder specifies the partition into which the key is put."

Yes, that is true.

The basic principle of HashPartitioner is: for a given key, calculate its hashCode and divide it by the number of partitions. If the remainder is less than 0, use the remainder plus the number of partitions.

Kindly refer to this for the implementation: https://www.programmersought.com/article/8750375827/

All the best!
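A minimal Scala sketch of that rule (it mirrors the non-negative modulo Spark applies internally; the function name here is just for illustration):

def nonNegativeMod(hash: Int, numPartitions: Int): Int = {
  val rawMod = hash % numPartitions
  rawMod + (if (rawMod < 0) numPartitions else 0) // shift negative remainders into [0, numPartitions)
}

nonNegativeMod("cat".hashCode, 3) // partition index for key "cat" across 3 partitions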

numsRdd.glom().collect()

[[], [1], [], [2], [3], [], [4], [5], [], [6], [], [7], [8], [], [9], [10]]

What does the first square bracket before each number denote? Please explain.

Hi, Amit.

glom().collect() gives the output as one array per partition of the RDD. The empty [] entries are partitions that received no records.

Note that no HashPartitioner is involved here: numsRdd was created by parallelize without calling partitionBy, so parallelize simply slices the 10 numbers across the 16 default partitions, leaving some partitions empty.

All the best!

I guess it is related to the number of partitions versus the data provided for partitioning (10 numbers, 16 partitions). In our case the data is too small and the partitions are too many, so there is nothing to write to some of the partitions. Spark creates the partition, but there is no data for it, so it returns empty brackets in the Array.

See below:

var nums = 1 to 10;
var numsRdd1 = sc.parallelize(nums,10) /*10 is partition value*/
print(numsRdd1.partitions.length)
numsRdd1.glom().collect()

Output with 10: no empty brackets

10
nums = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
numsRdd1 = ParallelCollectionRDD[23] at parallelize at <console>:34
[[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]]

Output with 11: empty brackets, the first one in the Array

11
nums = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
numsRdd1 = ParallelCollectionRDD[25] at parallelize at <console>:35
[[], [1], [2], [3], [4], [5], [6], [7], [8], [9], [10]]

Output with 9: no empty brackets; [9, 10] are merged

9
nums = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
numsRdd1 = ParallelCollectionRDD[27] at parallelize at <console>:35
[[1], [2], [3], [4], [5], [6], [7], [8], [9, 10]]

The hdfs dfs -cat output for 16 partitions shows no values created on parts 0, 2, 5, 8, 10, 13; the same ones were returned empty by glom:

numsRdd.glom().collect()

[[], [1], [], [2], [3], [], [4], [5], [], [6], [], [7], [8], [], [9], [10]]

I am getting the below message:

Caused by: org.apache.hadoop.ipc.RemoteException: The DiskSpace quota of /user/spanda697338 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4322976912 B = 4.03 GB

I went and deleted most of the files and folders. I still keep getting the above message while doing "saveAsTextFile".

Can someone help?

Thanks.

For 10 values, why is it showing 16 partitions?

I also observed this. It seems it creates 16 partitions by default unless specified. Some partitions are blank.

numsRdd.saveAsTextFile("numsRdd-handson") is giving the following error:

The DiskSpace quota of /user/bvrajuitp3393 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4364893733 B = 4.07 GB

I have removed some of the directories in my home directory, but the problem still persists.

Hi Sandeep, how do we decide the number of partitions?


I am getting a disk space quota error while running a basic command, numsRdd.saveAsTextFile("somedir"). I have set the replication factor of my /user/rahuljhaplay8491 directory to 1, and my HDFS overall space used shows 390 MB. I don't understand what the problem is!


Consider a table (DataFrame) with the following data:

CountryName | CityName | NumOfBuyers
Paris       | XXX1     | 20
Paris       | XXX2     | 10
Paris       | XXX3     | 05
USA         | AAA1     | 10
USA         | AAA2     | 20
UK          | SSS1     | 10
UK          | SSS2     | 20

How do we partition based on country name, just like we do with Hive partitioning?

1. var mycountryArray = Array(("paris","xxx1",10),("paris","xxx2",10),
("uk","xxx1",100),("uk","xxx1",10),("usa","xxx5",5),
("usa","xxx2",6),("usa","xxx1",2))

2. var rdd = sc.parallelize(mycountryArray)
3. var kvrddtup = rdd.map(x => (x._1, (x._2,x._3)))

scala> kvrddmap.glom().collect()
res11: Array[Array[scala.collection.immutable.Map[String,(String, Int)]]] = Array(Array(), Array(), Array(Map(paris -> (xxx1,10))), Array(), Array(), Array(Map(paris -> (xxx2,10))), Array(), Array(), Array(Map(uk -> (xxx1,100))), Array(), Array(), Array(Map(uk -> (xxx1,10))), Array(), Array(), Array(Map(usa -> (xxx5,5))), Array(), Array(), Array(Map(usa -> (xxx2,6))), Array(), Array(Map(usa -> (xxx1,2))))

4. kvrddtup.collect()
5. var output = kvrddtup.partitionBy(new HashPartitioner(3))
6. output.glom().collect()
7. output.saveAsTextFile("hashParitionData")

output creates 3 partitions, but data is available in only 2 partitions and the other partition is empty:

part-00000 36 bytes karthikcppj1382 karthikcppj1382 -rw-r--r-- June 02, 2020 08:36 PM
part-00001 0 bytes karthikcppj1382 karthikcppj1382 -rw-r--r-- June 02, 2020 08:36 PM
part-00002 76 bytes karthikcppj1382 karthikcppj1382 -rw-r--r-- June 02, 2020 08:36 PM

Thanks for the great tutorials ... :)

output.getNumPartitions
res22: Int = 3

kvrddtup.getNumPartitions
res23: Int = 16

rdd.getNumPartitions
res24: Int = 16

Does sc.parallelize do the partitioning? If so, why are we doing partitioning again? Does it mean we can arrange the partitions as we like? Can someone explain?

Thanks

sc.parallelize creates an RDD, and an RDD has partitions.

Yes, if there is a requirement to reorganize the data across partitions, we use the partitioners.
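A small sketch of the difference (variable names are illustrative):

// parallelize fixes the initial number of partitions
var base = sc.parallelize(1 to 10, 4).map((_, 1))
base.partitions.length // Int = 4

// partitionBy then reorganizes the same data by key into new partitions
var reorganized = base.partitionBy(new org.apache.spark.HashPartitioner(2))
reorganized.partitions.length // Int = 2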

My partition count is 20:

numsRdd.glom().collect()
res4: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4), Array(), Array(5), Array(), Array(6), Array(), Array(7), Array(), Array(8), Array(), Array(9), Array(), Array(10))

org.apache.hadoop.ipc.RemoteException: The DiskSpace quota of /user/kini2u8781 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4353987594 B = 4.05 GB .

Please help.


Hi, Abhijith.

The message is self-explanatory: your usage has exceeded the space provided to you, which is 4 GB.

Kindly refer to the discussion below: https://cloudxlab.com/faq/6....

All the best!

-- Satyajit Das

Getting this error:

Caused by: org.apache.hadoop.ipc.RemoteException: The DiskSpace quota of /user/rahulmahale8679 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4619286654 B = 4.30 GB

I deleted all content from my folder on HDFS, but I am still getting the same error.

Caused by: org.apache.hadoop.ipc.RemoteException: The DiskSpace quota of /user/kunalpcea2433 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4346319264 B = 4.05 GB

Checked the space used:

[kunalpcea2433@cxln5 ~]$ hdfs dfs -du -s -h ../kunalpcea2433
358.0 M .

Please check.

Resolved using:

hadoop dfs -setrep -w 1 -R /user/username


I am getting the following error while executing saveAsTextFile:

Caused by: org.apache.hadoop.ipc.RemoteException: The DiskSpace quota of /user/mailtokanthan4318 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4364805024 B = 4.07 GB

I checked the HDFS utilization for my ID below:

[santurock1778679@cxln4 ~]$ hdfs dfs -du -s -h ../santurock1778679
358.1 M .
[santurock1778679@cxln4 ~]$

My partition length is 16 instead of 4, and it does not want to saveAsTextFile. What could be the problem?

Hi,

I am getting the following error while executing saveAsTextFile:

Caused by: org.apache.hadoop.ipc.RemoteException: The DiskSpace quota of /user/mailtokanthan4318 is exceeded: quota = 4294967296 B = 4 GB but diskspace consumed = 4364805024 B = 4.07 GB

Can you please check.

Thanks,
Kanthan

Abhinav Singh

Hi Kanthan,

This discussion will help you.

Thanks

With the command numsRdd.saveAsTextFile("folder_name"), a blank folder was created with no partitions.

Was the RDD empty? Did you check numsRdd.take(10)?


I tried to run the command to save the file to disk, but it says the allotted quota is 4 GB, and I require more than 4 GB. I deleted some of the folders, but it still shows the same error. How can I empty the Trash?

First, the files remain in the trash. Please use Hue to remove them from the trash too.

Second, try decreasing the replication factor for the unimportant files.

Why is it showing as not completed in the course summary, even after marking it as completed?

Can you check now? It shows as completed.

numsRdd.partitions.length gives us 4, so by default Spark will create 4 partitions?