Adv Spark Programming

29 / 52

Adv Spark Programming - Custom Accumulators

Code for: custom-accum-v1.scala

/**
 * Mutable pair of integer components that can be reset and summed in place.
 *
 * @param x first component
 * @param y second component
 */
class MyComplex(var x: Int, var y: Int) extends Serializable {

    /** Sets both components back to 0. */
    def reset(): Unit = {
        x = 0
        y = 0
    }

    /**
     * Adds the components of `p` to this instance, mutating it.
     *
     * @param p value whose components are added to this one
     * @return this same instance, after the update
     */
    def add(p: MyComplex): MyComplex = {
        x += p.x
        y += p.y
        this
    }
}

import org.apache.spark.AccumulatorParam
/**
 * Accumulator parameter describing how MyComplex values are combined.
 */
class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {

    /**
     * Returns the identity ("zero") value for MyComplex accumulation.
     *
     * A fresh (0, 0) instance is returned instead of `initialVal` itself.
     * MyComplex is mutable and `addInPlace` mutates its first argument, so
     * handing back `initialVal` would alias the identity value with the
     * running total, and would also be wrong for any non-zero initial value.
     *
     * @param initialVal the accumulator's initial value (not used for the result)
     * @return a new MyComplex whose components are both 0
     */
    def zero(initialVal: MyComplex): MyComplex = new MyComplex(0, 0)

    /**
     * Merges `v2` into `v1` by component-wise addition.
     *
     * @param v1 value that is mutated and returned
     * @param v2 value whose components are added to `v1`
     * @return `v1`, after the addition
     */
    def addInPlace(v1: MyComplex, v2: MyComplex): MyComplex = {
        v1.add(v2)
        v1
    }
}

// Accumulator starting at (0, 0); ComplexAccumulatorV1 supplies the merge logic.
val vecAccum = sc.accumulator(new MyComplex(0,0))(new ComplexAccumulatorV1)

val myrdd = sc.parallelize(Array(1,2,3))

// Adds (x, x) to the accumulator as a side effect and returns x tripled.
def myfunc(x: Int): Int = {
    vecAccum += new MyComplex(x, x)
    x * 3
}

// map is a lazy transformation: myfunc (and therefore the accumulator update)
// only runs once an action such as collect() is invoked.
// NOTE: accumulator updates made inside a transformation are applied again
// whenever the RDD is recomputed, so running a second action on myrdd1
// would add the values a second time.
val myrdd1 = myrdd.map(myfunc)
myrdd1.collect()

// Read the accumulated components back on the driver.
vecAccum.value.x
vecAccum.value.y

Code for custom-accumulator-v2.scala:

Slides - Adv Spark Programming (2)


No hints are available for this assessment

Answer is not available for this assessment

Loading comments...