Now, we will look at some more important operations.
sample
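- This is a transformation provided on RDD. sample generates a smaller RDD containing a random fraction of the elements of the original RDD. The first argument, withReplacement, is a flag that allows the same element to be picked more than once; the second argument is the fraction to sample; the optional third argument is a random seed. The fraction is not an exact size: without replacement it is the probability that each element is chosen, and with replacement it is the expected number of times each element is chosen. Following is the syntax for sample: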
RDD.sample(withReplacement, fraction, [seed])
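To see why the size of a sample is only approximate, here is a minimal pure-Python model of sampling without replacement. It assumes an RDD is represented as a list of partitions (plain Python lists), and sample_without_replacement is a hypothetical helper, not Spark's implementation; it only mimics the idea of keeping each element independently with probability fraction. Spark uses a different (Poisson-based) scheme when withReplacement is True, which is not modeled here.

```python
import random


def sample_without_replacement(partitions, fraction, seed=None):
    """Hypothetical model: keep each element independently with
    probability `fraction` (Bernoulli sampling)."""
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    result = []
    for part in partitions:
        for x in part:
            if rng.random() < fraction:
                result.append(x)
    return result


# Two "partitions" holding the numbers 1..100
data = [list(range(1, 51)), list(range(51, 101))]

a = sample_without_replacement(data, 0.1, seed=42)
b = sample_without_replacement(data, 0.1, seed=42)
print(a == b)   # True: same seed, same sample
print(len(a))   # close to 10, but not guaranteed to be exactly 10
```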
mapPartitions
- This returns a new RDD by applying a function to each partition of an RDD. mapPartitions is a transformation that calls the given function once per partition, passing it an iterator over that partition's elements; the function must return an iterable. It is similar to map, but unlike map, which calls the function once for every record, mapPartitions calls it once for every partition. Following is the syntax for mapPartitions:
RDD.mapPartitions(f, preservesPartitioning=False)
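The difference between "once per record" and "once per partition" can be sketched in plain Python. This is a model, not Spark code: it assumes an RDD is a list of partitions, and map_partitions and summarize are hypothetical names. The call counter shows that the function runs three times for three partitions, regardless of how many elements they hold.

```python
from typing import Callable, Iterable, Iterator, List


def map_partitions(partitions: List[list],
                   f: Callable[[Iterator], Iterable]) -> List[list]:
    """Model of mapPartitions: call f once per partition with an iterator
    over that partition; f must return an iterable."""
    return [list(f(iter(part))) for part in partitions]


calls = []


def summarize(it):
    calls.append(1)   # record one call per partition
    yield sum(it)     # a generator is an iterable, as mapPartitions requires


parts = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
print(map_partitions(parts, summarize))  # [[6], [15], [34]]
print(len(calls))                        # 3: one call per partition, not per element
```

Running per partition is useful when the function has an expensive setup step (for example opening a connection), which then happens once per partition instead of once per element.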
sortBy
- It sorts an RDD by the key that the key function (the first argument) computes for each element. The second argument, ascending, specifies whether the result should be in ascending or descending order. Its default value is True, so by default the results are sorted in ascending order of the key. The third argument, numPartitions, specifies the total number of partitions in the resulting RDD. sortBy is a transformation that shuffles the elements of the RDD, so it is more efficient to set the number of partitions during this shuffle than to repartition afterwards, which would move the data a second time. Therefore, numPartitions is an important switch for optimization. This is what its syntax looks like:
RDD.sortBy(keyfunc, ascending=True, numPartitions=None)
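A minimal pure-Python model of what sortBy does with its three arguments might look like the following. It assumes an RDD is a list of partitions, and sort_by is a hypothetical helper. Splitting the sorted run into equal-sized chunks is a simplification: Spark chooses range boundaries by sampling the keys, so its output partitions need not be equal in size.

```python
def sort_by(partitions, keyfunc, ascending=True, num_partitions=None):
    """Model of sortBy: gather all elements (the shuffle), sort them by
    keyfunc, then split the sorted run into contiguous output partitions."""
    elements = [x for part in partitions for x in part]
    elements.sort(key=keyfunc, reverse=not ascending)
    n = num_partitions or len(partitions)
    size = -(-len(elements) // n)  # ceiling division
    return [elements[i * size:(i + 1) * size] for i in range(n)]


parts = [[('a', 1), ('b', 2)], [('1', 3), ('d', 4), ('2', 5)]]
print(sort_by(parts, lambda x: x[1], ascending=False, num_partitions=2))
# [[('2', 5), ('d', 4), ('1', 3)], [('b', 2), ('a', 1)]]
```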
RDDs are not sets, but Spark still provides a few utility functions modeled on set operations. Here are the important ones:
distinct
- This creates an RDD with only the distinct (unique) values, giving your RDD the set property. distinct is an expensive operation because it requires shuffling the data so that equal values end up in the same partition.
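Why a shuffle is needed can be sketched in plain Python: duplicates may sit in different partitions, so each value is first sent to a bucket chosen by its hash, which guarantees that equal values meet in the same bucket. This is a model under the assumption that an RDD is a list of partitions; distinct here is a hypothetical helper, not Spark's code.

```python
def distinct(partitions, num_partitions=2):
    """Model of distinct: hash-partition every value (the shuffle),
    then drop duplicates inside each bucket."""
    buckets = [[] for _ in range(num_partitions)]
    for part in partitions:
        for x in part:
            buckets[hash(x) % num_partitions].append(x)
    # dict.fromkeys keeps the first occurrence of each value
    return [list(dict.fromkeys(bucket)) for bucket in buckets]


parts = [[1, 2, 3, 4], [5, 4, 3], [4, 2, 8]]
result = distinct(parts)
print(sorted(x for b in result for x in b))  # [1, 2, 3, 4, 5, 8]
```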
union
- It simply appends one RDD to another. Unlike the mathematical union operator, it does not remove duplicates, so the resulting RDD might contain duplicate records.
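As a pure-Python model (assuming an RDD is a list of partitions; union is a hypothetical helper), union just places the partitions of both inputs side by side. Nothing is compared, so no shuffle is needed and duplicates survive.

```python
def union(parts_a, parts_b):
    """Model of union: keep every partition of both inputs unchanged;
    no shuffle and no de-duplication."""
    return parts_a + parts_b


combined = union([[1, 2, 3, 4, 5]], [[5, 6, 7, 8, 9]])
print(combined)                                     # [[1, 2, 3, 4, 5], [5, 6, 7, 8, 9]]
print([x for p in combined for x in p].count(5))    # 2: the duplicate is kept
```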
subtract
- The subtract transformation returns the values that are in the first RDD but not in the second RDD. Like the sortBy and distinct transformations, it involves shuffling.
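A plain-Python model of the result (ignoring partitioning and element order, which Spark does not guarantee) is sketched below; subtract is a hypothetical helper. Note that duplicates in the first input are kept as long as the value does not appear in the second input.

```python
def subtract(a, b):
    """Model of subtract: keep elements of `a` that occur nowhere in `b`."""
    exclude = set(b)
    return [x for x in a if x not in exclude]


print(subtract([1, 1, 2, 3], [2, 5]))              # [1, 1, 3]
print(subtract([1, 2, 3, 4, 5], [5, 6, 7, 8, 9]))  # [1, 2, 3, 4]
```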
intersection
- Creates a new RDD with the values common to both RDDs. It also removes duplicates. intersection also requires shuffling.
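In plain Python, the result of intersection corresponds to a set intersection. The sketch below is a model only (intersection is a hypothetical helper); it sorts the output for readability, whereas Spark returns the values in no particular order.

```python
def intersection(a, b):
    """Model of intersection: values present in both inputs, each reported once."""
    return sorted(set(a) & set(b))


print(intersection([1, 2, 2, 3], [2, 3, 3, 4]))  # [2, 3]
```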
cartesian
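- Another very important transformation is cartesian. It returns all possible pairs (a, b) where a is in the source RDD and b is in the other RDD. The result has as many elements as the product of the sizes of the two RDDs, so it can grow very large.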
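The pairing that cartesian performs is the same as itertools.product in plain Python. This is a model for illustration (cartesian is a hypothetical helper); it shows how quickly the output size becomes the product of the input sizes.

```python
from itertools import product


def cartesian(a, b):
    """Model of cartesian: every (x, y) with x from a and y from b."""
    return list(product(a, b))


pairs = cartesian([1, 2], ['x', 'y', 'z'])
print(len(pairs))  # 6 = 2 * 3
print(pairs[:3])   # [(1, 'x'), (1, 'y'), (1, 'z')]
```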
fold
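- It is very similar to reduce. However, it provides a little extra control over initialization: it lets us specify an initial "zero" value into which the elements are merged. The zero value is used once for each partition and once more when the partition results are merged, so it should normally be the identity element of the operation (for example, 0 for addition).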
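The way fold uses its zero value can be modeled in plain Python with functools.reduce. The sketch assumes an RDD is a list of partitions and that the five numbers 1 to 5 are split as [1, 2] and [3, 4, 5]; fold here is a hypothetical helper that mirrors the fold example in the walkthrough.

```python
from functools import reduce


def fold(partitions, zero_value, op):
    """Model of fold: fold each partition starting from zero_value,
    then fold the per-partition results, again starting from zero_value."""
    per_partition = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, per_partition, zero_value)


def add(x, y):
    return x + y


parts = [[1, 2], [3, 4, 5]]
print(fold(parts, 0, add))   # 15
print(fold(parts, 10, add))  # 45 = 15 + 10 * (2 partitions + 1 final merge)
```

Because the zero value is applied once per partition plus once for the merge, a non-identity zero value makes the result depend on the number of partitions.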
aggregate
- The aggregate function is a powerful action for computing a result. With aggregate we specify an initial value, a function that merges the elements of each partition into an accumulator, and another function that merges the results of the partitions into a final result. Note that the result of aggregate can be of a different data type than the RDD elements.
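A minimal plain-Python model of aggregate, assuming an RDD is a list of partitions, is shown below (aggregate is a hypothetical helper). The elements are integers while the result is a (sum, count) tuple, which illustrates that the result type can differ from the element type.

```python
from functools import reduce


def aggregate(partitions, zero_value, seq_op, comb_op):
    """Model of aggregate: seq_op folds elements into an accumulator within
    each partition; comb_op merges the per-partition accumulators."""
    per_partition = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, per_partition, zero_value)


def seq_op(acc, value):
    return (acc[0] + value, acc[1] + 1)        # running (sum, count)


def comb_op(a, b):
    return (a[0] + b[0], a[1] + b[1])          # add two (sum, count) pairs


total, count = aggregate([[2, 7], [4, 5]], (0, 0), seq_op, comb_op)
print(total, count)   # 18 4
print(total / count)  # 4.5, the mean
```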
countByValue
- It counts how many times each distinct value occurs in the RDD and returns the result to the driver as a dictionary that maps each value to its count.
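The counting can be modeled in plain Python with collections.Counter: count within each partition, then merge the partial counts. This is a sketch under the assumption that an RDD is a list of partitions; count_by_value is a hypothetical helper. Since the whole dictionary ends up on the driver, this action suits RDDs with a modest number of distinct values.

```python
from collections import Counter


def count_by_value(partitions):
    """Model of countByValue: count locally per partition, then merge."""
    total = Counter()
    for part in partitions:
        total.update(Counter(part))
    return dict(total)


print(count_by_value([[1, 2, 3], [3, 5, 5, 5]]))  # {1: 1, 2: 1, 3: 2, 5: 3}
```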
top
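- This action returns the N largest elements of the RDD, in descending order. You can get the same result with the sortBy transformation followed by the take action, but top does not need to sort the whole RDD: it keeps only the N largest elements of each partition and then merges them.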
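The "largest N per partition, then merge" strategy can be sketched with heapq in plain Python. The sketch assumes an RDD is a list of partitions, and top is a hypothetical helper; only num candidates per partition ever need to be combined.

```python
import heapq


def top(partitions, num, key=None):
    """Model of top: take the `num` largest of each partition, then the
    `num` largest of those candidates; the full data is never sorted."""
    candidates = [heapq.nlargest(num, part, key=key) for part in partitions]
    return heapq.nlargest(num, (x for c in candidates for x in c), key=key)


data = [list(range(1, 51)), list(range(51, 101))]
print(top(data, 3))  # [100, 99, 98]
```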
takeOrdered
- It returns the first N elements of an RDD in ascending order, or in the order defined by the optional key function. Note that top returns the largest N elements while takeOrdered returns the smallest N elements by default. Also note that you can specify a custom ordering (in Python, through the key function).
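takeOrdered is the mirror image of top, which can be modeled with heapq.nsmallest. As before, this is a plain-Python sketch assuming an RDD is a list of partitions; take_ordered is a hypothetical helper. Passing key=lambda x: -x reverses the order.

```python
import heapq


def take_ordered(partitions, num, key=None):
    """Model of takeOrdered: smallest `num` per partition, then merge."""
    candidates = [heapq.nsmallest(num, part, key=key) for part in partitions]
    return heapq.nsmallest(num, (x for c in candidates for x in c), key=key)


parts = [[5, 3, 7], [9, 4, 6, 2]]
print(take_ordered(parts, 3))                    # [2, 3, 4]
print(take_ordered(parts, 3, key=lambda x: -x))  # [9, 7, 6]
```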
foreach
- foreach applies a function to every element of the RDD. Though foreach looks like map, there are a few differences: foreach is an action while map is a transformation, and foreach does not return any result. Use foreach when you need a side effect rather than a result, for example saving each element to a database.
foreachPartition
- Similar to mapPartitions, we have the foreachPartition function. If you do not expect any return value and want to operate on an entire partition at once, use foreachPartition. The provided function is called with an iterator over each partition. Again, unlike mapPartitions, foreachPartition is an action.
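A common reason to prefer foreachPartition over foreach is per-partition setup, such as opening a database connection. The plain-Python sketch below models both actions and counts how many connections are opened. FakeConnection, save_row, save_partition, foreach and foreach_partition are all hypothetical names for illustration, and an RDD is assumed to be a list of partitions.

```python
class FakeConnection:
    """Hypothetical stand-in for a database connection; counts openings."""
    opened = 0

    def __init__(self):
        FakeConnection.opened += 1
        self.rows = []

    def save(self, row):
        self.rows.append(row)


saved = []  # everything "written to the database"


def foreach(partitions, f):
    """Model of foreach: call f once per element, ignore its return value."""
    for part in partitions:
        for x in part:
            f(x)


def foreach_partition(partitions, f):
    """Model of foreachPartition: call f once per partition with an iterator."""
    for part in partitions:
        f(iter(part))


def save_row(x):
    conn = FakeConnection()  # opened for every single element
    conn.save(x)
    saved.extend(conn.rows)


def save_partition(iterator):
    conn = FakeConnection()  # opened once for the whole partition
    for row in iterator:
        conn.save(row)
    saved.extend(conn.rows)


parts = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

foreach(parts, save_row)
print(FakeConnection.opened)  # 9: one connection per element

FakeConnection.opened = 0
foreach_partition(parts, save_partition)
print(FakeConnection.opened)  # 3: one connection per partition
```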
Let's look at each of these in action.
First, let's define an RDD of the numbers from 1 to 100 (the end of range is exclusive), split into 5 partitions, and save it in a variable named seq:
seq = sc.parallelize(range(1,101), 5)
Now let's use sample on this RDD and collect the results, first without and then with repetition allowed:
seq.sample(False, 0.1).collect()
seq.sample(True, 0.1).collect()
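Note that every time you call collect, you may get different results, because collect re-executes the entire computation, including the random sampling. To get reproducible results, pass a seed as the third argument, for example seq.sample(False, 0.1, 42).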
For mapPartitions, we will first define an RDD with 3 partitions:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3)
Next, we will define a function called adder that yields the sum of the elements it receives:
def adder(iterator):
    yield sum(iterator)
Note that instead of return, we are using yield. A function that contains yield is a generator: each time it yields, it sends a value to the caller and pauses, keeping its local state so that on the next request it resumes from where it left off. Now we will apply mapPartitions to this RDD using the above function. It simply adds up the elements of each partition:
rdd.mapPartitions(adder).collect()
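The behavior of yield can be tried in plain Python, without Spark. The countdown generator below is a hypothetical example: each call to next resumes it right after the previous yield with its local variable n intact. adder itself yields a single value, which is enough to make its result an iterable, as mapPartitions expects.

```python
def adder(iterator):
    yield sum(iterator)


def countdown(n):
    while n > 0:
        yield n  # pause here and hand n to the caller
        n -= 1   # resume here on the next request; n keeps its value


gen = countdown(3)
print(next(gen))  # 3
print(next(gen))  # 2
print(list(gen))  # [1]

print(list(adder(iter([1, 2, 3]))))  # [6]
```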
Now we will see how to use the sortBy function. First, we define a list of tuples, save it in the tmp variable, and then create an RDD from it:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
rdd = sc.parallelize(tmp)
Now we will use sortBy to sort this list of tuples by the second element of each tuple, in descending order:
rdd.sortBy(lambda x: x[1], ascending=False).collect()
Now it's time to look at distinct. First, let's define an RDD as follows:
seq = sc.parallelize([1,2,3,4,5,4,3,4,2,8])
Notice the duplicate elements in the RDD. We will use distinct to get only the unique values from this RDD:
seq.distinct().collect()
For union we will define two RDDs, rdd1 and rdd2:
rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([5,6,7,8,9])
Next, we will take the union of these two RDDs:
rdd1.union(rdd2).collect()
Notice how union retains the value 5 from both RDDs, so it appears twice in the result.
Now let's try subtract with these two RDDs:
rdd1.subtract(rdd2).collect()
Next, we will see what intersection returns for these two RDDs:
rdd1.intersection(rdd2).collect()
Let's try cartesian on these two RDDs; it gives all possible pairs formed by taking one element from each RDD:
rdd1.cartesian(rdd2).collect()
For fold, we will first define a new RDD with 2 partitions:
rdd = sc.parallelize([1,2,3,4,5], 2)
Next, we will use fold as shown below:
rdd.fold(10, lambda x,y:x+y)
Here, the zero value is 10. Without it, the sum of all numbers in the RDD would be 1+2+3+4+5 = 15. However, fold starts the sum of each of the two partitions from 10 (2 * 10 = 20) and then starts the final merge of the partition results from 10 once more (another 10). So the final output of fold is 15 + 20 + 10 = 45.
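For aggregate we will use the following example:
sc.parallelize([2,7,4,5]).aggregate(
    (0, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Here, the initial value is the tuple (0, 0). The first lambda function runs within each partition: it adds each element to the running sum (acc[0]) and increments the running count (acc[1]). The second lambda function merges the (sum, count) tuples of different partitions by adding them element-wise. The aggregate function returns both numbers as a tuple, (18, 4) for this RDD, from which you could compute the average.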
To show how countByValue works, we will first create an RDD with multiple duplicate elements:
rdd = sc.parallelize([1,2,3,3,5,5,5])
Now we will generate a dictionary of each element along with its count and check the result:
counts = rdd.countByValue()
counts
What if we want to get the top 10 elements from the RDD below?
rdd = sc.parallelize(range(1,101),2)
Simple, we will use top:
rdd.top(10)
To show an example of takeOrdered, we will define a new RDD:
rdd = sc.parallelize([5,3,7,9,4,6,2])
Now we will use takeOrdered to get the 3 smallest elements in ascending order:
rdd.takeOrdered(3)
For foreach, let's start by defining a simple function:
def f(x):
    print("Save ", x, " to DB")
Now we will define an RDD and apply the above function to each element using foreach:
sc.parallelize(range(1, 10), 3).foreach(f)
Note, however, that foreach does not return a value. It is particularly useful when you need to perform some calculation on an RDD and send the result somewhere else, for example writing each element to a database or calling a REST API with it. When running on a cluster, the print output appears in the executors' logs rather than in your notebook.
Last but not least, foreachPartition applies a function to each partition of an RDD. Let's start by defining the function:
def partitionSum(itr):
    print("The sum of the partition is ", sum(itr))
Now we will use foreachPartition to apply this function to a simple RDD with 4 partitions:
sc.parallelize(range(1, 40), 4).foreachPartition(partitionSum)