Adv Spark Programming - Accumulators

Code for accumulator-example.scala

Script

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

When your program has to report values back to the driver, you need accumulators. The tasks of your Spark application write to accumulator variables, and since the order of these updates is not guaranteed, only associative operations lead to correct results.

Accumulators are variables that are only “added” to through an associative operation, which is why they can be supported efficiently in parallel.

They can be used to implement counters (as in MapReduce) or summations, and to track the progress of your Spark application.
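As a quick sketch of the idea (the RDD and variable names below are illustrative and not part of the lesson's example), each task adds to the accumulator and the driver reads the combined total once an action has run:

// Illustrative sketch: a summation accumulator.
// Each task adds the length of its records; the driver reads the total afterwards.
var totalChars = sc.accumulator(0)
sc.parallelize(Seq("spark", "accumulator", "example")).foreach(s => totalChars += s.length)
println("Total characters: " + totalChars.value)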

In this example, we break lines of text into individual words and, while doing so, count the number of blank lines encountered. Let's go through the code step by step. You will have to run this code in spark-shell.

First, let’s set the log level to ERROR. This avoids unnecessary text on the screen; only errors, if any, will be displayed.

sc.setLogLevel("ERROR")

Now, load the file from HDFS with the textFile method.

var file = sc.textFile("/data/mr/wordcount/input/")

Next, we initialize numBlankLines as an accumulator, passing 0 as the initial value. The data type of the accumulator is inferred from the initial value. If you want the accumulator to store a Long, specify 0L as the initial value.

var numBlankLines = sc.accumulator(0)
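For instance, the Long variant would look like this (an illustrative line, not used in the rest of the example):

// Same call with a Long initial value; the accumulator's type is inferred as Long.
var numBlankLinesLong = sc.accumulator(0L)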

Now, we define a method toWords that takes a line and returns an array of words. This method adds one to the accumulator variable if the line length is 0. Please note that numBlankLines is not an integer. It is an instance of an accumulator holding an integer value.

def toWords(line: String): Array[String] = {
  // Count blank lines as a side effect while splitting the line into words
  if (line.length == 0) { numBlankLines += 1 }
  line.split(" ")
}

Now, let’s call toWords using flatMap and then save the results to a file in HDFS.

var words = file.flatMap(toWords)
words.saveAsTextFile("words3")

Once done, check the value of the accumulator by reading the value member of numBlankLines. For this input it should be 24858.

printf("Blank lines: %d", numBlankLines.value)    
//Blank lines: 24858

We have already learned that YARN re-executes failed tasks, and that slow tasks are re-executed when speculative execution is on. This means that if a task is slow, Spark preemptively launches a speculative copy of the slow task on another worker. What would happen to accumulators in such cases? Will the accumulator values be rolled back, or will the duplicate updates remain?

The result is that the same function may run multiple times on the same data. Does it mean accumulators will give the wrong result?

Yes, accumulators updated in transformations might give wrong results, because fault tolerance and speculative execution can cause the same function to be executed multiple times. For accumulators updated in actions, however, Spark guarantees that each task's update is applied exactly once, even if the task is re-executed. So, for a reliable counter with an absolute value, update the accumulator inside an action; in transformations this guarantee does not exist, so use accumulators there for debugging purposes only.
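As a minimal sketch of this advice (reusing the file RDD loaded earlier; the variable name is chosen for illustration), the blank-line count can instead be updated inside foreach, which is an action:

// Counting blank lines inside an action (foreach): each task's accumulator
// update is applied exactly once, even if the task is retried.
var blankLinesReliable = sc.accumulator(0)
file.foreach { line =>
  if (line.length == 0) blankLinesReliable += 1
}
println("Blank lines: " + blankLinesReliable.value)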

Out of the box, Spark supports accumulators of the numeric types Int, Double, Long, and Float.

Spark also includes an API to define custom accumulator types and custom aggregation operations (e.g., finding the maximum of the accumulated values instead of adding them).

Custom accumulators need to extend AccumulatorParam.
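For example, a custom accumulator that keeps the maximum of the values added to it, rather than their sum, could be sketched as follows (MaxAccumulatorParam and maxLineLength are illustrative names, not part of the lesson; this uses the same Spark 1.x API as sc.accumulator above):

import org.apache.spark.AccumulatorParam

// Custom accumulator behaviour: combine values with max instead of +
object MaxAccumulatorParam extends AccumulatorParam[Int] {
  // Identity element for the max operation
  def zero(initialValue: Int): Int = Int.MinValue
  // How two partial results are merged: keep the larger one
  def addInPlace(r1: Int, r2: Int): Int = math.max(r1, r2)
}

// Track the length of the longest line seen by any task
var maxLineLength = sc.accumulator(Int.MinValue)(MaxAccumulatorParam)
file.foreach(line => maxLineLength += line.length)
println("Longest line length: " + maxLineLength.value)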


