
Adv Spark Programming - Custom Accumulators

Code for custom-accum-v1.scala (Spark 1.x AccumulatorParam API):

// A simple serializable container holding two integer components.
class MyComplex(var x: Int, var y: Int) extends Serializable {
    def reset(): Unit = {
        x = 0
        y = 0
    }
    def add(p: MyComplex): MyComplex = {
        x = x + p.x
        y = y + p.y
        this
    }
}

import org.apache.spark.AccumulatorParam

// Spark 1.x custom accumulator: AccumulatorParam tells Spark how to
// initialise (zero) and combine (addInPlace) values of type MyComplex.
class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {

    // The value the accumulator starts with on each executor.
    def zero(initialVal: MyComplex): MyComplex = {
        initialVal
    }

    def addInPlace(v1: MyComplex, v2: MyComplex): MyComplex = {
        v1.add(v2)
        v1
    }
}

// Create the accumulator, passing the AccumulatorParam explicitly.
val vecAccum = sc.accumulator(new MyComplex(0, 0))(new ComplexAccumulatorV1)

val myrdd = sc.parallelize(Array(1, 2, 3))
def myfunc(x: Int): Int = {
    vecAccum += new MyComplex(x, x)
    x * 3
}
val myrdd1 = myrdd.map(myfunc)
// collect() is an action; it triggers the map and hence the accumulator updates.
myrdd1.collect()
vecAccum.value.x    // 6
vecAccum.value.y    // 6
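
One caveat worth noting: accumulator updates made inside a transformation such as map only happen once an action (collect() above) runs the job, and they may be applied more than once if a stage is re-executed. Below is a minimal sketch of the same accumulator updated from an action instead; it reuses MyComplex and ComplexAccumulatorV1 from above, and the variable name vecAccum2 is only illustrative.

// foreach is an action, so the accumulator is updated as part of the job itself.
val vecAccum2 = sc.accumulator(new MyComplex(0, 0))(new ComplexAccumulatorV1)
sc.parallelize(Array(1, 2, 3)).foreach(x => vecAccum2 += new MyComplex(x, x))
vecAccum2.value.x   // 6
vecAccum2.value.y   // 6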

Code for custom-accumulator-v2.scala (Spark 2.x AccumulatorV2 API):

class MyComplex(var x: Int, var y: Int) extends Serializable {
    def reset(): Unit = {
        x = 0
        y = 0
    }
    def add(p: MyComplex): MyComplex = {
        x = x + p.x
        y = y + p.y
        this
    }
}
import org.apache.spark.util.AccumulatorV2

// Spark 2.x custom accumulator, implemented here as a singleton object.
object ComplexAccumulatorV2 extends AccumulatorV2[MyComplex, MyComplex] {
    private val myc: MyComplex = new MyComplex(0, 0)

    def reset(): Unit = {
        myc.reset()
    }
    def add(v: MyComplex): Unit = {
        myc.add(v)
    }
    def value: MyComplex = myc
    def isZero: Boolean = myc.x == 0 && myc.y == 0
    // Returns the singleton itself; a class-based accumulator would return
    // a fresh copy here (see the sketch after this listing).
    def copy(): AccumulatorV2[MyComplex, MyComplex] = ComplexAccumulatorV2
    def merge(other: AccumulatorV2[MyComplex, MyComplex]): Unit = {
        myc.add(other.value)
    }
}

// Register the accumulator with the SparkContext.
sc.register(ComplexAccumulatorV2, "mycomplexacc")

// Using the custom accumulator.
val ca = ComplexAccumulatorV2
val rdd = sc.parallelize(1 to 10)
val res = rdd.map(x => ca.add(new MyComplex(x, x)))
// count is an action; it triggers the map and hence the accumulator updates.
res.count
ca.value.x    // 55
ca.value.y    // 55
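
The AccumulatorV2 contract expects copy() to return an independent copy of the accumulator, which the singleton object above cannot really provide (it simply returns itself). Below is a minimal class-based sketch that follows the contract more closely; it reuses the MyComplex class defined above, and the class name ComplexAccumulator, the registered name "mycomplexacc2", and the variable names are illustrative, not part of the course script.

import org.apache.spark.util.AccumulatorV2

class ComplexAccumulator extends AccumulatorV2[MyComplex, MyComplex] {
    private val myc: MyComplex = new MyComplex(0, 0)

    override def isZero: Boolean = myc.x == 0 && myc.y == 0

    // copy() returns a fresh accumulator carrying the current partial sum,
    // so each task can work on its own copy before merge() combines them.
    override def copy(): AccumulatorV2[MyComplex, MyComplex] = {
        val newAcc = new ComplexAccumulator
        newAcc.myc.add(myc)
        newAcc
    }

    override def reset(): Unit = myc.reset()
    override def add(v: MyComplex): Unit = { myc.add(v) }
    override def merge(other: AccumulatorV2[MyComplex, MyComplex]): Unit = {
        myc.add(other.value)
    }
    override def value: MyComplex = myc
}

val acc = new ComplexAccumulator
sc.register(acc, "mycomplexacc2")
sc.parallelize(1 to 10).foreach(x => acc.add(new MyComplex(x, x)))
acc.value.x   // 55
acc.value.y   // 55

copy() is abstract in AccumulatorV2, and Spark also calls it (through copyAndReset()) when shipping a registered accumulator to executors, which is why the code fails when the method is removed.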



17 Comments

If we extend an object or class in Spark, do we need to override all of the functions from that extended object?

Here in the Spark 2.x version of the code, I have not understood why we need the copy() function and how it is used in our addition of elements using the accumulator.

    def copy():AccumulatorV2[MyComplex, MyComplex] = {
        return ComplexAccumulatorV2
    }

I tried removing that part, but the code fails.

You have already explained the Spark 1.x version of the code in the video. Is it possible for you to add an explanation of the 2.x version as well? For now, it would be great if you could explain the 2.x code in the chat itself. Thanks in advance!


Hi,

Thank you for your feedback. We will definitely consider this and will try to create an explanation of the 2.x version of the code.

Thanks.


I am able to execute version 2 in the console, but an error occurred in the notebook. Can you please explain the reason? The code is the same in both places.

Output on console

scala> res.count
res1: Long = 10
scala> ca.value.x
res2: Int = 55
scala> ca.value.y
res3: Int = 55

Output of class and object:

defined class MyComplex
defined object ComplexAccumulatorV2

var ca = ComplexAccumulatorV2;

ca = ComplexAccumulatorV2$(id: 195, name: Some(mycomplexacc), value: MyComplex@7161862c)
var rdd = sc.parallelize(1 to 10);

rdd = ParallelCollectionRDD[8] at parallelize at <console>:29

var res = rdd.map(x => ca.add(new MyComplex(x,x)));

Name: Compile Error
Message: <console>:36: error: type mismatch;
 found   : MyComplex
 required: MyComplex
       var res = rdd.map(x => ca.add(new MyComplex(x,x)));

Notebook version:

sc.version

2.1.1.2.6.2.0-205

It is vice versa for version 1: it executed successfully in the notebook, but an error occurred on the console.

Notebook output:

myrdd1.collect()

[3, 6, 9]

vecAccum.value.x

6

vecAccum.value.y

6

Error on the console: I guess the error is due to the Scala version. I am using version 2.x in the console. Please confirm.

scala> import org.apache.spark.AccumulatorParam
import org.apache.spark.AccumulatorParam

scala> class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] { ...
<console>:12: error: not found: type AccumulatorParam
       class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {


The notebook has an older version of Spark.


I am getting the error: not found: type AccumulatorParam


The script for version 2.x worked.


Hi,

Could you please try the same on the Jupyter notebook given on the right?

Thanks.



Hi,

I am getting a 'not found: type AccumulatorParam' error.

Please go through the link below for details.

https://cloudxlab.com/assessment/displayslide/559/adv-spark-programming-custom-accumulators?course_id=73&playlist_id=349


I am getting this error despite the import statement. Please suggest a correction:

<console>:12: error: not found: type AccumulatorParam
       class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {


I get the following error message when defining the myfunc() function:

scala> def myfunc(x:Int):Int = {
     | vecAccum += new MyComplex(x, x)
     | return x * 3
     | }
<console>:26: error: not found: value vecAccum
       vecAccum += new MyComplex(x, x)
       ^

Would you check this for me?


Hi, Changsub.

Here, you need to first define the class MyComplex, then import org.apache.spark.AccumulatorParam, and then create the vecAccum accumulator (and the myrdd variable) before defining myfunc.
Kindly run the commands sequentially and you should be able to do it.

All the best!

-- Satyajit Das


Can we launch the v2 version somehow? The spark-shell version is 1.5.2. I tried export SPARK_MAJOR_VERSION=2 before launching, but no luck.

Abhinav Singh

Hi,

This guide https://cloudxlab.com/blog/... will help you in accessing Spark 2 on CloudxLab.

Thanks


Thanks
