MapReduce Programming

Run MapReduce jobs using Ha...
   

Welcome to the session on Hadoop streaming.

In this session, we will learn how to use any command as a mapper or reducer.

What is Hadoop streaming? Hadoop streaming is a Hadoop library that makes it possible to use any program as a mapper or reducer.

We need it for the following reasons:
1. Writing MapReduce jobs in Java is cumbersome.
2. There may be legacy code written in another language that needs to be used in a mapper or reducer.
3. Many non-Java programmers need to run big data computations.

Please note that streaming here does not mean real-time data processing or running a process continuously forever.

Here it means that the data is passed, or streamed, through the programs.

You can use any program as mapper or reducer as long as it reads data from standard input and writes to standard output.

The mapper should emit key-value pairs separated by a tab. If there is no tab, the entire line is treated as the key and the value is null. If there is more than one tab, everything after the first tab is treated as the value. Also note that each key-value pair is separated from the next by a newline.
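To make the tab rule concrete, here is a small sketch that splits lines the way it was just described. The function name split_kv is made up for this illustration, and an empty value is printed where Hadoop would see a null value.

```shell
# Split each input line at the FIRST tab only:
# text before it is the key, everything after it is the value.
split_kv() {
  awk '{
    i = index($0, "\t")                      # position of the first tab, 0 if none
    if (i == 0) { key = $0; value = "" }     # no tab: the whole line is the key
    else { key = substr($0, 1, i - 1); value = substr($0, i + 1) }
    printf "key=[%s] value=[%s]\n", key, value
  }'
}

# Three sample lines: one tab, no tab, two tabs.
printf 'sa\t1\nre\nga\t1\textra\n' | split_kv
```

For the third line, the second tab stays inside the value, so the value is "1", a tab, and "extra".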

The reducer gets data that is sorted by key but not grouped by key. If you need grouping, you have to do it yourself inside the reducer.

Also, Hadoop streaming is a jar that is written in Java, the same way we wrote our MapReduce program. So it actually ungroups the data before calling the reducer program.
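Since the reducer only sees sorted lines, grouping means watching for the key to change. The following is a minimal sketch of such a reducer, assuming each input line is key, tab, numeric value; the name sum_by_key is hypothetical.

```shell
# Reducer sketch: input is sorted "key<TAB>number" lines.
# A group ends when the key differs from the previous line's key.
sum_by_key() {
  awk -F '\t' '
    NR > 1 && $1 != current { print current "\t" total; total = 0 }  # key changed: emit finished group
    { current = $1; total += $2 }                                      # accumulate within the group
    END { if (NR > 0) print current "\t" total }                       # emit the last group
  '
}

# Sorted sample input, as the reducer would receive it.
printf 'ga\t1\nre\t1\nsa\t1\nsa\t1\n' | sum_by_key
```

This only works because the input is sorted, so all lines for one key arrive next to each other.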

The command you see on the screen uses Unix commands to compute the frequencies of the words in the files located in the HDFS directory /data/mr/wordcount/input. It uses sed as the mapper and uniq as the reducer.
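The on-screen command is not part of this transcript, so the following is only a sketch of what such a job submission could look like. The function name run_wordcount_unix is made up, and the location of the hadoop-streaming jar is an assumption that differs between installations, so it is passed in as an argument.

```shell
# Sketch only: submit a streaming job with sed as mapper and uniq -c as reducer.
# $1 = path to the hadoop-streaming jar (assumption: varies per installation).
run_wordcount_unix() {
  hadoop jar "$1" \
    -input /data/mr/wordcount/input \
    -output wordcount_output_unix \
    -mapper "sed 's/ /\n/g'" \
    -reducer "uniq -c"
}

# Usage on a machine with the Hadoop client installed (the jar path is hypothetical):
#   run_wordcount_unix /path/to/hadoop-streaming.jar
#   hdfs dfs -cat wordcount_output_unix/part-* | head
```

The function is only defined here, not called, so nothing is submitted until you invoke it yourself.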

Let's log in to the console and execute the command shown on the screen. Once executed, it will generate the results in a folder called wordcount_output_unix in your HDFS home directory.

As you can see, it has generated the counts of the unique words.

How did it work? A simple dataset should help us understand the process. If you have not gone through our video on computing word frequencies using Unix without Hadoop, please go through that first.

In the example, we have an input line containing sa re sa ga. The sed command is our mapper. The input file is fed to sed line by line. Here sed replaces each space with a newline, which means it emits each word as a key with a null value. Hadoop sorts this data and then calls the reducer, which is uniq -c, on the sorted data. uniq -c prints the frequency of each line in the sorted input.

So, if you look at it closely, Hadoop has only replaced the "sort" command in our Unix way of solving word count.
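The same flow can be tried on one machine, with sort standing in for Hadoop. This is a sketch that assumes GNU sed, which understands \n in the replacement text; the function name wordcount_local is made up.

```shell
# Mapper: sed puts one word per line.
# sort: stands in for the sorting that Hadoop does between map and reduce.
# Reducer: uniq -c counts identical neighbouring lines.
wordcount_local() {
  sed 's/ /\n/g' | sort | uniq -c
}

echo "sa re sa ga" | wordcount_local
```

It should report a count of 1 for ga and re and 2 for sa; the exact spacing before the counts depends on your uniq.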

Similarly, if we have multiple reducers, the uniq command will be executed on multiple machines simultaneously. Note that all of the values for a key go to the same reducer.
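Why does a key always land on the same reducer? Because the reducer is chosen from the key alone. The sketch below imitates that idea with a checksum of the key modulo the number of reducers. This is an illustrative stand-in, not Hadoop's own hash function, so the actual reducer numbers in a real job will differ; partition_of is a made-up name.

```shell
# Illustrative stand-in for a partitioner (NOT Hadoop's real hash):
# the reducer number is derived only from the key, so equal keys always match.
partition_of() {
  # $1 = key, $2 = number of reducers
  printf '%s' "$1" | cksum | awk -v n="$2" '{ print $1 % n }'
}

# Both occurrences of "sa" are assigned the same reducer number.
for word in sa re sa ga; do
  echo "$word -> reducer $(partition_of "$word" 2)"
done
```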

In the previous case, we used the sed and uniq commands, which were already installed on all machines. What if the programs that we are using as mapper and reducer are not installed on all machines?

You can use the -files option to send the files to all the machines. These files get copied into the current working directory of the mapper or reducer on all nodes.

Let's take a look. In the previous example, our data was not clean enough. Let's try to clean the data using our own shell script.

Create a file called mycmd.sh using the nano text editor: type nano mycmd.sh and press Enter. The first line of this file needs to have #! followed by the path of the program with which we want to execute the script. Since we are using the Unix shell here, #!/bin/bash is good enough.

Then we write the script. In this script, the first sed replaces spaces with newlines, and the second sed removes non-alphanumeric characters from the output of the previous command. Then the tr command converts everything to lowercase. The result is one word per line, in lowercase, containing only alphanumeric characters.

Then we save it by pressing Ctrl+X and then Y. Afterwards, we make it executable with chmod +x mycmd.sh.
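The script itself is shown on screen and not in this transcript, so here is one possible version that matches the description: two sed commands followed by tr. The exact patterns are assumptions, and GNU sed is assumed for \n in the replacement. The script is written with a here-document instead of nano so the whole thing can be pasted at once.

```shell
# Work in a scratch directory so no existing file is overwritten.
workdir=$(mktemp -d)
cd "$workdir" || exit 1

# Write the mapper script (the same content you would type into nano).
cat > mycmd.sh <<'EOF'
#!/bin/bash
# 1st sed: spaces -> newlines; 2nd sed: drop non-alphanumerics; tr: lowercase
sed -E 's/ +/\n/g' | sed 's/[^a-zA-Z0-9]//g' | tr 'A-Z' 'a-z'
EOF

# Make it executable, then try it on one line of text.
chmod +x mycmd.sh
echo "Sa, re! SA ga." | ./mycmd.sh
```

The trial run should print sa, re, sa and ga, one per line, with punctuation removed and everything in lowercase.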

Now our mapper script is ready. Let's use this script instead of the sed command in our previous Hadoop streaming job. The only additional parameter we need to provide is "-file". "-file" takes this script and puts it on all the machines before the job starts running. Since it puts the file in the current working directory of the mapper, we need to refer to it with ./

You can also set the number of reducers with the parameter -D mapred.reduce.tasks=<number of reducers>.
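Putting the script and the reducer count together, a job submission could look like the sketch below. It uses the generic -files option mentioned earlier; "-file" is the older streaming-specific spelling, which newer Hadoop releases report as deprecated. Generic options such as -D and -files have to come before the streaming options. The function name, the jar path and the reducer count in the usage line are assumptions.

```shell
# Sketch only: ship mycmd.sh to the nodes and use it as the mapper.
# $1 = path to the hadoop-streaming jar (assumption: varies per installation)
# $2 = number of reducers
run_wordcount_clean() {
  hadoop jar "$1" \
    -D mapred.reduce.tasks="$2" \
    -files mycmd.sh \
    -input /data/mr/wordcount/input \
    -output wordcount_clean_unix \
    -mapper ./mycmd.sh \
    -reducer "uniq -c"
}

# Usage from the directory containing mycmd.sh (jar path and count are hypothetical):
#   run_wordcount_clean /path/to/hadoop-streaming.jar 2
```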

Once this program completes, the output will be in the wordcount_clean_unix directory in your HDFS home. Please check it. The output is much cleaner now, and the result is more meaningful.

A few notes about MapReduce streaming. It is OK to have no reducer: you can omit -reducer and its program. If you don't have a reducer and don't want any sorting, you can use the option -D mapred.reduce.tasks=0.

In such cases, the number of mappers decides how many files are generated. Otherwise, the number of files generated is equal to the total number of reduce tasks.
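A map-only job could be submitted as in the following sketch, which only cleans the words and skips sorting and reducing. The function name and the output directory wordcount_maponly_unix are made up for this example, and the jar path is again an assumption.

```shell
# Sketch only: map-only streaming job (zero reduce tasks, no sort).
# $1 = path to the hadoop-streaming jar (assumption: varies per installation)
run_clean_map_only() {
  hadoop jar "$1" \
    -D mapred.reduce.tasks=0 \
    -files mycmd.sh \
    -input /data/mr/wordcount/input \
    -output wordcount_maponly_unix \
    -mapper ./mycmd.sh
}

# Usage (hypothetical jar path):
#   run_clean_map_only /path/to/hadoop-streaming.jar
#   hdfs dfs -ls wordcount_maponly_unix
```

Listing the output directory afterwards is a quick way to see that the number of part files follows the number of map tasks.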

The number of map tasks is equal to the total number of input splits and is a function of the InputFormat.

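If we want to customize the number of mappers, we can do so either using conf.setNumMapTasks in our Java driver or using -D mapred.map.tasks=<number of map tasks> on the hadoop jar command line. Note that this value is only a hint to the framework; the input splits still decide the final count.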

Since the number of reduce tasks is a choice that we need to make, a common question is how many reduce tasks are good enough. With more reduce tasks, the work is balanced better across the cluster and the job may finish sooner, but the framework overhead is higher. More reduce tasks also lower the cost of a failure, because each failed task has less work to redo.

After experimentation, it was found that the number of reduce tasks should be somewhere between 0.95 and 1.75 times the maximum number of tasks possible. The maximum number of tasks that can be executed on your entire cluster is the total number of nodes in the cluster multiplied by the maximum number of simultaneous tasks that can be executed by each node, which is 2 by default.
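As a worked example of this rule of thumb, the sketch below computes the range for a hypothetical 10-node cluster with 2 simultaneous tasks per node. The function name reduce_task_range is made up, and both bounds are rounded down to whole tasks.

```shell
# $1 = number of nodes, $2 = max simultaneous tasks per node
# Prints the lower (0.95x) and upper (1.75x) bound, rounded down.
reduce_task_range() {
  awk -v nodes="$1" -v per_node="$2" 'BEGIN {
    max_tasks = nodes * per_node
    # integer arithmetic (x * 95 / 100) avoids floating point surprises
    printf "%d %d\n", int(max_tasks * 95 / 100), int(max_tasks * 175 / 100)
  }'
}

reduce_task_range 10 2   # prints: 19 35
```

So with 20 task slots in total, somewhere between 19 and 35 reduce tasks would fit the guideline.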

Testing your MapReduce job is essential.

Generally, you first test on a small random sample of the data rather than on the head or tail of a file.
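One way to draw such a sample is shuf, which picks lines at random instead of taking them from the start or end. This sketch assumes GNU coreutils is installed; the function name sample_lines and the 100-line demo file are made up.

```shell
# Draw a random sample of lines from a file (without repeating any line).
sample_lines() {
  # $1 = input file, $2 = number of lines to keep
  shuf -n "$2" "$1"
}

# Demo on a hypothetical 100-line input file in a scratch directory.
demo_dir=$(mktemp -d)
seq 1 100 > "$demo_dir/inputfile"
sample_lines "$demo_dir/inputfile" 5 > "$demo_dir/sample"
cat "$demo_dir/sample"
```

Each run picks different lines, which is the point: the head or tail of a file is often not representative of the rest.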

You should test the mapper and the reducer separately.

A streaming job can be tested with simple Unix commands without Hadoop: cat inputfile | mymapper | sort | myreducer > outputfile
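Here is a sketch of what separate checks could look like. mymapper and myreducer below are simple stand-ins for your own programs, and check is a made-up helper that compares actual output with expected output. GNU sed is assumed.

```shell
# Hypothetical stand-ins for your own mapper and reducer.
mymapper()  { sed 's/ /\n/g'; }
myreducer() { uniq -c | awk '{ print $2 "\t" $1 }'; }   # emits word<TAB>count

# Compare actual and expected output and report the result.
check() {
  # $1 = label, $2 = actual, $3 = expected
  if [ "$2" = "$3" ]; then echo "PASS $1"; else echo "FAIL $1"; fi
}

# 1. Mapper alone.
check "mapper" "$(echo 'sa re' | mymapper)" "$(printf 'sa\nre')"
# 2. Reducer alone, on hand-written input that is already sorted.
check "reducer" "$(printf 'ga\nsa\nsa\n' | myreducer)" "$(printf 'ga\t1\nsa\t2')"
# 3. Whole pipeline, with sort standing in for Hadoop.
check "pipeline" "$(echo 'sa re sa ga' | mymapper | sort | myreducer)" "$(printf 'ga\t1\nre\t1\nsa\t2')"
```

Feeding the reducer hand-written sorted input means a reducer bug cannot hide behind a mapper bug, and the other way round.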