Map Reduce in Hadoop Example

Hadoop MapReduce is a software framework used for distributed processing of large data sets on compute clusters.

According to The Apache Software Foundation, the objective of Map/Reduce is to split the input data set into independent chunks that are processed completely in parallel by the map tasks. The Hadoop MapReduce framework sorts the outputs of the maps, which are then passed as input to the reduce phase. Normally both the input and the output of the job are stored in a file system.

In this blog post I will show you how to write a Map/Reduce word count program step by step in order to understand what is happening in every phase and line of code.

A Map/Reduce system is composed of three operations:

  1. Map
  2. Shuffle
  3. Reduce
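To make the three operations concrete before bringing Hadoop in, here is a minimal sketch in plain Java that runs them in memory on a single sentence. The class and method names (MapShuffleReduceSketch, map, shuffle, reduce) are made up for this illustration and are not part of the Hadoop API; a real cluster distributes these steps across machines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration; not Hadoop code.
public class MapShuffleReduceSketch {

    // Map: emit a (word, 1) pair for every whitespace-separated token.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    // Shuffle: group all values that share a key; TreeMap keeps the keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce: collapse each key's list of values into a single sum.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> counts = new TreeMap<>();
        grouped.forEach((word, ones) ->
                counts.put(word, ones.stream().mapToInt(Integer::intValue).sum()));
        return counts;
    }

    public static void main(String[] args) {
        String line = "This is my blog post posted on the blog";
        // prints {This=1, blog=2, is=1, my=1, on=1, post=1, posted=1, the=1}
        System.out.println(reduce(shuffle(map(line))));
    }
}
```

The rest of this post builds the same pipeline with the Hadoop classes, where the framework takes care of the shuffle step for us.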

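First of all, you have to add the Hadoop dependencies to your POM or Gradle build file. For Hadoop 2 and later these are typically the hadoop-common and hadoop-mapreduce-client-core artifacts (the older Hadoop 1 line shipped a single hadoop-core artifact).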

Now let’s start with the mapping phase:

To create the Map class we have to extend the Mapper class from the org.apache.hadoop.mapreduce package.

Mapper takes four type arguments:

  • LongWritable – type of input key
  • Text – type of input value
  • Text – type of output key
  • IntWritable – type of output value

The code for the Mapping phase is:

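// Imports needed at the top of WordCount.java for this class
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused for every emitted pair: each occurrence of a word counts as 1
    private static final IntWritable one = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text inputValue, Context output)
            throws IOException, InterruptedException {

        String convertedInputValueLine = inputValue.toString();
        StringTokenizer tokenizer = new StringTokenizer(convertedInputValueLine);

        while (tokenizer.hasMoreTokens()) {
            // Reuse the Text object as the output key holding the current word
            inputValue.set(tokenizer.nextToken());
            output.write(inputValue, one);
        }
    }
}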

After extending the Mapper class, we have to implement the map function. Its parameters are the LongWritable key (input key of the Mapper class), the Text inputValue (input value of the Mapper class) and the Context, which is used to write the output of the Mapper class.

Now let’s go forward with the logic within the map function.

In the first line, we convert the inputValue to String and store the value in a variable of type String called convertedInputValueLine.

For example, take the input text “This is my blog post posted on the blog”. The key is the byte offset at which each line starts in the input file, stored as a long integer (LongWritable); since we have only one line here, its key is 0.
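To see what such a key looks like, here is a small sketch that computes the position in bytes at which each line of a two-line input starts. It assumes UTF-8 text with single-byte "\n" line endings, and the class name LineOffsetSketch is only for illustration; in a real job TextInputFormat computes these keys for you.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of line-start byte offsets; not Hadoop code.
public class LineOffsetSketch {

    // Returns each line keyed by the byte offset at which it starts,
    // assuming UTF-8 text and single-byte '\n' line terminators.
    static Map<Long, String> offsets(String text) {
        Map<Long, String> result = new LinkedHashMap<>();
        long offset = 0;
        for (String line : text.split("\n")) {
            result.put(offset, line);
            // +1 accounts for the newline character that split() removed.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {0=This is my blog post, 21=posted on the blog}
        System.out.println(offsets("This is my blog post\nposted on the blog\n"));
    }
}
```

The first line is 20 bytes long plus one newline, so the second line starts at offset 21. The word count mapper never uses this key; it only needs the line itself.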

The next step is to create a StringTokenizer, which splits convertedInputValueLine into words at whitespace (spaces, tabs and line breaks).

In a while loop we check if the tokenizer contains any more words, and if this is the case the while loop will assign the next token to the variable inputValue.
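The tokenizer loop can be tried on its own without Hadoop. The following sketch (the class name TokenizerSketch and the helper tokens are made up for this example) collects the tokens into a list instead of writing them to the context:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical stand-alone version of the loop used in the map function.
public class TokenizerSketch {

    // Splits a line on the default delimiters (space, tab, newline,
    // carriage return, form feed) and returns the tokens in order.
    static List<String> tokens(String line) {
        List<String> words = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            words.add(tokenizer.nextToken());
        }
        return words;
    }

    public static void main(String[] args) {
        // prints [This, is, my, blog,, post, posted, on, the, blog]
        System.out.println(tokens("This is my  blog,\tpost posted on the blog"));
    }
}
```

Note that repeated whitespace is skipped, but punctuation stays attached to the word: with this simple tokenizer “blog,” and “blog” would be counted as two different words.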

After that, we use the context that was passed in and call its write function, which takes two arguments: the key extracted from the tokenizer (the word) and the value one used for the word count.

In order to have a better understanding of the value of 1 from IntWritable(1), here is how the line “This is my blog post posted on the blog” will be written as output in the context:

This, 1
is, 1
my, 1
blog, 1
post, 1
posted, 1
on, 1
the, 1
blog, 1

Now it’s time for the Reduce class:

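// Imports needed at the top of WordCount.java for this class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context output)
            throws IOException, InterruptedException {

        // Sum all the 1s emitted for this word
        int wordfrequency = 0;
        for (IntWritable val : values) {
            wordfrequency += val.get();
        }
        output.write(key, new IntWritable(wordfrequency));
    }
}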

The Reduce class extends a superclass called Reducer (just as Map extends Mapper). Again we have four type arguments:

  • Text – type of input key
  • IntWritable – type of input value
  • Text – type of output key
  • IntWritable – type of output value

What I want to highlight is that the last two type arguments of the Map class (Text, IntWritable) are the first two of the Reduce class, because the output of the Mapper class is sent as input to the Reducer class.

The next step is to implement the reduce function, which takes three parameters:

  • key – Text
  • values – Iterable<IntWritable>
  • output – Context

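To have a better understanding of the input the Reducer receives from the Mapper class, this is what the shuffle phase produces for the sentence “This is my blog post posted on the blog”. All values emitted for the same key are grouped together (the keys are listed here in sentence order for readability; the framework delivers them to the Reducer sorted by key):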

This, (1)
is, (1)
my, (1)
blog, (1, 1)
post, (1)
posted, (1)
on, (1)
the, (1)

The word “blog” appears twice in our sentence, which is why it arrives with two values. The logic in the reduce function is the following: first, we define a variable called wordfrequency to hold the frequency of the key, that is, of the word. Second, we loop over every element of the values Iterable and add it to wordfrequency. Finally, we write the key (the word) together with its frequency to the context. The context output in our case will be:

This, 1
is, 1
my, 1
blog, 2
post, 1
posted, 1
on, 1
the, 1

Now let’s understand the main function:

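// Imports needed at the top of WordCount.java for the main function
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    // Job.getInstance replaces the deprecated constructor new Job(conf, name)
    Job job = Job.getInstance(conf, "MapReduceHadoopWordCount");

    job.setJarByClass(WordCount.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Exit with a non-zero status if the job fails
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}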

First of all, we define an object of the class Configuration to specify the entire configuration of our word count example.

Then we define the Job that needs to be executed on the Hadoop cluster and which we initialize with the configuration and also the name of our Map/Reduce program (in this case I called it MapReduceHadoopWordCount).

The next step is to configure the job: the Jar is set by class, using the main class (in my case WordCount); the Mapper class is Map and the Reducer class is Reduce; the output key/value classes are Text and IntWritable; and the input/output format classes are TextInputFormat and TextOutputFormat, respectively.

In the following lines of code we tell the job where in the file system to read its input and where to write its output. The paths are taken from the command line arguments passed to the main method: the first is the input path and the second is the output path.
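Since the paths come straight from the command line, it can help to validate the arguments before the job is built. The following is only a sketch of such a check; the class name WordCountArgsSketch, the helper checkArgs and the messages are assumptions for illustration and not part of the original program. Keep in mind that Hadoop refuses to start a job whose output directory already exists, so the output path should point to a new location.

```java
// Hypothetical argument check that could run at the start of main.
public class WordCountArgsSketch {

    // Returns an error message for unusable arguments, or null when they look fine.
    static String checkArgs(String[] args) {
        if (args.length != 2) {
            return "Usage: WordCount <input path> <output path>";
        }
        if (args[0].equals(args[1])) {
            return "Input and output paths must differ";
        }
        return null;
    }

    public static void main(String[] args) {
        String error = checkArgs(args);
        if (error != null) {
            System.err.println(error);
            System.exit(2);
        }
        // In the real program the job setup would follow here.
        System.out.println("input=" + args[0] + ", output=" + args[1]);
    }
}
```

Without a check like this, a missing argument only shows up as an ArrayIndexOutOfBoundsException when args[0] or args[1] is read.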

This is the WordCount program explained step by step. I hope it helped you to have a better understanding of the code. To create the jar and run the application I will list some tips below.

To create the jar in Eclipse you can check out this tutorial: https://www.youtube.com/watch?v=jDDYSdgNf9Q

To create the jar in IntelliJ IDEA you can check out this tutorial: https://www.youtube.com/watch?v=3Xo6zSBgdgk

To run the WordCount program you can install Hadoop and run the program using Docker while following the steps described in this tutorial: https://clubhouse.io/developer-how-to/how-to-set-up-a-hadoop-cluster-in-docker/
