MapReduce Programming


Run MapReduce jobs using Hadoop Streaming

Welcome to the session on Hadoop streaming.

In this session, we will learn how to use any command as a mapper or reducer.

What is Hadoop streaming? Hadoop streaming is a Hadoop library that makes it possible to use any program as a mapper or reducer.

We need it for the following reasons:

  1. Writing Java MapReduce is cumbersome
  2. There could be legacy code, written in some other language, that needs to be used as a mapper or reducer
  3. There are many non-Java programmers who need big data computation

Please note that here streaming doesn't mean real-time data processing or continuously running a process forever.

Here it signifies that the data is passed through or streamed through the programs.

You can use any program as mapper or reducer as long as it reads the data from standard input and writes the data to standard output.

The mapper should emit key-value pairs separated by a tab. If there is no tab, the entire line is treated as the key and the value is null. If there is more than one tab, everything after the first tab is treated as the value, which means the value itself may contain tabs. Also, note that key-value pairs are separated from one another by newlines.
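To make this concrete, here is how a few hypothetical mapper output lines would be split (the words are made up for illustration, and <TAB> stands for the tab character):

apple<TAB>1      ->  key "apple", value "1"
apple            ->  key "apple", value null (no tab on the line)
a<TAB>b<TAB>c    ->  key "a", value "b<TAB>c" (the value keeps the second tab)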

The reducer gets data that is sorted by key but not grouped by key. If you need grouping, you have to do it yourself inside the reducer.

Also, Hadoop streaming itself is a jar written in Java, much like the MapReduce program we wrote earlier. Internally, it ungroups the data before calling the reducer program.
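As an example of doing the grouping yourself, here is a sketch of a reducer script written in bash. It assumes the mapper emits lines of the form key<TAB>count; the script is hypothetical and is not part of the job we run in this session.

#!/bin/bash
# Hypothetical reducer: input arrives sorted by key, one "key<TAB>count" pair per line.
# We do the grouping ourselves by noticing when the key changes.
current_key=""
current_sum=0
while IFS=$'\t' read -r key count; do
  if [ "$key" = "$current_key" ]; then
    current_sum=$((current_sum + count))
  else
    if [ -n "$current_key" ]; then
      printf '%s\t%d\n' "$current_key" "$current_sum"
    fi
    current_key="$key"
    current_sum="$count"
  fi
done
# Emit the last key after the input ends.
if [ -n "$current_key" ]; then
  printf '%s\t%d\n' "$current_key" "$current_sum"
fi

Because the input is sorted by key, a change in the key tells us that all values for the previous key have already been seen.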

The command you see on the screen uses Unix commands to compute the frequencies of the words in the files located in the HDFS directory /data/mr/wordcount/input. It uses sed as the mapper and uniq as the reducer.

Let's log in to the console and execute the command provided on the screen. Once executed, it will generate the results in a folder "wordcount_output_unix" in your HDFS home directory.

As you can see, it has generated the counts of the unique words.

How did it work? A simple dataset should help us understand the process. If you have not gone through our video on computing word frequencies using Unix without Hadoop, please go through that first.

In the example, we have an input line containing "sa re sa ga". The sed command is our mapper. The input file is fed to sed line by line. Here, sed replaces each space with a newline, which means it emits each word as a key with a null value. Hadoop sorts this data and, on the sorted data, calls the reducer, which is uniq -c. uniq -c prints the frequency of each line in its sorted input.
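To see the same flow without Hadoop, we can reproduce it with a plain Unix pipeline, where sort stands in for Hadoop's shuffle-and-sort phase:

echo "sa re sa ga" | sed "s/ /\n/g" | sort | uniq -c

This prints a count of 1 for ga, 1 for re, and 2 for sa, which is exactly what the streaming job computes for this input line, just on a single machine.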

So, if you look at it closely, Hadoop has essentially just replaced the sort command in our Unix way of solving wordcount.

In a similar fashion, if we have multiple reducers, the uniq command will be executed on multiple machines simultaneously. Note that all the values for a key go to the same reducer.

In the previous case, we used the sed and uniq commands, which were already installed on all machines. What if the programs that we are using as the mapper and reducer are not installed on all machines?

We can use the -files option to send the files to all of the machines. These files get copied into the current working directory of the mapper or reducer on every node.

Let us take a look. In the previous example, our data was not clean enough. Let's try to clean the data using a shell script.

Create a file called mycmd.sh using the nano text editor: type nano mycmd.sh and press enter. The first line of this file needs to start with #! followed by the name of the program with which we want to execute the script. Since we are writing a Unix shell script, we keep it /bin/bash. Then we write the script. In this script, the first sed replaces the spaces with newlines, and the second sed removes non-alphanumeric characters from the output of the previous command. Then the tr command converts everything to lowercase. The result is one word per line, in lowercase, containing only alphanumeric characters.
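Based on that description, the script might look roughly like this (a sketch; the exact sed and tr expressions may differ from the ones used in the video):

#!/bin/bash
# Split words onto separate lines, drop non-alphanumeric characters,
# and convert everything to lowercase -- one cleaned word per line on standard output.
sed "s/ /\n/g" | sed "s/[^a-zA-Z0-9]//g" | tr 'A-Z' 'a-z'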

Then we save it by pressing Ctrl+X, then Y, and then pressing enter to confirm the file location. Afterwards, we make it executable with chmod +x mycmd.sh. In Unix, chmod +x is required if you want to execute a file as a program.

Now our mapper script is ready. Let's use this script instead of the sed command in our previous hadoop-streaming job. The only additional parameter we need to provide is -file or -files. It takes this script and puts it on all the machines before the job starts running. Since it puts the file in the current working directory of the mapper, we need to refer to it with ./

You can also set the number of reducers by using the parameter -D mapred.reduce.tasks=<number of reducers>.
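Putting it together, the full command would look roughly like the one below. It is a sketch based on the earlier wordcount command; the -D option is optional and is shown here with 2 reducers just as an example.

hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -D mapred.reduce.tasks=2 -files mycmd.sh -input /data/mr/wordcount/input -output wordcount_clean_unix -mapper ./mycmd.sh -reducer "uniq -c"

Generic options such as -D and -files should come right after the jar name, before the streaming-specific options.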

Once this program completes, the output will be in the wordcount_clean_unix directory in your HDFS home. Please check it. The output is much cleaner and the result is more meaningful now.

A few notes about MapReduce streaming. It is OK to have no reducer; you can omit the -reducer option and its command altogether. If you don't want any reducer and don't want any sorting either, you can use the option -D mapred.reduce.tasks=0.

In that case, the number of mappers decides how many output files are generated; otherwise, the number of output files is equal to the total number of reduce tasks.
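For instance, a map-only run of our cleaning script could be launched roughly like this (a sketch; the output directory name wordcount_maponly is just an example):

hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -D mapred.reduce.tasks=0 -files mycmd.sh -input /data/mr/wordcount/input -output wordcount_maponly -mapper ./mycmd.sh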

The number of map tasks is equal to the total number of input splits, which is a function of the InputFormat.

If we want to customize the number of mappers, we can do so either by using conf.setNumMapTasks in our Java driver or by passing -D mapred.map.tasks=<number of map tasks> on the hadoop jar command line.

Since the number of reduce tasks is a choice we need to make, a common question is: how many reduce tasks are good enough? With more reduce tasks, each task finishes more quickly and the load is balanced better, but the framework overhead is higher. More reduce tasks also lower the cost of failures, since a failed task has less work to redo.

After experimentation, the recommendation is that the number of reduce tasks should be somewhere between 0.95 and 1.75 times the maximum number of tasks possible. The maximum number of tasks that can be executed on your entire cluster is the total number of nodes in the cluster multiplied by the maximum number of simultaneous tasks each node can execute, which is 2 by default.
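For example, on a hypothetical 10-node cluster with the default of 2 simultaneous tasks per node, the maximum is 10 × 2 = 20, so a reasonable setting would be roughly 19 (0.95 × 20) to 35 (1.75 × 20) reduce tasks.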

Testing your MapReduce job is very much required.

So, generally, you would first test on a small, randomly sampled dataset instead of just the head or tail of a file.

You should test the mapper and the reducer separately.

A streaming job can be tested with simple Unix commands, without Hadoop: cat inputfile | mymapper | sort | myreducer > outputfile. In this command, inputfile is the file that contains the random sample data, mymapper is your mapper command, myreducer is your reducer command, and outputfile will contain the result of the entire pipeline.
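As a concrete sketch using the mapper and reducer from this session (the local file names input_local.txt and sample.txt are hypothetical, and shuf comes from GNU coreutils):

# draw a small random sample from a local copy of the input
shuf -n 100 input_local.txt > sample.txt

# run the same pipeline locally; sort stands in for Hadoop's shuffle and sort
cat sample.txt | ./mycmd.sh | sort | uniq -c > outputfile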

Commands:

hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -input /data/mr/wordcount/input -output wordcount_output_unix -mapper 'sed "s/ /\n/g"' -reducer "uniq -c"
