I have used a Map Reduce based system at my present employer (Bank of America – Merrill Lynch) to process (read “crunch”) extremely large datasets in matter of seconds. Sometimes I used those to price bonds in real-time otherwise it was used for data processing/reporting purposes.
It is an in-house product, known as Hugs framework and it is based on Map-Reduce implementation though it doesn’t have all the exciting features of Apache Hadoop. I have been tinkering around with Hadoop lately and thought of putting a basic tutorial. I will be writing a detailed one later on explaining the code and different aspects of Hadoop. I’m using the latest stable version 1.1.1 of Hadoop on my Ubuntu.
I will use a weather dataset from NOAA as an example and will try to find out the average temperature for a given year.
If I were to find the avg temperature for the year 1901 via a UNIX bash script, this is how I would do it:
gaurav@asgard:~$ cat 1901 | awk '{temp = substr($0, 88, 5) + 0; q = substr($0, 93, 1); if (temp != 9999 && q ~ /[01459]/) { count += 1; sum += temp; } } END { print sum, count, sum/count }'
Output: 306529 6564 46.6985
Here, I just computed the result from one file for a single year! It would take a lot longer time to process data for 100+ years!
Now to find the same data using Hadoop, I will be writing program using 2 languages – Java and Python. Hadoop API natively supports Java but other languages can also be used. Languages such as Ruby and Python can interact with Hadoop via Streams (input and output). If you are into C++, you should have a look at Hadoop Pipes.
Usually, you will write two components – Map and Reduce. For optimization purposes, you can also add a combiner right in between.
* Please make sure JAVA_HOME env variable is pointing to the correct version on your box.
gaurav@asgard:~$ echo $JAVA_HOME
/usr/lib/jvm/java-7-openjdk-amd64
* Download the latest hadoop libs (stable is preferred) from http://hadoop.apache.org/releases.html#Download and unzip somewhere on your box.
* Include “hadoop_folder/bin” in your PATH ENV variable. To make sure hadoop runs fine on your box, try this command:
gaurav@asgard:~$ hadoop version
Hadoop 1.1.1
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1411108
Compiled by hortonfo on Mon Nov 19 10:48:11 UTC 2012
From source with checksum 9be520e845cf135867fb8b927a40affb
* I wrote a single Java src file which has a Main class which is responsible for setting up the Hadoop Job along with a Mapper and Reducer class.
import java.io.IOException; import java.util.Calendar; import java.util.Date; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class AvgTemperature { @SuppressWarnings("deprecation") public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if (args.length != 2) { System.out.println("Usage: AvgTemperature
Output will be created in another folder. Output folder names should be unique and the folder should be empty else Hadoop doesn’t like it! Hence I create a new folder each time using the timestamp value.
* Hadoop needs to know where you are keeping your Java classes. For this, we need to create a special env variable – HADOOP_CLASSPATH. You can point to where your java classes lie:
gaurav@asgard:~$ export HADOOP_CLASSPATH=/home/gaurav/workspace/test/bin
* Let’s run the code by invoking Hadoop on the java class and also I pass in the input file and a part of the output folder path:
gaurav@asgard:~/workspace/test/bin$ hadoop AvgTemperature /home/gaurav/dataset/1901 /home/gaurav/dataSet/output
Output file contains the following:
1901 46.698507007922
Console will show me the following output:
13/02/18 23:22:34 INFO util.NativeCodeLoader: Loaded the native-hadoop library 13/02/18 23:22:34 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 13/02/18 23:22:34 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 13/02/18 23:22:34 INFO input.FileInputFormat: Total input paths to process : 1 13/02/18 23:22:34 WARN snappy.LoadSnappy: Snappy native library not loaded 13/02/18 23:22:34 INFO mapred.JobClient: Running job: job_local_0001 13/02/18 23:22:34 INFO util.ProcessTree: setsid exited with exit code 0 13/02/18 23:22:34 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@5699b665 13/02/18 23:22:34 INFO mapred.MapTask: io.sort.mb = 100 13/02/18 23:22:34 INFO mapred.MapTask: data buffer = 79691776/99614720 13/02/18 23:22:34 INFO mapred.MapTask: record buffer = 262144/327680 13/02/18 23:22:34 INFO mapred.MapTask: Starting flush of map output 13/02/18 23:22:35 INFO mapred.MapTask: Finished spill 0 13/02/18 23:22:35 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 13/02/18 23:22:35 INFO mapred.LocalJobRunner: 13/02/18 23:22:35 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done. 13/02/18 23:22:35 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@61234e59 13/02/18 23:22:35 INFO mapred.LocalJobRunner: 13/02/18 23:22:35 INFO mapred.Merger: Merging 1 sorted segments 13/02/18 23:22:35 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 72206 bytes 13/02/18 23:22:35 INFO mapred.LocalJobRunner: 13/02/18 23:22:35 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 13/02/18 23:22:35 INFO mapred.LocalJobRunner: 13/02/18 23:22:35 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now 13/02/18 23:22:35 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to /home/gaurav/unzippedDataSet/outputclear/23_22_34 13/02/18 23:22:35 INFO mapred.LocalJobRunner: reduce > reduce 13/02/18 23:22:35 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done. 13/02/18 23:22:35 INFO mapred.JobClient: map 100% reduce 100% 13/02/18 23:22:35 INFO mapred.JobClient: Job complete: job_local_0001 13/02/18 23:22:35 INFO mapred.JobClient: Counters: 20 13/02/18 23:22:35 INFO mapred.JobClient: File Output Format Counters 13/02/18 23:22:35 INFO mapred.JobClient: Bytes Written=33 13/02/18 23:22:35 INFO mapred.JobClient: FileSystemCounters 13/02/18 23:22:35 INFO mapred.JobClient: FILE_BYTES_READ=1848910 13/02/18 23:22:35 INFO mapred.JobClient: FILE_BYTES_WRITTEN=214207 13/02/18 23:22:35 INFO mapred.JobClient: File Input Format Counters 13/02/18 23:22:35 INFO mapred.JobClient: Bytes Read=888190 13/02/18 23:22:35 INFO mapred.JobClient: Map-Reduce Framework 13/02/18 23:22:35 INFO mapred.JobClient: Reduce input groups=1 13/02/18 23:22:35 INFO mapred.JobClient: Map output materialized bytes=72210 13/02/18 23:22:35 INFO mapred.JobClient: Combine output records=0 13/02/18 23:22:35 INFO mapred.JobClient: Map input records=6565 13/02/18 23:22:35 INFO mapred.JobClient: Reduce shuffle bytes=0 13/02/18 23:22:35 INFO mapred.JobClient: Physical memory (bytes) snapshot=0 13/02/18 23:22:35 INFO mapred.JobClient: Reduce output records=1 13/02/18 23:22:35 INFO mapred.JobClient: Spilled Records=13128 13/02/18 23:22:35 INFO mapred.JobClient: Map output bytes=59076 13/02/18 23:22:35 INFO mapred.JobClient: CPU time spent (ms)=0 13/02/18 23:22:35 INFO mapred.JobClient: Total committed heap usage (bytes)=386400256 13/02/18 23:22:35 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0 13/02/18 23:22:35 INFO mapred.JobClient: Combine input records=0 13/02/18 23:22:35 INFO mapred.JobClient: Map output records=6564 13/02/18 23:22:35 INFO mapred.JobClient: SPLIT_RAW_BYTES=103 13/02/18 23:22:35 INFO mapred.JobClient: Reduce input records=6564
Now if I were to use Python, this is how I will do it. Remember, the data will be flowing in and out of the mapper and reduce via Streams.
Mapper Implementation – avg_temp_map.py:
#!/usr/bin/env python import sys import re for line in sys.stdin: val = line.strip() year, temp, q = val[15: 19], val[87: 92], val[92: 93] if temp != "+9999" and re.match("[01459]", q): print '%s\t%s' % (year, temp)
Reducer implementation – avg_temp_red.py
#!/usr/bin/env python import sys lastKey, sumVal, count = None, 0, 0 for line in sys.stdin: key, val = line.strip().split('\t') if lastKey and lastKey != key: print '%s\t%s' % (lastKey, maxVal) lastKey = key sumVal += int(val) count += 1 if lastKey: print '%s\t%s' % (lastKey, sumVal / float(count))
To run these pieces of code, we need to use the Hadoop’s streamer feature by running the following command:
gaurav@asgard:~/workspace/test/src$ hadoop jar /home/gaurav/hadoop-1.1.1/contrib/streaming/hadoop-streaming-*.jar -input ~/dataset/1901 -output ~/dataset/output/23_45_20 -mapper ~/workspace/test/src/avg_temp_map.py -reducer ~/workspace/test/src/avg_temp_red.py
Please remember to change the output folder name in here. The output file contains the following result:
1901 46.6985070079
When i try to run this program i got following error :-
command:-
javac -classpath /usr/local/hadoop/hadoop-core-1.2.1.jar -d avgtemp AvgTemperature*.java
Error:-
AvgTemperature.java:72: error: incompatible types: Object cannot be converted to IntWritable
for (IntWritable value : values) {
^
Note: AvgTemperature.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 error
The error is at:-
for (IntWritable value : values) {
^