Hadoop - Streaming
Hadoop streaming is a utility that comes with the Hadoop distribution. This utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
Example Using Python
For Hadoop streaming, we are considering the word-count problem. Any job in Hadoop must have two phases: mapper and reducer. We have written the mapper and the reducer as Python scripts to run them under Hadoop. One can also write the same in Perl or Ruby.
Mapper Phase Code
```python
#!/usr/bin/python

import sys

# Input takes from standard input
for myline in sys.stdin:
   # Remove whitespace either side
   myline = myline.strip()
   # Break the line into words
   words = myline.split()
   # Iterate the words list
   for myword in words:
      # Write the results to standard output
      print '%s\t%s' % (myword, 1)
```
Make sure this file has execution permission (chmod +x /home/expert/hadoop-1.2.1/mapper.py).
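The mapper's per-line logic can be exercised without Hadoop at all. The sketch below (Python 3, with a helper name `map_words` of our own choosing, not a Hadoop API) mirrors what mapper.py emits for a couple of sample input lines:

```python
def map_words(lines):
    """Mirror mapper.py: emit one tab-separated '(word, 1)' pair per word."""
    pairs = []
    for line in lines:
        line = line.strip()           # remove whitespace either side
        for word in line.split():     # break the line into words
            pairs.append("%s\t%s" % (word, 1))
    return pairs

# Two sample input lines, as the streaming framework would feed them
for pair in map_words(["the quick brown fox", "the lazy dog"]):
    print(pair)
```

Each word produces its own `word<TAB>1` line; the framework's sort phase is what later brings identical words together for the reducer.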
Reducer Phase Code
```python
#!/usr/bin/python

import sys

current_word = ""
current_count = 0
word = ""

# Input takes from standard input
for myline in sys.stdin:
   # Remove whitespace either side
   myline = myline.strip()
   # Split the input we got from mapper.py
   word, count = myline.split('\t', 1)
   # Convert count variable to integer
   try:
      count = int(count)
   except ValueError:
      # Count was not a number, so silently ignore this line
      continue

   if current_word == word:
      current_count += count
   else:
      if current_word:
         # Write result to standard output
         print '%s\t%s' % (current_word, current_count)
      current_count = count
      current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
   print '%s\t%s' % (current_word, current_count)
```
Save the mapper and reducer code in mapper.py and reducer.py in the Hadoop home directory. Make sure these files have execution permission (chmod +x mapper.py and chmod +x reducer.py). As Python is indentation-sensitive, take care to preserve the indentation when copying the code.
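Before submitting to a cluster, the whole job can be simulated in plain Python 3. This is only an illustrative sketch (the helper name `run_wordcount` is ours; the real job runs over HDFS via the streaming jar), but it follows the same three stages as `mapper.py | sort | reducer.py`:

```python
def run_wordcount(lines):
    """Simulate map -> shuffle/sort -> reduce for the word-count job."""
    # Map phase: emit a (word, 1) pair for every word
    mapped = [(word, 1) for line in lines for word in line.strip().split()]
    # Shuffle/sort phase: Hadoop delivers keys to the reducer in sorted order
    mapped.sort(key=lambda kv: kv[0])
    # Reduce phase: sum counts over each run of identical keys
    result = {}
    for word, count in mapped:
        result[word] = result.get(word, 0) + count
    return result

print(run_wordcount(["the quick fox", "the fox"]))
# → {'fox': 2, 'quick': 1, 'the': 2}
```

The sort step matters: reducer.py only compares each key with the previous one, so it relies on identical keys arriving consecutively, which is exactly what Hadoop's shuffle guarantees.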
Execution of WordCount Program
```shell
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
   -input input_dirs \
   -output output_dir \
   -mapper <path/mapper.py \
   -reducer <path/reducer.py
```
Where "\" is used for line continuation for clear readability.
For Example,
```shell
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
   -input myinput \
   -output myoutput \
   -mapper /home/expert/hadoop-1.2.1/mapper.py \
   -reducer /home/expert/hadoop-1.2.1/reducer.py
```
How Streaming Works
In the above example, both the mapper and the reducer are Python scripts that read the input from standard input and emit the output to standard output. The utility will create a Map/Reduce job, submit the job to an appropriate cluster, and monitor the progress of the job until it completes.
When a script is specified for mappers, each mapper task will launch the script as a separate process when the mapper is initialized. As the mapper task runs, it converts its inputs into lines and feeds the lines to the standard input (STDIN) of the process. In the meantime, the mapper collects the line-oriented outputs from the standard output (STDOUT) of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. If there is no tab character in the line, then the entire line is considered the key and the value is null. However, this can be customized as per one's needs.
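The default key/value rule described above is easy to state in code. This sketch is purely illustrative (the function name `default_keyvalue` is ours, not part of any Hadoop API), written in Python 3:

```python
def default_keyvalue(line):
    """Default streaming rule: key = prefix up to the first tab, value = rest."""
    if "\t" in line:
        key, value = line.split("\t", 1)
        return key, value
    # No tab: the entire line is the key and the value is null
    return line, None

print(default_keyvalue("fox\t1"))      # ('fox', '1')
print(default_keyvalue("just a key"))  # ('just a key', None)
```

Only the first tab splits the line, so a value may itself contain further tab characters.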
When a script is specified for reducers, each reducer task will launch the script as a separate process when the reducer is initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds the lines to the standard input (STDIN) of the process. In the meantime, the reducer collects the line-oriented outputs from the standard output (STDOUT) of the process and converts each line into a key/value pair, which is collected as the output of the reducer. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. However, this can be customized as per specific requirements.
Important Commands
Parameter | Required/Optional | Description |
---|---|---|
-input directory/file-name | Required | Input location for mapper. |
-output directory-name | Required | Output location for reducer. |
-mapper executable or script or JavaClassName | Required | Mapper executable. |
-reducer executable or script or JavaClassName | Required | Reducer executable. |
-file file-name | Optional | Makes the mapper, reducer, or combiner executable available locally on the compute nodes. |
-inputformat JavaClassName | Optional | Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default. |
-outputformat JavaClassName | Optional | Class you supply should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default. |
-partitioner JavaClassName | Optional | Class that determines which reduce a key is sent to. |
-combiner streamingCommand or JavaClassName | Optional | Combiner executable for map output. |
-cmdenv name=value | Optional | Passes the environment variable to streaming commands. |
-inputreader | Optional | For backwards-compatibility: specifies a record reader class (instead of an input format class). |
-verbose | Optional | Verbose output. |
-lazyOutput | Optional | Creates output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write). |
-numReduceTasks | Optional | Specifies the number of reducers. |
-mapdebug | Optional | Script to call when map task fails. |
-reducedebug | Optional | Script to call when reduce task fails. |