Sometimes the built-in accumulators are not sufficient and you want to customize how values are accumulated. In such cases, you can create your own custom accumulator.
To do so, create a class that extends AccumulatorParam and implements two methods: zero and addInPlace. In the example below, we create an accumulator that works on tuples, recording a pair of values. This is just to demonstrate the use case.
from pyspark import AccumulatorParam

class TupleAccumulator(AccumulatorParam):
    def zero(self, initialVal):
        # The starting value for the accumulator.
        return initialVal

    def addInPlace(self, v1, v2):
        # Merge two partial values element-wise.
        return (v1[0] + v2[0], v1[1] + v2[1])
That's it! Now you can use it by passing an instance as the second argument to sc.accumulator.
vecAccum = sc.accumulator((0, 0), TupleAccumulator())
Now you can use it like other accumulators.
myrdd = sc.parallelize([1, 2, 3])

def myfunc(x):
    print("vecAccum add")
    vecAccum.add((x, x))

myrdd.foreach(myfunc)
To see the accumulated value, use the .value attribute of the accumulator instance:
vecAccum.value
The last step should print (6, 6), a tuple: each element x of the RDD contributed (x, x), and 1 + 2 + 3 = 6.
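Under the hood, Spark calls zero() to create the starting value for each task and addInPlace() to merge partial results back on the driver. The merge logic can be checked without a Spark cluster; the sketch below simulates that fold in plain Python (the partitioning of the updates is a made-up illustration, not Spark's actual scheduling):

```python
# Standalone sketch of how Spark combines accumulator updates using
# zero() and addInPlace(); plain Python, no SparkContext required.
class TupleAccumulator:
    def zero(self, initialVal):
        # The starting value for the accumulator.
        return initialVal

    def addInPlace(self, v1, v2):
        # Merge two partial values element-wise.
        return (v1[0] + v2[0], v1[1] + v2[1])

param = TupleAccumulator()

# Hypothetical per-partition updates, mirroring vecAccum.add((x, x))
# for the RDD [1, 2, 3] split across two partitions.
partition1 = [(1, 1), (2, 2)]
partition2 = [(3, 3)]

total = param.zero((0, 0))
for update in partition1 + partition2:
    total = param.addInPlace(total, update)

print(total)  # (6, 6)
```

This is the same result the Spark job produces, which is why addInPlace must be associative: Spark is free to merge partition results in any order.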