Custom Partitioner:
To create a custom partitioner, you need to define a class that extends Partitioner. The first line imports this class (org.apache.spark.Partitioner).
Our partitioner takes an integer named numPartitions as its constructor argument. getPartition never reads this value, but it overrides the numPartitions member of Partitioner, which Spark reads to decide how many partitions the resulting RDD has.
This class should override three methods:
First, getPartition, which takes a key as its argument and returns an integer from 0 to numPartitions - 1. This is the method that dictates which partition a key goes to.
The second method, equals, is used for comparing two partitioners, and the third, hashCode, computes a hash code. We do not call either of those ourselves in this example.
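A minimal sketch of such a class is shown here for reference. The getPartition body follows the code quoted in the comments further down; the equals and hashCode bodies are an assumption (they are modelled on how Spark's built-in partitioners compare by partition count) and may differ from the ones used in the lecture.

```scala
import org.apache.spark.Partitioner

class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner {

  // Keys whose first letter is A..J (case-insensitive) go to partition 0,
  // everything after J goes to partition 1.
  // Note: a non-String key throws a MatchError, and an empty string
  // throws an index-out-of-bounds exception on s(0).
  def getPartition(key: Any): Int = key match {
    case s: String => if (s(0).toUpper > 'J') 1 else 0
  }

  // Assumed implementation: two partitioners are equal when they are of the
  // same type and have the same number of partitions.
  override def equals(other: Any): Boolean = other match {
    case p: TwoPartsPartitioner => p.numPartitions == numPartitions
    case _                      => false
  }

  // Assumed implementation, kept consistent with equals.
  override def hashCode: Int = numPartitions
}

// getPartition can be tried directly, without creating an RDD.
val p = new TwoPartsPartitioner(2)
println(p.getPartition("apple")) // 0
println(p.getPartition("kiwi"))  // 1
```

Since getPartition can return 1, the class should be constructed with numPartitions set to 2.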
Now, let us use this partitioner.
First, we create an RDD x of key-value pairs by calling parallelize on an Array. We pass 3 as the second argument to parallelize, which creates x with three partitions.
Let us use glom to convert the RDD into an array of arrays in which each partition becomes one array element. We can then call collect() to fetch and print the whole array of arrays. This makes the partitions easy to see.
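These two steps could look roughly like the following. It assumes a spark-shell or notebook session in which sc (the SparkContext) is already defined; the keys are made-up sample data, not the ones from the lecture.

```scala
// Assumes `sc` is the SparkContext provided by spark-shell or the notebook.
// The key-value pairs below are illustrative sample data.
val x = sc.parallelize(
  Array(("apple", 1), ("banana", 1), ("kiwi", 1), ("mango", 1), ("jackfruit", 1)),
  3 // second argument: number of partitions
)

println(x.getNumPartitions) // 3

// glom() turns every partition into a single Array;
// collect() then brings the array of arrays back to the driver.
val partsOfX: Array[Array[(String, Int)]] = x.glom().collect()

// Print one line per partition.
partsOfX.zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i: " + part.mkString(", "))
}
```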
Next, we call the partitionBy method on our RDD with a new instance of the partitioner class we just created. We assign the result to a new RDD y.
Let’s use glom to check the structure of our new RDD y.
You can see that this new RDD has two partitions. The first partition contains all the keys starting with the letters a to j, and the second contains the keys whose first character comes after j.
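Put together, the repartitioning step can be sketched as below. As before, this assumes a session where sc is predefined, and the sample keys are hypothetical. The class is repeated in a compact form (without equals and hashCode) only so that the snippet runs on its own.

```scala
import org.apache.spark.Partitioner

// Compact copy of the partitioner so this snippet is self-contained.
class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: String => if (s(0).toUpper > 'J') 1 else 0
  }
}

// Illustrative sample data, initially spread over three partitions.
val x = sc.parallelize(
  Array(("apple", 1), ("banana", 1), ("kiwi", 1), ("mango", 1), ("jackfruit", 1)),
  3
)

// Repartition by key using the custom partitioner with two partitions.
val y = x.partitionBy(new TwoPartsPartitioner(2))
println(y.getNumPartitions) // 2

// Inspect which keys ended up in which partition.
// The order of keys inside a partition is not guaranteed.
val partsOfY = y.glom().collect()
partsOfY.zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i: " + part.map(_._1).mkString(", "))
}
```

With this sample data, partition 0 should hold apple, banana and jackfruit, and partition 1 should hold kiwi and mango.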
You can see that we were able to successfully use our newly created partitioner.
What if I don't pass the second argument, which defines the number of partitions, when creating an RDD with the parallelize function? Will it create partitions based on the number of cores of the executor?
Hi,
Instead of me answering this for you, why don't you try both cases and compare the results?
Thanks.
Please help!!!!
Hi,
Could you please run this in the Jupyter notebook on the right-hand side of the assessment?
I have just checked this in the notebook and it is working fine.
scala> class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner {
     |   def getPartition(key: Any): Int = key match {
     |     case s: String => {
     |       if (s(0).toUpper > 'J') 1 else 0
     |     }
     |   }
     | }
<console>:11: error: not found: type Partitioner
       class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner {
Getting the below error message
Hi,
Could you please run this on the Jupyter notebook given on the right?
Thanks.
Class TwoPartsPartitioner(override val numPartitions:Int) extends Partitioner{
def getPartition(key:Any):Int = Keymatch
{
case s:String=>{
if (s(0).toUpper > 'J') 1 else 0
}
}
}
Please help
Hi, Amit.
Below is the correct code:
import org.apache.spark.Partitioner
class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: String => {
      if (s(0).toUpper > 'J') 1 else 0
    }
  }
}
You can refer to this GitHub gist for more details:
https://gist.github.com/girisandeep/f90e456da6f2381f9c86e8e6bc4e8260
It is also available in the lecture.
All the best!
Hello Sir,
Here TwoPartsPartitioner is giving an error
Hi Vishal,
I hope your issue is resolved. If you are still facing the issue, please share a screenshot of the error.
Thanks & Regards,
Shubh Wadekar
Team CloudxLab
here TwoPartsPartitioner is giving an error.
please look into the matter.
Hi Anubhav,
I hope your issue is resolved. If you are still facing the issue, please share a screenshot of the error.
Thanks & Regards,
Shubh Wadekar
Team CloudxLab