Apache Spark Basics


Apache Spark - More RDD Operations



Let's take a look at some more important operations.

sample is a transformation provided on RDDs. It generates a smaller RDD containing a fraction of the original records. The first argument is a flag that allows repetition, that is, sampling with replacement. The second argument is the fraction.

Here a fraction of 0.1 means roughly 10% of the records. Also, since sample is a transformation, we need to call an action on it to get results.

Please note that every time you run sample followed by collect(), you will most likely get different results: the sampling is random, and unless a seed is supplied a new one is chosen for each call to sample.

mapPartitions returns a new RDD by applying a function to each partition of an RDD. It is a transformation that calls the given function once per partition, passing an iterator over that partition's records as the argument. It is similar to map, except that the function runs on each partition as a whole instead of on each record.

Let us first create an RDD holding the numbers from 1 to 50 in three partitions.

Next we define a function that takes an integer iterator as its argument and sums the integers.

Then we call the mapPartitions transformation with this function as the argument, and call the collect action on the resulting RDD. As you can see, mapPartitions has computed the sum of each partition.

Another very important transformation is sortBy.

sortBy sorts an RDD by the key that a key function returns for each element. The key function is passed as the first argument.

The second argument is a flag that specifies whether the result should be ascending or descending. The default value of this flag is true, which means that by default the results are sorted in ascending order of the key returned by the key function.

The third argument specifies the total number of partitions you want in the resulting RDD. sortBy is a transformation that involves shuffling the elements of the RDD, so it is more efficient to repartition while that shuffle is happening than to move the data again afterwards. Therefore, numPartitions is an important switch for optimization.

Let's take a look at an example use of sortBy. We create an RDD by calling parallelize on a list of tuples. Here the RDD holds tuples, but it could contain anything.

Next we sort the RDD with a key function that returns the first element of each tuple. Then we call collect(). You can see that the data is sorted by the first element of the tuple, which is a character.

We can also sort by the second element of the tuple by writing another key function that returns the second value. You can see that the data is now sorted by the second value of the tuple, which is a number.

If you want to sort the elements by their own values, you can use a key function that simply returns the element itself.

Although an RDD is not really a set, the set operations give you set-like utility functions.

distinct creates an RDD with distinct, or unique, values. It gives the set property to your RDD. distinct is an expensive operation because sorting or shuffling is required.

union simply appends one RDD to another. It is not the same as the mathematical union operator because it does not try to remove duplicates, so the result may contain duplicate records.
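
The difference from a mathematical union can be sketched with plain Scala lists standing in for the two RDDs. This is only a model that runs without Spark, and the element values are made up for illustration. On RDDs the corresponding calls would be union and distinct.

```scala
// Plain Scala lists stand in for two RDDs; the values are illustrative only.
val first = List("coffee", "monkey", "tea")
val second = List("coffee", "monkey", "kitty")

// Like union on RDDs: a simple append, so duplicates are kept.
val appended = first ++ second
// appended has 6 elements, and "coffee" and "monkey" each appear twice

// A set-style union needs an explicit de-duplication step,
// analogous to calling distinct on the unioned RDD.
val asSet = appended.distinct
// List(coffee, monkey, tea, kitty)
```

So if you need true set semantics, you pay for a distinct on top of the union.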

The subtract transformation returns the values that are in the first RDD but not in the second. It involves shuffling, similar to the sortBy and distinct transformations.

intersection creates a new RDD with the values common to the two RDDs. It also removes duplicates, and it also requires shuffling. In the example it returned coffee and monkey because these elements exist in both RDDs.

Another very important transformation is cartesian. It returns all possible pairs (a, b) where a is in the source RDD and b is in the other RDD.
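
To get a feel for what "all possible pairs" means, here is a small sketch in which plain Scala lists stand in for the two RDDs. It is a model of the pairing, not Spark code, and the names are chosen only for this illustration.

```scala
// Plain Scala lists stand in for the source RDD and the other RDD.
val source = List(1, 2, 3)
val other = List(3, 4, 5)

// Every element of source is paired with every element of other.
val pairs = for (a <- source; b <- other) yield (a, b)
// List((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))

// The number of pairs is the product of the two input sizes.
val pairCount = pairs.size // 9
```

Because the output size is the product of the input sizes, cartesian gets expensive very quickly on large RDDs.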

fold is very similar to reduce. It provides a little extra control over the initialisation: it lets us specify an initial value into which the values are merged.

Let's take a look at how fold works. Say we have an RDD with two partitions. The first partition holds 1, 7 and 2. The second partition holds 4, 7 and 6.

When calling fold we provide two things: first, an initial value into which the elements will be merged, and second, the function used to merge values. This function takes two arguments, similar to reduce. The first argument is the value aggregated so far and the second is a value from the partition.

Each value from a partition is merged into the initial value. As you can see, a copy of the initial value is created on both partitions. On the first partition 1 is being merged, and on the second partition 4 is being merged. The values are merged using the function we provided, and the result is saved back into the initial value. Please note that in some cases, such as array concatenation, the function can simply append the second argument to the first and return the first argument. That way the resulting object is created only once, during initialization.

In a similar fashion, the other elements of each partition get merged.

Afterwards, the results from both partitions are merged into another copy of the initial value in the same fashion.
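
The two stages described above can be modelled with plain Scala collections. This is a sketch under stated assumptions: foldModel is a hypothetical helper written for this illustration (it is not a Spark API), each inner sequence plays the role of one partition, and the partition results are merged in a fixed order.

```scala
// A plain Scala model of fold: each inner Seq plays the role of one partition.
// foldModel is a hypothetical helper for illustration, not a Spark API.
def foldModel[T](partitions: Seq[Seq[T]], zero: T)(op: (T, T) => T): T = {
  // Stage 1: within each partition, merge the elements into the initial value.
  val perPartition = partitions.map(p => p.foldLeft(zero)(op))
  // Stage 2: merge the per-partition results into one more initial value.
  perPartition.foldLeft(zero)(op)
}

// The two partitions from the walkthrough.
val parts = Seq(Seq(1, 7, 2), Seq(4, 7, 6))

val sumFromZero = foldModel(parts, 0)(_ + _)
// (0+1+7+2) = 10 and (0+4+7+6) = 17, then 0 + 10 + 17 = 27

val sumFromOne = foldModel(parts, 1)(_ + _)
// (1+1+7+2) = 11 and (1+4+7+6) = 18, then 1 + 11 + 18 = 30
```

Notice that the initial value takes part once per partition and once more in the final merge. That is why it should normally be a neutral value for the function, such as 0 for addition. In real Spark the partition results may also arrive in a different order, so this model is only exact for functions where the order does not matter.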

Let's take an example. In this example we are going to concatenate strings, keeping an underscore as the initial value for the concatenation. So, let's first create an RDD of the numbers from 1 to 10 with two partitions.

Since we want the result to be a string, we convert all the numbers to strings using map. Why so? Because, as with reduce, the elements must be of the same data type as the result.

Next we define our function, which concatenates two strings s and n and returns the result. Here the name of the function is concat.

Next we call fold with the underscore as the initial value and concat as the function. You can see that the result first concatenated 1 to 5 with an underscore and 6 to 10 with another underscore, and then concatenated those results with yet another underscore. So fold provides a little more control than reduce by letting us specify the initial value.
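
To see where the three underscores come from, here is the same computation traced with plain Scala collections instead of an RDD. It is a sketch that assumes the two partitions hold 1 to 5 and 6 to 10 and that the first partition's result is merged first. In a real Spark run the partition results may be merged in a different order.

```scala
// Two hand-written "partitions" of strings, assumed to be 1..5 and 6..10.
val parts = Seq((1 to 5).map(_.toString), (6 to 10).map(_.toString))

def concat(s: String, n: String): String = s + n

// Each partition is folded into its own copy of the initial value "_".
val perPartition = parts.map(p => p.foldLeft("_")(concat))
// Seq("_12345", "_678910")

// The partition results are then folded into one more "_".
val result = perPartition.foldLeft("_")(concat)
// "__12345_678910"
```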

Even though fold lets us specify an initial value, it still does not provide enough control over the reduction. The aggregate function is a more powerful action for computing the result. With aggregate we specify an initial value, a function that merges the elements of a partition into the result, and another function that merges the results of each partition into a final result.

Please note that the result of aggregate can be of a different data type than the RDD elements. In the diagram, the circle represents the data type of the result and the square represents the data type of the elements of the RDD. Also note that the result has to be of the same data type as the initial value. So, in the diagram, we first combine a square with a circle to give a circle using the first function, and then we combine two circles using the other function.

Let's take the example of computing an average. The idea is to compute the total and the count. The average can then easily be computed outside Spark by dividing the total by the count.

So, let's first create an RDD having integers from 1 to 100.

We want the result to be a tuple having sum and count. So, let's have an initial value tuple having both values as zero. First value of this tuple is sum and second is count of the values.

Let's define a function seq that merges each integer from a partition into the initial value by adding it to the sum and increasing the count by one. When executed, this gives us one tuple per partition.

To merge these resulting tuples from all partitions, we define another function.

Here we define a comb function that merges two tuples by summing up their totals and counts.

Finally we call aggregate with the initial value and the functions seq and comb, as shown here. You can see that the result contains the total and the count. We can divide the total by the count to get the average.
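
The whole flow, including the final division, can be sketched without Spark. In this model aggregateModel is a hypothetical helper written for the illustration (not a Spark API), and the four partitions of 25 elements are an assumption made by hand.

```scala
// A plain Scala model of aggregate: each inner Seq plays the role of one partition.
// aggregateModel is a hypothetical helper for illustration, not a Spark API.
def aggregateModel[T, U](partitions: Seq[Seq[T]], zero: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  // seqOp merges each element (type T) into a per-partition result (type U).
  val perPartition = partitions.map(p => p.foldLeft(zero)(seqOp))
  // combOp merges the per-partition results (all of type U) together.
  perPartition.foldLeft(zero)(combOp)
}

// 1 to 100 split by hand into four partitions of 25 elements each.
val parts: Seq[Seq[Int]] = (1 to 100).grouped(25).toSeq

def seq(t: (Int, Int), i: Int): (Int, Int) = (t._1 + i, t._2 + 1)
def comb(t1: (Int, Int), t2: (Int, Int)): (Int, Int) = (t1._1 + t2._1, t1._2 + t2._2)

val result = aggregateModel(parts, (0, 0))(seq, comb)
// (5050, 100): total and count

// Convert to Double before dividing; Int division would truncate 50.5 to 50.
val average = result._1.toDouble / result._2
// 50.5
```

The conversion to Double is the one thing to watch for when you do the division yourself: both total and count are integers, so dividing them directly would drop the fractional part.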

Even though you can compute word frequencies as discussed in the examples, Spark provides an off-the-shelf action to compute the frequencies of elements. This action, countByValue, internally groups the values and then computes the count of each group.

Spark provides an action, top, to find the few largest elements. It first orders the data in descending order and then calls the take action. You can also achieve the same using the sortBy transformation and the take action.

As you can see, top(6) has returned 10, 9, 8, 4, 4 and 3 in descending order.

takeOrdered gets N elements from an RDD in ascending order, or in the order specified by the optional key function. Please note that top returns the largest n elements, while takeOrdered returns the smallest n elements by default.

Also note that it is possible to specify Ordering as well as the key function as shown on the slide.
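
The relationship between top, takeOrdered and a custom Ordering can be modelled on an ordinary Scala list. This is only a sketch of the ordering logic: sorted followed by take stands in for the RDD actions, and the pair values are a shortened, illustrative sample.

```scala
// A plain Scala list stands in for the RDD.
val values = List(4, 4, 8, 1, 2, 3, 10, 9)

// Like top(6): order descending, then take the first 6.
val largest = values.sorted(Ordering[Int].reverse).take(6)
// List(10, 9, 8, 4, 4, 3)

// Like takeOrdered(6): order ascending, then take the first 6.
val smallest = values.sorted.take(6)
// List(1, 2, 3, 4, 4, 8)

// An Ordering built from a key function: order pairs by their second element.
val pairs = List((10, "SG"), (1, "AS"), (2, "AB"))
val bySecond = pairs.sorted(Ordering.by[(Int, String), String](_._2)).take(2)
// List((2,AB), (1,AS))
```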

foreach applies a function to all elements of this RDD. If you run the example, it prints the results as and when the function is executed, in between the log messages.

Though foreach looks like map, there are a few differences. Use foreach if you don't expect any result, for example when saving to a database. foreach is an action and map is a transformation.

Similar to mapPartitions, we have the foreachPartition function. If you do not expect any return value and want to operate on an entire partition, use foreachPartition. The provided function is executed with the iterator of each partition.

Again, unlike mapPartitions, foreachPartition is an action.

Let us take an example. First we define a function partitionSum that accepts an iterator as its argument. This function simply prints the sum using the sum method of the iterator class.

Next, we pass this function to foreachPartition on our RDD with 4 partitions. You can see that the sum of each of the four partitions has been printed.

  • Sample an RDD, with or without replacement

    val seq = sc.parallelize(1 to 100, 5)
    seq.sample(false, 0.1).collect();

    Write the following commands in a new cell:

    seq.sample(true, 0.1).collect();
  • Return a new RDD by applying a function to each partition of this RDD

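    val rdd = sc.parallelize(1 to 50, 3)
    // Sums all integers of one partition and returns the sum as a one-element iterator
    def f(l: Iterator[Int]): Iterator[Int] = {
        var sum = 0
        while (l.hasNext) {
            sum = sum + l.next()
        }
        List(sum).iterator
    }
    rdd.mapPartitions(f).collect()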
  • Sorts this RDD by the given keyfunc

    var tmp = List(('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5))
    var rdd = sc.parallelize(tmp)
    rdd.sortBy(x => x._1).collect()
    rdd.sortBy(x => x._2).collect()

    Write the following commands in a new cell:

    var rdd = sc.parallelize(Array(10, 2, 3,21, 4, 5))
    var sortedrdd = rdd.sortBy(x => x)
    sortedrdd.collect()
  • More Actions - fold()

    var myrdd = sc.parallelize(1 to 10, 2)
    var myrdd1 = myrdd.map(_.toString)
    def concat(s:String, n:String):String = s + n
    var s = "_"
    myrdd1.fold(s)(concat)
  • More Actions - aggregate()

    var rdd = sc.parallelize(1 to 100)
    var init = (0, 0) // sum, count
    def seq(t:(Int, Int), i:Int): (Int, Int) = (t._1 + i, t._2 + 1)
    def comb(t1:(Int, Int), t2:(Int, Int)): (Int, Int) = (t1._1 + t2._1, t1._2 + t2._2)
    var d = rdd.aggregate(init)(seq, comb)
  • More Actions: countByValue()

    var rdd = sc.parallelize(List(1, 2, 3, 3, 5, 5, 5))
    var dict = rdd.countByValue()
  • More Actions: top(n)

    var a = sc.parallelize(List(4, 4, 8, 1, 2, 3, 10, 9))
    a.top(6)
  • More Actions: takeOrdered()

    sc.parallelize(List(10, 1, 2, 9, 3, 4, 5, 6, 7)).takeOrdered(6)
    var l = List((10, "SG"), (1, "AS"), (2, "AB"), (9, "AA"), (3, "SS"), (4, "RG"), (5, "AU"), (6, "DD"), (7, "ZZ"))
    var r = sc.parallelize(l)
    r.takeOrdered(6)(Ordering[Int].reverse.on(x => x._1))

    Write the following commands in a new cell:

    r.takeOrdered(6)(Ordering[String].reverse.on(x => x._2))

    Write the following commands in a new cell:

    r.takeOrdered(6)(Ordering[String].on(x => x._2))
  • More Actions: foreach()

    def f(x:Int)= println(s"Save $x to DB")
    sc.parallelize(1 to 5).foreach(f)
  • More Actions: foreachPartition(f)

    def partitionSum(itr: Iterator[Int]) =
        println("The sum of the partition is " + itr.sum.toString)
    sc.parallelize(1 to 40, 4).foreachPartition(partitionSum)
  • More Transformations: distinct()

    val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
    rdd.distinct().collect()
  • More Transformations: subtract()

    val firstRDD = spark.sparkContext.parallelize(1 to 10)
    val secondRDD = spark.sparkContext.parallelize(6 to 10)
    firstRDD.subtract(secondRDD).collect()
  • More Transformations: intersection()

    val firstRDD = spark.sparkContext.parallelize(1 to 10)
    val secondRDD = spark.sparkContext.parallelize(6 to 10)
    firstRDD.intersection(secondRDD).collect()
  • More Transformations: cartesian()

    val data1 = sc.parallelize(List(1,2,3))
    val data2 = sc.parallelize(List(3,4,5))
    val cartesianfunc = data1.cartesian(data2)
    cartesianfunc.collect()
