As part of this hands-on, we will learn what partitioning means in Apache Spark.
An RDD is made up of partitions, and each partition holds records. How records are placed across the various partitions is controlled by partitioning. Partitioning is basically about choosing how the keys are distributed among the partitions of an RDD. Sometimes this is required by the project's objectives, and sometimes it is done for optimization.
Let's take an example. On CloudxLab, you use the spark-shell command. You can also run other versions of Spark from the /usr/ folder. Here, let's start spark-shell.
On the Scala Spark shell, let's first create a collection of numbers from 1 to 10:
var nums = 1 to 10;
Now, let's convert it into an RDD using parallelize:
var numsRdd = sc.parallelize(nums)
To find the number of partitions, we can check numsRdd.partitions.length:
numsRdd.partitions.length
To take a look at the data, we can save the RDD to HDFS using saveAsTextFile and then inspect the actual data in the files created:
numsRdd.saveAsTextFile("somedir")
If your data is really small, you can instead use glom to convert each partition into an array, giving an array of arrays, and then use collect. Note that collect is generally impractical on huge data:
numsRdd.glom().collect()
Here you can see that it produces an array of arrays, with one inner array per partition.
Since only key-value RDDs can be partitioned, let us create an RDD of key-value pairs, with 1 as the value for every key:
var kvrdd = numsRdd.map((_, 1));
Now, to partition, we need to specify a partitioner. Partitioners are classes, or pieces of logic, that specify how to distribute data between the partitions of an RDD. Core Spark provides two partitioners: HashPartitioner and RangePartitioner.
We also need to import the HashPartitioner:
import org.apache.spark.HashPartitioner
Let us first partition our key-value RDD using a HashPartitioner. Here we are creating 3 partitions:
var prdd = kvrdd.partitionBy(new HashPartitioner(3));
Let us check whether the resulting RDD is well partitioned:
prdd.glom().collect()
You can see that it has partitioned the RDD into three parts. Notice that the records appear to have been placed into partitions in a round-robin manner. This is actually the result of hashing: the HashPartitioner first computes the hash code of the key and then takes the remainder of that hash code after dividing by the number of partitions. This remainder determines the partition in which the key is placed.
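The hash-then-remainder placement can be sketched in a few lines of plain Python (used here only because the logic itself is language-agnostic; Spark's actual HashPartitioner uses Java's hashCode with an explicit non-negative modulo, so real hash values will differ for non-integer keys):

```python
def partition_for(key, num_partitions):
    """Return the partition index for a key: hash it, then take the remainder."""
    r = hash(key) % num_partitions
    # Spark applies a correction (nonNegativeMod) because Java's % can be
    # negative; Python's % is already non-negative for a positive modulus.
    return r if r >= 0 else r + num_partitions

# Keys 1..10 across 3 partitions: small integers hash to themselves,
# so consecutive keys cycle through partitions 1, 2, 0, 1, 2, 0, ...
# which is why the result looks round-robin.
placement = {k: partition_for(k, 3) for k in range(1, 11)}
```

This also explains why equal keys always land in the same partition: the partition index depends only on the key's hash.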
Sometimes you may want to partition an RDD into non-overlapping ranges of keys. This is where range-based partitioning comes into play.
So, let's create a small RDD of word pairs and repartition it using RangePartitioner:
import org.apache.spark.RangePartitioner
var words = sc.parallelize(Array("this", "is", "a", "cat", "there", "is", "a", "rat", "cat", "eats", "rat"), 2)
var wordsTups = words.map((_, 1))
var output = wordsTups.partitionBy(new RangePartitioner(2, wordsTups))
output.glom().collect()
Let us take a look at the result:
Array(Array((is,1), (a,1), (cat,1), (is,1), (a,1), (cat,1), (eats,1)), Array((this,1), (there,1), (rat,1), (rat,1)))
You can see that there are two partitions: one holding the keys less than or equal to "is", and the other holding the keys greater than "is".
The way it works is that RangePartitioner samples the input RDD, figures out the range of keys it contains, and splits the RDD accordingly. Since it needs to sample the data before partitioning, the RangePartitioner takes the RDD itself as an argument too.
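To make the idea concrete, here is a simplified sketch of range partitioning in plain Python. It is not Spark's implementation: Spark uses reservoir sampling and weighted boundary selection, so the exact split point it picks may differ from this sketch, but the sample-sort-split-and-binary-search structure is the same:

```python
import bisect

def range_bounds(keys, num_partitions):
    """Pick num_partitions - 1 upper-bound keys from the sorted (sampled) keys."""
    s = sorted(keys)
    step = len(s) / num_partitions
    return [s[int(step * i) - 1] for i in range(1, num_partitions)]

def partition_for(key, bounds):
    # Index of the first bound >= key; keys above all bounds go to the last partition.
    return bisect.bisect_left(bounds, key)

words = ["this", "is", "a", "cat", "there", "is", "a", "rat", "cat", "eats", "rat"]
bounds = range_bounds(words, 2)   # a single split point for two partitions
parts = [[], []]
for w in words:
    parts[partition_for(w, bounds)].append(w)
```

Because keys are assigned by comparing against sorted boundaries, the resulting partitions are non-overlapping ranges, which is exactly the property the tutorial output shows.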
When we save this RDD to HDFS, one file per partition will be created, in the same order as here. Let us take a look:
output.saveAsTextFile("myoutput_dir")
If you go into HDFS, you will see a folder containing two files. Each file has one key-value pair per line.
This concludes our discussion around partitioning.
Please note that if we do not specify the number of partitions in the parallelize method, it by default creates as many partitions as there are CPU cores, which is 16 on the lab. When saving such an RDD, Spark tries to allocate 16 blocks on HDFS, that is, 16 * 128 MB, which overflows the disk quota. So, always specify a smaller number of partitions in the second argument of the parallelize method.