MapReduce Programming

Writing MapReduce Code Using Java




In this session, we are going to learn how to write MapReduce programs using Java.

When we want to handle big data, instead of processing the data in the conventional way, we break down our logic into two parts: map and reduce. The map part is executed on all the machines that hold the data and produces output in the form of key-value pairs. The reduce part receives the output of all the maps, grouped by key; this grouping is done by the MapReduce framework. Reduce can then aggregate the values for each key.

Let us learn how to write these map and reduce parts in Java. Why Java? Since Apache Hadoop itself is written in Java, writing jobs in Java lets you customize the framework's behavior to a very large extent.

We are going to write a MapReduce job that counts how many times each unique word occurs in a plain text file, big.txt, stored in HDFS in /data/mr/wordcount/input/.

Let us first understand how to write a mapper. We write a piece of logic that breaks down each input record into key-value pairs. The input record itself is also made up of two parts: a key and a value.

The default input format is TextInputFormat, which is used for handling plain text files. TextInputFormat creates a record for every line in the input file. A record has two parts: key and value. In the case of TextInputFormat, the value is the content of the line and the key is the location where the line starts.

In the example in the diagram, the raw data is converted into three records with the values "This is a cow", "this is a buffalo" and "there is a hen". Since the first line, "This is a cow", starts at location 0, its key is 0. The line "there is a hen" starts at byte offset 34, so the key for that record is 34. Please note that the key is not a line number.
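To make the idea of an offset key concrete, here is a minimal plain-Java sketch that computes where each line starts. It is not Hadoop's TextInputFormat; the class name LineOffsets and the method startOffsets are hypothetical. The exact offsets depend on how lines are terminated: the sketch assumes two-byte "\r\n" terminators, a guess that happens to reproduce the key 34 from the example, while a single "\n" would give 32 for the third line.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; this is not Hadoop's TextInputFormat.
class LineOffsets {

    // Returns the byte offset at which each line starts, assuming every
    // line is followed by the given terminator.
    static List<Long> startOffsets(List<String> lines, String terminator) {
        List<Long> offsets = new ArrayList<>();
        long position = 0;
        for (String line : lines) {
            offsets.add(position);
            position += (line + terminator).getBytes(StandardCharsets.UTF_8).length;
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("This is a cow", "this is a buffalo", "there is a hen");
        // Assumption: "\r\n" line endings, chosen because they reproduce the key 34.
        List<Long> keys = startOffsets(lines, "\r\n");
        for (int i = 0; i < lines.size(); i++) {
            System.out.println(keys.get(i) + "\t" + lines.get(i));
        }
    }
}
```

Each printed row is one record as the mapper would see it: the offset is the input key and the line is the input value.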

A class in Java is a composite data type that can have methods and member variables. A class is like a blueprint from which we can create many objects; for example, we can create many objects of StubMapper. Another example of a class could be Person, with "sandeep giri" as an instance, or object, of it. The code that defines a class is written between its two curly brackets.
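As a small illustration of the blueprint idea, the sketch below defines a hypothetical Person class and creates two objects from it. The member variable name and the greet() method are made up for this example.

```java
// Hypothetical Person class, used only to illustrate class versus object.
class Person {
    private final String name;   // member variable

    Person(String name) {        // constructor: builds one object from the blueprint
        this.name = name;
    }

    String greet() {             // method
        return "Hello, I am " + name;
    }

    public static void main(String[] args) {
        // Two separate objects created from the same class.
        Person first = new Person("sandeep giri");
        Person second = new Person("another person");
        System.out.println(first.greet());
        System.out.println(second.greet());
    }
}
```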

The name of this class could be anything. Here the name of our class is StubMapper.

We need to create a class for the mapper, and this class must extend the Mapper class provided by the framework. It means that the behavior of our class StubMapper is inherited from Mapper. We hand over this StubMapper class to the MapReduce framework, and the framework creates an instance of it for each input split. Extending Mapper is mandatory from the framework side.

While extending the Mapper class, we define the data types of the input key, input value, output key and output value. Between the angle brackets, the first type is the data type of the input key. In our example, the input key is the byte offset at which the line starts, which TextInputFormat supplies as a LongWritable.

The second one is the data type of the input value. In our case, the input value is a line of text, i.e. Text.

The third is the data type of the output key. We are going to give out the word as the key, therefore it is Text.

The fourth is the data type of the output value. We are going to give out 1 as the value, therefore it is LongWritable.

Then we define a function named map() and mark it with the @Override annotation. This function returns nothing, that is, its return type is void, and it takes three arguments: the input key, the input value, and the context. This is the function in which we write the logic to convert the input record into key-values. We write these key-values into the context object. It is up to us how many and what key-values we give out.

So the input record, that is, the key and the value, is made available to you in this function along with the context, and you can write whatever logic you want to generate key-values.

In our example where we have the InputFormat as TextInputFormat, the map function would be executed against every line of the file. And your code inside this map() function would be provided with each line as input value and the line's starting location in the input file as input key. And we would be writing the output key-value pairs in the context object.

First, we split the input line by space or tabs into an array of strings. Then, for each of the words, we give out the word as key and numeric 1 as the value.
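The shape of such a mapper can be sketched without Hadoop on the classpath. In the sketch below, MiniMapper and MiniContext are hypothetical stand-ins for the framework's Mapper and its context, and plain Long and String take the place of LongWritable and Text. It is only meant to show the four type parameters and the split-and-emit logic, not the real API.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration; these are NOT the Hadoop classes.
class MiniMapperDemo {

    // Stand-in for the context object that collects output key-values.
    interface MiniContext<KEYOUT, VALUEOUT> {
        void write(KEYOUT key, VALUEOUT value);
    }

    // Stand-in for Mapper: four type parameters in the order
    // input key, input value, output key, output value.
    abstract static class MiniMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        abstract void map(KEYIN key, VALUEIN value, MiniContext<KEYOUT, VALUEOUT> context);
    }

    // Input key: byte offset, input value: line, output key: word, output value: 1.
    static class MiniStubMapper extends MiniMapper<Long, String, String, Long> {
        @Override
        void map(Long key, String value, MiniContext<String, Long> context) {
            // Split on runs of spaces or tabs; skip the empty token that a
            // leading separator would produce.
            for (String word : value.split("[ \t]+")) {
                if (!word.isEmpty()) {
                    context.write(word, 1L);
                }
            }
        }
    }

    // Runs the mapper on one record and returns the emitted pairs as "word=1" strings.
    static List<String> run(long offset, String line) {
        List<String> emitted = new ArrayList<>();
        new MiniStubMapper().map(offset, line, (word, one) -> emitted.add(word + "=" + one));
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(0L, "This is a cow"));
    }
}
```

Note that the input key, the offset, is received but never used; for word counting only the line content matters.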

While giving out the key and the value to MapReduce via context, why are we writing new Text(word) and new LongWritable(1)?

The usual Java classes and types for representing numbers, text and other objects were not efficient enough to serialize and move between machines. So, the MapReduce team designed their own classes, called Writables, corresponding to most of the Java primitive types. While giving out values to MapReduce, we therefore need to wrap the basic objects in the wrappers provided by the MapReduce framework. It is exactly like wrapping a gift for someone's birthday.

And the values that we receive from the framework, such as the first two arguments of the map() function, need to be unwrapped before use. In our example, we convert the Text object to a String by calling its toString() method.
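To get a feel for why a compact wrapper matters, the sketch below compares the size of a number written with standard Java object serialization against the same number written as a raw 8-byte long. It uses only the JDK: DataOutputStream.writeLong stands in for the kind of compact encoding a Writable such as LongWritable produces, which is an assumption of this sketch and not a measurement of Hadoop itself. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Plain-Java comparison; no Hadoop classes are used here.
class SerializedSizeDemo {

    // Size of a java.lang.Long written with standard Java object serialization,
    // which also records class metadata alongside the value.
    static int javaSerializedSize(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(Long.valueOf(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Size of the same number written as a raw long with no metadata.
    static int rawSize(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("Java serialization: " + javaSerializedSize(1L) + " bytes");
        System.out.println("Raw long:           " + rawSize(1L) + " bytes");
    }
}
```

When a job emits one such value per word of a very large file, a per-value overhead of this kind adds up quickly.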

Now, here is the complete code, along with the exception declarations that are part of the Java way of handling errors. To take a look at the complete code, please see it in our GitHub repository:

https://github.com/singhabhinav/cloudxlab/tree/master/hdpexamples/java/src/com/cloudxlab/wordcount

What does this mapper code do? It converts each line into key-value pairs, with each word of the line as the key and 1 as the value. It does not do any sorting. Also, the map() method is called once for each line. In our example, it is called three times, while the class StubMapper is instantiated only once per InputSplit.

In a similar way, we create the reducer. We define our reducer class by extending the Reducer class from the MapReduce framework, and we also define the input and output key-value data types. Then, we define the reduce method. Notice the arguments of the reduce method. The first argument is the key and the second is an iterable of values, which is the equivalent of a list or cursor. So, your reduce method is called for each key with its values. Please recollect that the output of the mapper is grouped by key by the MapReduce framework, and then your reduce method is called for each group. All of the values for each unique key are passed as the second argument.

The role of the reduce function is to process the list of values for each key. In our case, we simply iterate over the ones and sum them up. This gives the count for each unique word. The result of the reducer is stored in HDFS.
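The grouping and summing described above can be imitated in plain Java. In this sketch, groupByKey is a hypothetical stand-in for the grouping that the framework performs, and reduce mirrors the summing logic of a word-count reducer using plain String and Long in place of the Writable types. It runs in memory on a hand-written list of map outputs and does not touch HDFS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory imitation of group-by-key followed by reduce; not Hadoop code.
class MiniReduceDemo {

    // Stand-in for the framework's grouping step: collects the values of
    // all (word, 1) pairs under their key. A TreeMap keeps the keys sorted.
    static Map<String, List<Long>> groupByKey(List<Map.Entry<String, Long>> pairs) {
        Map<String, List<Long>> groups = new TreeMap<>();
        for (Map.Entry<String, Long> pair : pairs) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return groups;
    }

    // Mirrors the word-count reduce logic: sum all values for one key.
    static long reduce(String key, Iterable<Long> values) {
        long sum = 0;
        for (long value : values) {
            sum += value;
        }
        return sum;
    }

    // Calls reduce once per group, as the framework would.
    static Map<String, Long> countWords(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> counts = new TreeMap<>();
        groupByKey(pairs).forEach((word, ones) -> counts.put(word, reduce(word, ones)));
        return counts;
    }

    public static void main(String[] args) {
        // Hand-written map output for the lines "this is a cow" and "this is a buffalo".
        List<Map.Entry<String, Long>> mapOutput = List.of(
            Map.entry("this", 1L), Map.entry("is", 1L), Map.entry("a", 1L), Map.entry("cow", 1L),
            Map.entry("this", 1L), Map.entry("is", 1L), Map.entry("a", 1L), Map.entry("buffalo", 1L));
        System.out.println(countWords(mapOutput));
    }
}
```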

Once we are done writing the mapper and the reducer, we can tie things together by creating a job in a driver. The driver code looks like a usual Java class with a main method, which is traditionally the entry point of Java programs.

We create a job object and give it a meaningful name. Then we set our StubMapper class as the mapper and our StubReducer class as the reducer. Then we define the output key and value data types. Then we define the input path, that is, the directory or file from which our job will read data. We then define the output path, which should be a directory that doesn't exist yet. If it exists, the job will throw an error. Please note that both the input path and the output path are in HDFS.

Once we are done configuring, we can start the job using job.waitForCompletion(), which submits the job and waits for it to finish.

