Code for custom-accum-v1.scala:
class MyComplex(var x: Int, var y: Int) extends Serializable {
  def reset(): Unit = {
    x = 0
    y = 0
  }
  def add(p: MyComplex): MyComplex = {
    x = x + p.x
    y = y + p.y
    this
  }
}

import org.apache.spark.AccumulatorParam

class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {
  // Initial ("zero") value for the partial sums built on each partition
  def zero(initialVal: MyComplex): MyComplex = {
    initialVal
  }
  // How two partial values are combined
  def addInPlace(v1: MyComplex, v2: MyComplex): MyComplex = {
    v1.add(v2)
  }
}

val vecAccum = sc.accumulator(new MyComplex(0, 0))(new ComplexAccumulatorV1)

val myrdd = sc.parallelize(Array(1, 2, 3))
def myfunc(x: Int): Int = {
  vecAccum += new MyComplex(x, x)
  x * 3
}
val myrdd1 = myrdd.map(myfunc)
myrdd1.collect()
vecAccum.value.x
vecAccum.value.y
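Each map call above adds MyComplex(x, x) to the accumulator, so for the three-element RDD the value should end at (6, 6). A quick local sanity check of the same arithmetic (a sketch that assumes only the MyComplex class above; no Spark needed):

// Local simulation of what the accumulator computes for this RDD:
val check = new MyComplex(0, 0)
Seq(1, 2, 3).foreach(i => check.add(new MyComplex(i, i)))
println((check.x, check.y))  // expected: (6,6)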
Code for custom-accumulator-v2.scala:
class MyComplex(var x: Int, var y: Int) extends Serializable {
  def reset(): Unit = {
    x = 0
    y = 0
  }
  def add(p: MyComplex): MyComplex = {
    x = x + p.x
    y = y + p.y
    this
  }
}

import org.apache.spark.util.AccumulatorV2

class ComplexAccumulatorV2 extends AccumulatorV2[MyComplex, MyComplex] {
  private val myc: MyComplex = new MyComplex(0, 0)
  def reset(): Unit = {
    myc.reset()
  }
  def add(v: MyComplex): Unit = {
    myc.add(v)
  }
  def value: MyComplex = myc
  def isZero: Boolean = myc.x == 0 && myc.y == 0
  // Spark calls copy() to give each task its own accumulator, so it must
  // return an independent instance (a shared singleton would be wrong here).
  def copy(): AccumulatorV2[MyComplex, MyComplex] = {
    val acc = new ComplexAccumulatorV2
    acc.myc.add(myc)
    acc
  }
  // The driver merges the per-task copies back into this accumulator.
  def merge(other: AccumulatorV2[MyComplex, MyComplex]): Unit = {
    myc.add(other.value)
  }
}

val ca = new ComplexAccumulatorV2
sc.register(ca, "mycomplexacc")

// using the custom accumulator
val rdd = sc.parallelize(1 to 10)
val res = rdd.map(x => ca.add(new MyComplex(x, x)))
res.count  // an action forces the map, which updates the accumulator
ca.value.x
ca.value.y
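Since the map above runs only for its side effect, foreach is a slightly cleaner way to trigger the same updates. A small variation (not part of the original script), reusing ca and rdd from above:

ca.reset()                                     // clear the previous total
rdd.foreach(x => ca.add(new MyComplex(x, x)))  // foreach is an action; no throwaway RDD[Unit]
(ca.value.x, ca.value.y)                       // sums of 1..10 in each field: (55,55)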
Comments
Comment: I can't access the link mentioned below the video. The link is disabled, please advise.

Reply: Hi,
1. https://gist.github.com/girisandeep/450ff3d29f20f2e31cdd09ad0f1c0df2#file-custom-accum-v1-scala
2. https://gist.github.com/girisandeep/35b21cca890157afe0084a9e400e2e70#file-custom-accumulator-v2-scala
Thanks.
Comment: If we extend an object in Spark, do we need to override all of the functions from that extended object?
In the Spark 2.x version of the code, I have not understood why we need the copy() function and how it is used when adding elements with the accumulator. I tried removing that part, but the code fails.
You have already explained the Spark 1.x version of the code in the video. Is it possible to add an explanation of the 2.x version as well? For now, it would be great if you could explain the 2.x code in the chat. Thanks in advance!
Reply: Hi,
Thank you for your feedback. We will definitely consider this and will try to create an explanation of the 2.x version of the code.
Thanks.
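In the meantime, a rough sketch of the lifecycle may help: Spark gives each task its own copy of a registered accumulator, which is why copy() is required (it is abstract in AccumulatorV2, so removing it will not compile); each task calls add() on its local copy, and the driver combines the per-task copies with merge(). A simplified, Spark-free illustration of that flow, using the ComplexAccumulatorV2 class defined above:

val driverAcc = new ComplexAccumulatorV2
val perTask = Seq(driverAcc.copy(), driverAcc.copy())  // one copy per task
perTask(0).add(new MyComplex(1, 1))                    // task 0 updates locally
perTask(1).add(new MyComplex(2, 2))                    // task 1 updates locally
perTask.foreach(driverAcc.merge)                       // driver merges the results
(driverAcc.value.x, driverAcc.value.y)                 // (3,3)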
Comment: I am able to execute version 2 in the console, but an error occurred in the notebook. Can you please explain the reason? The code is the same in both places.

Console output:
scala> res.count
res1: Long = 10
scala> ca.value.x
res2: Int = 55
scala> ca.value.y
res3: Int = 55

Code for the class and object:
var ca = ComplexAccumulatorV2;
var res = rdd.map(x => ca.add(new MyComplex(x,x)));

Notebook version:
sc.version

It is vice versa for version 1: it executes successfully in the notebook, but an error occurs on the console.

Notebook output:
myrdd1.collect()
vecAccum.value.x
vecAccum.value.y

Error on console (I guess the error is due to the version; I am using version 2.x in the console, please confirm):
scala> import org.apache.spark.AccumulatorParam
import org.apache.spark.AccumulatorParam
scala> class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {...
<console>:12: error: not found: type AccumulatorParam
       class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {

Reply: The notebook has an older version of Spark.
Upvote ShareGetting error: not found: type AccumulatorParam
Script for Version 2.x worked
Upvote ShareHi,
Could you please try the same on the Jupyter notebook given on the right?
Thanks.
Comment: Hi,
I am getting a 'not found: type AccumulatorParam' error.
Please go through the below link for details:
https://cloudxlab.com/assessment/displayslide/559/adv-spark-programming-custom-accumulators?course_id=73&playlist_id=349
Comment: I am getting this error despite the import statement. Please suggest the correction?
<console>:12: error: not found: type AccumulatorParam
       class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {
Comment: I get an error message when defining the myfunc() function. Would you check this for me?

scala> def myfunc(x:Int):Int = {
     |   vecAccum += new MyComplex(x, x)
     |   return x * 3
     | }
<console>:26: error: not found: value vecAccum
       vecAccum += new MyComplex(x, x)
       ^
Reply: Hi, Changsub.
Here, you need to first define the class MyComplex, then import org.apache.spark.AccumulatorParam, and then create the "vecAccum" and "myrdd" variables before defining myfunc.
Kindly run the commands sequentially and you should be able to do it.
All the best!
-- Satyajit Das
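For reference, a condensed version of that order for the Spark 1.x shell (a sketch restating the code from the top of this page):

// 1. the class, 2. the import, 3. the accumulator, 4. the function that uses it
class MyComplex(var x: Int, var y: Int) extends Serializable {
  def add(p: MyComplex): MyComplex = { x += p.x; y += p.y; this }
}
import org.apache.spark.AccumulatorParam
class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {
  def zero(initialVal: MyComplex): MyComplex = initialVal
  def addInPlace(v1: MyComplex, v2: MyComplex): MyComplex = v1.add(v2)
}
val vecAccum = sc.accumulator(new MyComplex(0, 0))(new ComplexAccumulatorV1)  // before myfunc
def myfunc(x: Int): Int = { vecAccum += new MyComplex(x, x); x * 3 }
sc.parallelize(Array(1, 2, 3)).map(myfunc).collect()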
Upvote ShareCan we run launch the v2 version somehow? The spark-shell version is 1.5.2. I tried export SPARK_MAJOR_VERSION=2 before launching but no luck.
Upvote ShareHi @@disqus_9KAngoench:disqus ,
This guide https://cloudxlab.com/blog/... will help you in accessing Spark 2 on CloudxLab.
Thanks
Upvote ShareThanks
Upvote Share