Enrollments closing soon for Post Graduate Certificate Program in Applied Data Science & AI By IIT Roorkee | 3 Seats Left

  Apply Now

Advanced Spark Programming with Python

26 / 49

How to create custom Accumulators in Apache Spark (Python)

Sometimes, the existing accumulator is not sufficient and you want to customize how the values are accumulated. In such cases, you need to create your custom accumulator.

You need to create a class by extending AccumulatorParam. This class should implement two methods zero and addInPlace. In the example below, we are trying to create an accumulator that works on tuples. It records a pair of values. This is just to demonstrate the usecase.

from pyspark import AccumulatorParam

class TupleAccumulator(AccumulatorParam):
    def zero(self, initialVal):
       return initialVal
    def addInPlace(self, v1, v2):
        return v1[0] + v2[0],v1[1]+v2[1]

That's it! Now, you can use it. Pass it as the second argument in sc.accumulator.

vecAccum = sc.accumulator((0,0),TupleAccumulator())

Now you can use it like other accumulators.

myrdd = sc.parallelize([1,2,3])
def myfunc(x):
    print("vecAccum add")
    vecAccum.add((x, x))

myrdd.foreach(myfunc)

In order to see the accumulated value, we can simply use .value attribute of the accumulator instance vecAccum.value

The last step should print (6,6) which is a tuple.


No hints are availble for this assesment

Answer is not availble for this assesment

Loading comments...