Apache Spark Basics with Python

Apache Spark with Python - More RDD Operations

Now, we will look at some more important operations.

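  1. sample - This is a transformation on an RDD. It generates a smaller RDD containing a random fraction of the original RDD's elements. The first argument is a flag that allows an element to be picked more than once (sampling with replacement), the second argument is the fraction to sample, and the optional third argument is a seed for the random number generator. Note that the fraction is the expected size of the sample, so the exact number of elements returned can vary. Following is the syntax for sample: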

    RDD.sample(withReplacement, fraction, [seed])
  2. mapPartitions - This transformation returns a new RDD by applying a function to each partition of an RDD. It is similar to map, but the given function is called once per partition with an iterator over that partition's elements, rather than once per record. Following is the syntax for mapPartitions:

    RDD.mapPartitions(f, preservesPartitioning=False)
  3. sortBy - It sorts an RDD by the key that the key function, passed as the first argument, computes for each element. The second argument is a flag that specifies whether the result should be ascending or descending. Its default value is True, so by default the results are sorted in ascending order of the key. The third argument specifies the number of partitions in the resulting RDD. sortBy is a transformation that shuffles the elements of the RDD, so it is more efficient to set the number of partitions as part of that shuffle than to repartition afterwards, which would move the data a second time. Therefore, numPartitions is an important switch for optimization. This is what its syntax looks like:

    RDD.sortBy(keyfunc, ascending=True, numPartitions=None)

Even though an RDD is not a set, Spark provides some set-like operations on it. Here are a few important set operations, followed by some commonly used actions:

  1. distinct - This creates an RDD with only the distinct (unique) values, giving your RDD the set property. distinct is an expensive operation because it requires a shuffle.

  2. union - It simply appends one RDD to another. It is not the same as the mathematical union operator as it does not try to remove the duplicates. So, the resulting RDD might have the duplicate records.

  3. subtract - The subtract transformation returns the values that are in the first RDD but not in the second RDD. Like sortBy and distinct, it involves shuffling.

  4. intersection - Creates a new RDD with the values common to both RDDs. It also removes duplicates. intersection also requires shuffling.

  5. cartesian - Another very important transformation is cartesian. It returns all possible pairs (a, b) where a is in the source RDD and b is in the other RDD.

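  6. fold - It is very similar to reduce. However, it provides a little extra control over the initialization: it lets us specify an initial (zero) value into which the values are merged. The zero value is used once in each partition and once more when the partition results are merged.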

  7. aggregate - The aggregate function is a powerful action for computing a result. With aggregate we specify an initial value, a function that merges the elements of a partition into that partition's result, and another function that merges the results of the partitions into the final result. Please note that the result of aggregate can be of a different data type than the RDD elements.

  8. countByValue - This action counts how many times each unique value occurs in the RDD and returns the counts to the driver as a dictionary of value to count.

  9. top - This action returns the N largest elements in descending order. You can achieve the same result with the sortBy transformation (in descending order) followed by the take action.

  10. takeOrdered - It returns the first N elements of an RDD in ascending order, or in the order given by the optional key function. Please note that top returns the largest N elements while takeOrdered returns the smallest N elements by default.

  11. foreach - foreach applies a function to every element of the RDD. Although foreach looks like map, there are a few differences: foreach is an action while map is a transformation, and foreach does not return a result. Use foreach when you only need a side effect, for example saving each element to a database.

  12. foreachPartition - Similar to mapPartitions, we have the foreachPartition function. If you do not expect any return value and want to operate on an entire partition at once, use foreachPartition. The provided function is called with an iterator over each partition. Unlike mapPartitions, foreachPartition is an action.

Let's look at each of these in action.

  • First, let's define an RDD of the numbers from 1 to 100 and save it in a variable named seq

    <<your code goes here>> = sc.parallelize(range(1,101), 5)

    Now let's use sample on this and collect the results, first without and then with repetition allowed

    seq.sample(False, 0.1).<<your code goes here>>()
    seq.<<your code goes here>>(True, 0.1).collect()

    Please note that each time you collect, you may get a different result: no seed is given, and every collect re-executes the sampling

  • For mapPartitions, we will first define an RDD as below

    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3)

    Next, we will define a function called adder that yields the sum of the elements it is given

    def <<your code goes here>>(iterator):
        yield sum(iterator)

    Note that we use yield instead of return. A function that contains yield is a generator: it hands a value to the caller and pauses, keeping its state so that it can resume from where it left off when the next value is requested. This matters because the function passed to mapPartitions must return an iterable, and a plain return of the sum would give back a single number instead. Now we will apply mapPartitions to this RDD using the above function. It simply adds up the elements of each partition

    rdd.<<your code goes here>>(adder).collect()
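
To see what happens per partition without running Spark, the sketch below mimics mapPartitions in plain Python. The partitions list is a hypothetical stand-in for how the 10 elements might be split into 3 partitions; the real split is decided by Spark.

```python
def adder(iterator):
    # Because of yield, calling adder returns a generator (an iterable)
    # that produces a single value: the sum of the partition
    yield sum(iterator)

# Assumed split of the elements 1..10 into 3 partitions
partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

# Mimic mapPartitions: call the function once per partition
# and chain together whatever each call yields
result = [value for part in partitions for value in adder(iter(part))]
print(result)  # [6, 15, 34]
```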
  • Now we will see how to use the sortBy function. First, we define a list of tuples and save it in the tmp variable, and then create an RDD from the same

    tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    rdd = sc.parallelize(tmp)

    Next, we will use sortBy to sort these tuples by their second element in descending order

    rdd.<<your code goes here>>(lambda x: x[1], ascending=False).collect()
  • Now it's time to look at distinct. First, let's define an RDD like below

    seq = sc.parallelize([1,2,3,4,5,4,3,4,2,8])

    Notice the duplicate elements in the RDD. We will use distinct to get only the unique values from this RDD

    seq.<<your code goes here>>().collect()
  • For union we will define 2 RDDs as rdd1 and rdd2

    rdd1 = sc.parallelize([1,2,3,4,5])
    <<your code goes here>> = sc.parallelize([5,6,7,8,9])

    Next, we will apply union to these 2 RDDs

    rdd1.<<your code goes here>>(rdd2).collect()

    Notice how union retains the value 5 from both the RDDs

  • Now let's try subtract with these 2 RDDs

    rdd1.<<your code goes here>>(rdd2).collect()
  • Next, we will see how intersection affects these 2 RDDs

    rdd1.<<your code goes here>>(rdd2).collect()
  • Let's try cartesian on these 2 RDDs. It will give all possible pairs formed by taking one element from each of these 2 RDDs

    rdd1.<<your code goes here>>(rdd2).collect()

  • For fold, we will first define a new RDD as shown below

    rdd = sc.parallelize([1,2,3,4,5], 2)

    Next, we will use fold as shown below

    rdd.<<your code goes here>>(10, lambda x,y:x+y)

    Here, the zero value is 10. fold uses the zero value once in every partition and once more when it merges the partition results. The elements themselves add up to 1+2+3+4+5 = 15. With two partitions, the zero value is added 2 times for the partitions and 1 time for the final merge, which is 3*10 = 30. So the final output that fold gives is 15+30 = 45.
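
The arithmetic can be checked in plain Python. The helper fold_like_spark below is a hypothetical model of the behaviour described here, not Spark's implementation, and the split of the data into [1, 2] and [3, 4, 5] is an assumption.

```python
from functools import reduce

def fold_like_spark(partitions, zero_value, op):
    # Fold each partition on its own, starting from the zero value
    partials = [reduce(op, part, zero_value) for part in partitions]
    # Merge the partition results, starting from the zero value once more
    return reduce(op, partials, zero_value)

# Assumed split of [1, 2, 3, 4, 5] into 2 partitions
partitions = [[1, 2], [3, 4, 5]]

result = fold_like_spark(partitions, 10, lambda x, y: x + y)
print(result)  # 45

# With a zero value of 0, fold gives the plain sum
plain_sum = fold_like_spark(partitions, 0, lambda x, y: x + y)
print(plain_sum)  # 15
```

This is why the zero value is normally the identity of the operation (0 for addition, 1 for multiplication): any other value makes the result depend on the number of partitions.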

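  • For aggregate we will use the following example on the rdd defined above

    rdd.aggregate(
      (0, 0),
      (lambda acc, value: (acc[0] + value, acc[1] + 1)),
      (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

    Here, the initial value is the tuple (0, 0), which holds a running sum and a running count. The first lambda function is applied within each partition: it adds an element to the running sum and increases the count by 1. The second lambda function merges the (sum, count) tuples of two partitions. The aggregate function returns the final sum and count as a tuple.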
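
The two functions can be traced step by step in plain Python. This is a sketch of the idea, not Spark's implementation; the names seq_op and comb_op and the split of the data into two partitions are assumptions made for illustration.

```python
from functools import reduce

def seq_op(acc, value):
    # Within a partition: add the element to the sum, add 1 to the count
    return (acc[0] + value, acc[1] + 1)

def comb_op(acc1, acc2):
    # Across partitions: add the sums together and the counts together
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

# Assumed split of [1, 2, 3, 4, 5] into 2 partitions
partitions = [[1, 2], [3, 4, 5]]
zero = (0, 0)

partials = [reduce(seq_op, part, zero) for part in partitions]
print(partials)  # [(3, 2), (12, 3)]

total, count = reduce(comb_op, partials, zero)
print(total, count)  # 15 5

# The result type (a tuple) differs from the element type (an int),
# which is what makes computing a mean in one pass possible
mean = total / count
print(mean)  # 3.0
```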

  • To show how countByValue works, we will first create an RDD with multiple duplicate elements

    rdd = sc.parallelize([1,2,3,3,5,5,5])

    Now we will generate a dictionary of each element along with its count and check the counts dictionary. We avoid naming the variable dict, because that would shadow Python's built-in dict type

    counts = rdd.<<your code goes here>>()
  • What if we want to get the 10 largest elements from the RDD below?

    rdd = sc.parallelize(range(1,101),2)

    Simple, we will use top

    rdd.<<your code goes here>>(10)
  • To show an example of takeOrdered we will define a new RDD

    rdd = sc.parallelize([5,3,7,9,4,6,2])

    Now we will use takeOrdered to display the 3 smallest elements in ascending order

    rdd.<<your code goes here>>(3)

  • For foreach let's start by defining a simple function

    def f(x):
        # print separates its arguments with a space, so no padding is needed
        print("Save", x, "to DB")

    Now we will define an RDD and apply the above function on each element using foreach

    sc.parallelize(range(1, 10), 3).<<your code goes here>>(f)

    However, please note that foreach does not return a value. It is particularly useful when you need to act on each element of an RDD for its side effect, for example writing it to a database or calling a REST API with it. On a cluster, the function runs on the executors, so the printed output appears in the executor logs rather than on the driver.

  • Last but not least, foreachPartition applies a function to each partition of an RDD. Let's start by defining the function

    def partitionSum(itr):
        print("The sum of the partition is", sum(itr))

    Now we will use foreachPartition to apply this function on a simple RDD with 4 partitions

    sc.parallelize(range(1, 40), 4).<<your code goes here>>(partitionSum)