Tinkering with Apache Hadoop – Map Reduce Framework

I have used a MapReduce-based system at my present employer (Bank of America – Merrill Lynch) to process (read “crunch”) extremely large datasets in a matter of seconds. Sometimes I used it to price bonds in real time; at other times it was used for data processing and reporting purposes.

It is an in-house product known as the Hugs framework; it is based on the MapReduce model, though it doesn’t have all the exciting features of Apache Hadoop. I have been tinkering with Hadoop lately and thought of putting together a basic tutorial. I will write a detailed one later, explaining the code and different aspects of Hadoop. I’m using the latest stable version of Hadoop, 1.1.1, on my Ubuntu box.

I will use a weather dataset from NOAA as an example and will try to find out the average temperature for a given year.

If I were to find the average temperature for the year 1901 via a Unix shell script, this is how I would do it:

gaurav@asgard:~$ cat 1901 | awk '{temp = substr($0, 88, 5) + 0;
                        q = substr($0, 93, 1);
                        if (temp != 9999 && q ~ /[01459]/) 
                        {
                           count += 1;
                           sum += temp;
                        } } 
                        END { print sum, count, sum/count }'

Output: 306529 6564 46.6985

Here, I just computed the result from one file for a single year! It would take a lot longer to process data for 100+ years!

Now, to find the same data using Hadoop, I will write the program in two languages – Java and Python. The Hadoop API natively supports Java, but other languages can be used too: languages such as Ruby and Python can interact with Hadoop via streams (standard input and output). If you are into C++, you should have a look at Hadoop Pipes.

Usually, you will write two components – a Map and a Reduce. For optimization purposes, you can also add a combiner right in between, as in the sketch below.
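A combiner has the same shape as a reducer and must consume and produce the map output types. A word of caution for this particular job: an average of averages is not the overall average, so the averaging reducer further below cannot double as a combiner. The hypothetical sketch here (the class name PartialSumCombiner is mine; it assumes the same imports as the Java listing below) is only valid for associative operations such as sum or max, or if the value type were changed to carry (sum, count) pairs:

// Hypothetical sketch of a combiner - pre-aggregates map output on the map side.
// Wire it in with: job.setCombinerClass(PartialSumCombiner.class);
class PartialSumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable value : values)
			sum += value.get();
		context.write(key, new IntWritable(sum));
	}
}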

* Please make sure the JAVA_HOME environment variable points to the correct Java version on your box:
gaurav@asgard:~$ echo $JAVA_HOME
/usr/lib/jvm/java-7-openjdk-amd64

* Download the latest Hadoop libraries (the stable release is preferred) from http://hadoop.apache.org/releases.html#Download and unzip them somewhere on your box.

* Include “hadoop_folder/bin” in your PATH environment variable. To make sure Hadoop runs fine on your box, try this command:
gaurav@asgard:~$ hadoop version
Hadoop 1.1.1
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1411108
Compiled by hortonfo on Mon Nov 19 10:48:11 UTC 2012
From source with checksum 9be520e845cf135867fb8b927a40affb

* I wrote a single Java source file containing a main class that sets up the Hadoop Job, along with a Mapper and a Reducer class.

import java.io.IOException;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgTemperature {
	@SuppressWarnings("deprecation")
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		if (args.length != 2) {
			System.out.println("Usage: AvgTemperature  ");
			System.exit(-1);
		}
		
		Configuration conf = new Configuration();
		Job job = new Job(conf); 
		job.setJarByClass(AvgTemperature.class);
		job.setJobName("Avg Temperature");
		job.setMapperClass(AvgTemperatureMapper.class);
		job.setReducerClass(AvgTemperatureReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		Calendar cld = Calendar.getInstance();
		Date date = cld.getTime();
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + date.getHours() + "_" + date.getMinutes() + "_" + date.getSeconds()));
		
		// job.submit();
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public static final int MISSING = 9999;

    // Parses one fixed-width NCDC record per line and emits (year, temperature).
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if (line.charAt(87) == '+')
                    airTemperature = Integer.parseInt(line.substring(88, 92));
            else
                    airTemperature = Integer.parseInt(line.substring(87, 92));

            String quality = line.substring(92, 93);
            if (airTemperature != MISSING && quality.matches("[01459]"))
            	context.write(new Text(year), new IntWritable(airTemperature));
    }
}

class AvgTemperatureReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

	// Averages all the temperature readings collected for a year.
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		
		int sum = 0;
		int count = 0;
		for (IntWritable value : values) {
			count += 1;
			sum += value.get();
		}

		context.write(key, new DoubleWritable(sum / (double)count));
	}
}
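
* To compile the source, point javac at the Hadoop core jar that ships in the unzipped folder (the paths below assume the 1.1.1 tarball layout and my workspace; adjust them for your box):

gaurav@asgard:~/workspace/test$ javac -classpath ~/hadoop-1.1.1/hadoop-core-1.1.1.jar -d bin src/AvgTemperature.java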

Output will be created in a separate folder. The output folder name should be unique and the folder must not already exist, else Hadoop refuses to run the job! Hence I create a new folder on each run using the timestamp value.
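
If you’d rather not mint a timestamped folder on every run, a minimal alternative (a sketch, not what I used above; it needs an extra import org.apache.hadoop.fs.FileSystem;) is to delete any pre-existing output directory inside main() before submitting the job:

		// Hypothetical alternative: recursively remove the previous run's
		// output so Hadoop accepts the same folder name again.
		Path outputPath = new Path(args[1]);
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(outputPath))
			fs.delete(outputPath, true);
		FileOutputFormat.setOutputPath(job, outputPath);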

* Hadoop needs to know where you keep your compiled Java classes. For this, we need to set a special environment variable – HADOOP_CLASSPATH – pointing to where the classes lie:
gaurav@asgard:~$ export HADOOP_CLASSPATH=/home/gaurav/workspace/test/bin

* Let’s run the code by invoking Hadoop on the Java class, passing in the input file and the base output folder path:
gaurav@asgard:~/workspace/test/bin$ hadoop AvgTemperature /home/gaurav/dataset/1901 /home/gaurav/dataset/output

The output file (part-r-00000, inside the timestamped folder) contains the following:
1901 46.698507007922
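
You can inspect it straight from the shell (the timestamped folder name will differ on your run):
gaurav@asgard:~$ cat ~/dataset/output/23_22_34/part-r-00000
1901	46.698507007922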

The console will show output along these lines:

13/02/18 23:22:34 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/02/18 23:22:34 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/02/18 23:22:34 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/02/18 23:22:34 INFO input.FileInputFormat: Total input paths to process : 1
13/02/18 23:22:34 WARN snappy.LoadSnappy: Snappy native library not loaded
13/02/18 23:22:34 INFO mapred.JobClient: Running job: job_local_0001
13/02/18 23:22:34 INFO util.ProcessTree: setsid exited with exit code 0
13/02/18 23:22:34 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@5699b665
13/02/18 23:22:34 INFO mapred.MapTask: io.sort.mb = 100
13/02/18 23:22:34 INFO mapred.MapTask: data buffer = 79691776/99614720
13/02/18 23:22:34 INFO mapred.MapTask: record buffer = 262144/327680
13/02/18 23:22:34 INFO mapred.MapTask: Starting flush of map output
13/02/18 23:22:35 INFO mapred.MapTask: Finished spill 0
13/02/18 23:22:35 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/02/18 23:22:35 INFO mapred.LocalJobRunner: 
13/02/18 23:22:35 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/02/18 23:22:35 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@61234e59
13/02/18 23:22:35 INFO mapred.LocalJobRunner: 
13/02/18 23:22:35 INFO mapred.Merger: Merging 1 sorted segments
13/02/18 23:22:35 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 72206 bytes
13/02/18 23:22:35 INFO mapred.LocalJobRunner: 
13/02/18 23:22:35 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
13/02/18 23:22:35 INFO mapred.LocalJobRunner: 
13/02/18 23:22:35 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
13/02/18 23:22:35 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to /home/gaurav/unzippedDataSet/outputclear/23_22_34
13/02/18 23:22:35 INFO mapred.LocalJobRunner: reduce > reduce
13/02/18 23:22:35 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
13/02/18 23:22:35 INFO mapred.JobClient:  map 100% reduce 100%
13/02/18 23:22:35 INFO mapred.JobClient: Job complete: job_local_0001
13/02/18 23:22:35 INFO mapred.JobClient: Counters: 20
13/02/18 23:22:35 INFO mapred.JobClient:   File Output Format Counters 
13/02/18 23:22:35 INFO mapred.JobClient:     Bytes Written=33
13/02/18 23:22:35 INFO mapred.JobClient:   FileSystemCounters
13/02/18 23:22:35 INFO mapred.JobClient:     FILE_BYTES_READ=1848910
13/02/18 23:22:35 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=214207
13/02/18 23:22:35 INFO mapred.JobClient:   File Input Format Counters 
13/02/18 23:22:35 INFO mapred.JobClient:     Bytes Read=888190
13/02/18 23:22:35 INFO mapred.JobClient:   Map-Reduce Framework
13/02/18 23:22:35 INFO mapred.JobClient:     Reduce input groups=1
13/02/18 23:22:35 INFO mapred.JobClient:     Map output materialized bytes=72210
13/02/18 23:22:35 INFO mapred.JobClient:     Combine output records=0
13/02/18 23:22:35 INFO mapred.JobClient:     Map input records=6565
13/02/18 23:22:35 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/02/18 23:22:35 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/02/18 23:22:35 INFO mapred.JobClient:     Reduce output records=1
13/02/18 23:22:35 INFO mapred.JobClient:     Spilled Records=13128
13/02/18 23:22:35 INFO mapred.JobClient:     Map output bytes=59076
13/02/18 23:22:35 INFO mapred.JobClient:     CPU time spent (ms)=0
13/02/18 23:22:35 INFO mapred.JobClient:     Total committed heap usage (bytes)=386400256
13/02/18 23:22:35 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/02/18 23:22:35 INFO mapred.JobClient:     Combine input records=0
13/02/18 23:22:35 INFO mapred.JobClient:     Map output records=6564
13/02/18 23:22:35 INFO mapred.JobClient:     SPLIT_RAW_BYTES=103
13/02/18 23:22:35 INFO mapred.JobClient:     Reduce input records=6564

Now, if I were to use Python, this is how I would do it. Remember, the data flows in and out of the mapper and the reducer via standard input/output streams.

Mapper Implementation – avg_temp_map.py:

#!/usr/bin/env python
import sys
import re

for line in sys.stdin:
        val = line.strip()
        # fixed-width NCDC fields: year, air temperature, quality code
        year, temp, q = val[15:19], val[87:92], val[92:93]
        if temp != "+9999" and re.match("[01459]", q):
                print '%s\t%s' % (year, temp)

Reducer Implementation – avg_temp_red.py:

#!/usr/bin/env python

import sys

lastKey, sumVal, count = None, 0, 0
for line in sys.stdin:
        key, val = line.strip().split('\t')
        # keys arrive sorted, so a key change means the previous year's
        # values are complete: emit its average and reset the counters
        if lastKey and lastKey != key:
                print '%s\t%s' % (lastKey, sumVal / float(count))
                sumVal, count = 0, 0
        lastKey = key
        sumVal += int(val)
        count += 1

if lastKey:
        print '%s\t%s' % (lastKey, sumVal / float(count))
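
Since streaming scripts simply read stdin and write stdout, you can smoke-test them without Hadoop at all; plain Unix pipes will do, with sort standing in for the shuffle phase (assuming both scripts were made executable via chmod +x):

gaurav@asgard:~/workspace/test/src$ cat ~/dataset/1901 | ./avg_temp_map.py | sort | ./avg_temp_red.py
1901	46.6985070079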

To run these scripts on Hadoop, we need to use Hadoop’s Streaming feature by running the following command:

gaurav@asgard:~/workspace/test/src$ hadoop jar /home/gaurav/hadoop-1.1.1/contrib/streaming/hadoop-streaming-*.jar -input ~/dataset/1901 -output ~/dataset/output/23_45_20 -mapper ~/workspace/test/src/avg_temp_map.py -reducer ~/workspace/test/src/avg_temp_red.py 

Please remember to change the output folder name here (it must not already exist). The output file contains the following result:
1901 46.6985070079
