MapReduce Programming

1 / 13
Writing MapReduce Code Using Java

In this session, we are going to learn how to write MapReduce programs using Java.

When we want to handle big data, instead of processing data using conventional way, we breakdown our logic into two parts: map and reduce. The map gets executed on all the machines having data and gives output in the form of key-value pairs. And reduce receives the output of all maps grouped by key. The grouping is done by mapreduce framework. Reduce can aggregate the values per key.

Let us learn how to write these map and reduce parts in Java. Why Java? Since Apache Hadoop is written in Java, you can modify the behaviour to a very large extent.

We are going to write a map-reduce job to count unique words in a plain text file big.txt stored in HDFS in /data/mr/wordcount/input/.

Let us first understand how to write mapper. We write a piece of logic which would break down the input record into key-value. The input record is also made up two parts - key and value.

The default inputformat is TextInputFormat which is used for handling plain text files. The textinput format creates a record for everyline in the input file. A record has two parts key and value. In case of TextInputput, the value is the content of line and the key is the location where line starts.

In the example in diagram, The raw data is converted to three records having values "This is a cow", "this is a buffalo" and "there is a hen". Since the first line "this is a cow" starts at location 0, the key is 0. For the line "there is a hen" starts at thirty forth byte or character, the key for the record is 34. Please let is not line number.

A class in java is a complex data type that can have methods and member variable in it too. A class is like is blue print. We create many objects of a class. We can create many objects our of stubmapper. Example of a class could be Person and "sandeep giri" is an instance or object of it. We will write code that defines class between these two curly brackets.

The name of this class could be any thing. Here the name of our class is StubMapper.

We need to create a class for mapper. It means that the behaviour of our class StubMapper would be inherited from Mapper. We will handover this stubmapper class to mapreduce framework and framework will create instance of it for each input split. Your mapper class must extend mapper class. It is mandatory from the framework side.

While extending the Mapper class, we can define the data types of input key, value and output key and value. Between the angle brackets, the first one signifies the Data type of Input Key. In our example, it is number of bytes at which the value is starting.

The second one is the Data type of input value. In our case, input value is each line, i.e. Text

The third datatype is of output key. We are going to give key as word, therefore it is Text

Forth one is for output value. We are going to give value as 1 therefore it is Long

Then we define the function with a name map() and put an override keyword on it. This function should return nothing that is void but has three arguments input key, input value and context. This is the function in which we write the logic to convert the input record into key-values. We will be writing key-values into context object. It is upto us how many and what key-values we give out.

So, the input record that is key and value along with the context is made available to you in this function, you can write whatever logic you want to generate key-values.

In our example where we have the InputFormat as TextInputFormat, the map function would be executed against every line of the file. And your code inside this map() function would be provided with each line as input value and the line's starting location in the input file as input key. And we would be writing the output key value pairs in the context object.

First, we split the input line by space or tabs into an array of strings. Then, for each of the words, we give out the word as key and numeric 1 as the value.

While giving out the key and the value to mapreduce via context, why are we writing "new Text(word)" and new LongWritable(1)?

The usual classes or types of Java to represent numbers, text and other objects were not efficient. So, MapReduce team designed their own classes called writables corresponding to most of the java primitive objects. So, while giving out the values to MapReduce we need to wrap the basic objects into the wrappers provided by Mapreduce framework. It is exactly like wrapping a gift for someone's birthday.

And the values that we recieve from framework such as first two arguments of map() function, we need to unwrap those too before using. In out example, we are converting the Text class to String by calling .toString function on the text object.

Now, here is the complete code along with exceptions declaration which is part of java way of handling errors. To take a look at complete code, please see it on the displayed url of our github repository: https://github.com/singhabhinav/cloudxlab/tree/master/hdpexamples/java/src/com/cloudxlab/wordcount

What does this mapper code do? It basically converts each line into key-value pairs having each word as key and value as 1. It does not do any sorting. Also, the map() method is called for each line. In our example, it has been called three times while the class StubMapper is initialized only once per InputSplit

In the similar way we create reducer. We define our reducer class by extending the Reducer from mapreduce framework and also define the input-output key-value datatypes. Then, we define the reduce method. Notice the arguments of reduce method. First argument is the key and second is an iterator of values which is equivalent of list or cursor. So, your reduce method is called on each key and its value. Please recollect that the output of mapper is grouped on key by mapreduce framework and then for each group your reduce method will be called. All of the values for each uniq key will be passed as second argument.

The role of the reduce function is to process the list of values for each key. In our case, we are simply iterating over ones and summing it up. So, this would give the count for each uniq word. The result of reducer is stored in HDFS.

Once we are done writing mapper and reducer, we can tie things together by creating a job using Driver. The driver code looks like usual Java class having a main method which is traditionally the entry point in Java programs.

We create a job object set its name correctly. Then we set our StubMapper Class as the mapper. Then we set our stubreducer class as the reducer. Then we define output key/values data types. Then we define the input path - the directory or file from which our job will read data. We then define the output path which should be a directory that doesn't exist. If it exists, the job will throw error. please note that the input path and output path both are in HDFS.

Once we are done configuring, we can start the job using job.waitForCompletion()