Wednesday, November 25, 2009

Hadoop and MapReduce: A Parallel Program to Assign Row Numbers

This post discusses (and solves) the problem of assigning consecutive row numbers to data, with no holes. Along the way, it also introduces some key aspects of the Hadoop framework:
  • Using the FileSystem package to access HDFS (a much better approach than in my previous posting).
  • Reading configuration parameters in the Map function.
  • Passing parameters from the main program to the Map and Reduce functions.
  • Writing out intermediate results from the Map function.
These are all important pieces of functionality for working with the Hadoop framework. In addition, I plan on using this technique for assigning unique ids to values in various columns.


The "Typical" Approach

The "typical" approach is to serialize the problem, by creating a Reducer function that adds the row number. By limiting the framework to only a single reducer (using setNumReduceTasks(1) in the JobConf class), this outputs the row number.

There are several problems with this solution. The biggest issue is, perhaps, aesthetic. Shouldn't a parallel framework, such as Hadoop, be able to solve such a simple problem in parallel? Enforced serialization is also highly inefficient, since the value of Hadoop lies in running many copies of the map and reduce tasks at the same time.

Another issue is the output file. Without some manual coding, the output is a single file, which may be local to a single cluster node (depending on how the file system is configured). This can slow down subsequent map-reduce jobs that use the file.


An Alternative Fully Parallel Approach


There is a better way: a fully parallel approach that uses two passes through the Map-Reduce framework. In fact, the full data passes through the framework only once, so this is a much more efficient alternative to the first approach.

Let me describe the approach using three passes through the data, since this makes for a simpler explanation (the actual implementation combines the first two steps).

The first pass through the data consists of a Map phase that assigns a new key to each row, and no Reduce phase. The key consists of two parts: the partition id and the row number within the partition.

The second pass counts the number of rows in each partition, by extracting the maximum row number for each partition key.

These counts are then combined to get the cumulative sum of counts up to each partition. Although I could do this in a reduce step, I chose not to, for reasons I explain below. Instead, I do the work in the main program.

The third pass adds the offset to the row number and outputs the results. Note that the number of map tasks in the first pass can be different from the number in subsequent passes, since the code always uses the original partition number for its calculations.


More Detail on the Approach -- Pass 1

The code is available in this file RowNumberTwoPass.java. It contains one class with two Map phases and one Reduce phase. This code assumes that the data is stored in a text file. This assumption simplifies the code, because I do not have to introduce any auxiliary classes to read the data. However, the same technique would work for any data format.

The first map phase, NewKeyOutputMap, does two things. The simpler is to output the partition id and the row number within the partition for use in subsequent processing. The second is to save a copy of the data, with this key, for the second pass.

Assigning the Partition ID

How does any Map function figure out its partition id? The partition id is stored in the job configuration, and is accessed using the code:

....partitionid = conf.getInt("mapred.task.partition", 0);

In the version of Hadoop that I'm using (0.18.3, through the Yahoo virtual machine), the job configuration is only visible inside the configure() method. This is an optional method that can be overridden when extending the MapReduceBase class; it is called once, when the task starts, to initialize the environment, and it takes one argument, the job configuration. I just store the result in a static variable local to the NewKeyOutputMap class.
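Put in context, the configure() override inside the NewKeyOutputMap class looks roughly like this. This is a sketch; the field names and types are my guesses at the real code in RowNumberTwoPass.java.

....// Shared state for this map task, set up once in configure().
....private static IntWritable partitionid = new IntWritable(0);
....private static long rownumber = 0;

....// Called by the framework once per map task, before any calls to map().
....public void configure(JobConf conf) {
........partitionid.set(conf.getInt("mapred.task.partition", 0));
........rownumber = 0;
........// The sequence file writer (sfw) is also opened here; see the next section.
....}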

In more recent versions of Hadoop, the configuration is available in the context argument to the map function.

Using Sequence Files in the Map Phase

The second task is to save the original rows with the new key values. For this, I need a sequence file. Or, more specifically, I need a different sequence file for each Map task. Incorporating the partition id into the file name accomplishes this.

Sequence files are data stores specific to the Hadoop framework, which contain key-value pairs. At first, I found them a bit confusing: Did the term "sequence file" refer to a collection of files available to all map tasks or to a single instance of one of these files? In fact, the term refers to a single instance file. To continue processing, we will actually need a collection of sequence files, rather than a single "sequence file".

They are almost as simple to use as any other file, as the following code in the configure() function shows:

....FileSystem fs = FileSystem.get(conf);
....sfw = SequenceFile.createWriter(fs, conf,
........new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
........Text.class, Text.class);

The first statement simply retrieves the appropriate file system for creating the file. The second statement uses the SequenceFile.createWriter() function to open the file and saves the writer in the sfw variable. There are several versions of this function, with various additional options; I've chosen the simplest. The specific file goes in the directory referred to by the variable saverecordsdir, which will contain a series of files with names of the form "records#####", where ##### is a five-digit, zero-padded partition number.

This is all enclosed in try-catch logic to catch appropriate exceptions.

Later in the code, the map() function writes to the sequence file using the logic:

....sfw.append(outkey, value);

Very simple!
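Putting the pieces together, the map() function in this same class can be sketched roughly as follows. This is my reconstruction, not the exact code from RowNumberTwoPass.java; it assumes the partitionid, rownumber, and sfw fields set up in configure().

....public void map(LongWritable key, Text value,
............OutputCollector<Text, Text> output, Reporter reporter)
............throws IOException {
........rownumber++;

........// The new key has two parts: the partition id and the row number
........// within this partition.
........Text outkey = new Text(partitionid.get() + "\t" + rownumber);

........// Save the full row, under its new key, for the second pass.
........sfw.append(outkey, value);

........// Emit (partition id, row number) so the reduce/combine step can
........// determine the number of rows in each partition.
........output.collect(new Text(Integer.toString(partitionid.get())),
................new Text(Long.toString(rownumber)));
....}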

Pass 1: Reduce and Combine Functions

The purpose of the reduce function is to count the number of rows in each partition. Instead of counting, the function actually takes the maximum row number within each partition. Because the maximum of a set of maximums is still the maximum, the same function can be used for both reducing and combining.

For efficiency purposes, the combine phase is very important to this operation. The way the problem is structured, the combine output should be a single record for each map instance -- and sending this data around for the reduce phase should incur very little overhead.
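A sketch of that shared reduce/combine function (again my own reconstruction; the class name is illustrative):

public static class MaxRowNumberReduce extends MapReduceBase
........implements Reducer<Text, Text, Text, Text> {
....public void reduce(Text key, Iterator<Text> values,
............OutputCollector<Text, Text> output, Reporter reporter)
............throws IOException {
........// The largest row number seen within a partition is its row count.
........// A maximum of maximums is still the maximum, so the same class
........// serves as both combiner and reducer.
........long maxrownumber = 0;
........while (values.hasNext()) {
............maxrownumber = Math.max(maxrownumber,
....................Long.parseLong(values.next().toString()));
........}
........output.collect(key, new Text(Long.toString(maxrownumber)));
....}
}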


More Detail on the Approach -- Offset Calculation and Pass 2

At the end of the first phase, the summary key result files contain a single row for each partition, giving the number of rows in that partition. For instance, from my small test data, the data looks like:


0    2265
1    2236
2    3

The first column is the partition id; the second is the count. The offset for each partition is the cumulative sum of the preceding counts. So, I want this to be:


0    2265    0
1    2236    2265
2    3       4501

To accomplish this, I read the data in the main program, after running the first job. The following loop in main() gets the results, does the calculation, and saves the results as parameters in the job configuration:

....// Read the pass-1 summary files and compute the cumulative offsets,
....// storing one "partitionid <tab> count <tab> offset" value per partition
....// in the configuration for the second job.
....int numvals = 0;
....long cumsum = 0;
....FileStatus[] files = fs.globStatus(new Path(keysummaryoutput + "/p*"));
....for (FileStatus fstat : files) {
........FSDataInputStream fsdis = fs.open(fstat.getPath());
........String line = "";
........while ((line = fsdis.readLine()) != null) {
............// Save the offset *before* adding this partition's count to it.
............finalconf.set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
............String[] vals = line.split("\t");
............cumsum += Long.parseLong(vals[1]);
........}
....}
....finalconf.setInt(PARAMETER_cumsum_numvals, numvals);

Perhaps the most interesting part of this code is the use of the function fs.globStatus() to get a list of HDFS files that match a wildcard (in this case, anything that starts with "p" in the keysummaryoutput directory).
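For completeness, here is a rough sketch of how the second-pass map can read these parameters back out of the configuration and apply the offsets. This is my own reconstruction; the PARAMETER_cumsum_* names are the same String constants used in main() above, while the class name is illustrative.

// Sketch only; assumes java.util.HashMap plus the usual Hadoop imports.
public static class AddOffsetMap extends MapReduceBase
........implements Mapper<Text, Text, Text, Text> {
....// Maps each original partition id to its cumulative offset.
....private static HashMap<String, Long> offsets = new HashMap<String, Long>();

....public void configure(JobConf conf) {
........offsets.clear();
........int numvals = conf.getInt(PARAMETER_cumsum_numvals, 0);
........for (int i = 0; i < numvals; i++) {
............// Each value has the form "partitionid <tab> count <tab> offset".
............String[] vals = conf.get(PARAMETER_cumsum_nthvalue + i).split("\t");
............offsets.put(vals[0], Long.parseLong(vals[2]));
........}
....}

....public void map(Text key, Text value,
............OutputCollector<Text, Text> output, Reporter reporter)
............throws IOException {
........// The key saved in the sequence file is "partitionid <tab> rownumber".
........String[] parts = key.toString().split("\t");
........long finalrownumber = offsets.get(parts[0]) + Long.parseLong(parts[1]);
........output.collect(new Text(Long.toString(finalrownumber)), value);
....}
}

The second job would read the saved records with SequenceFileInputFormat, so the keys written by the first pass arrive here unchanged.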


Conclusion

Parallel Map-Reduce is a powerful programming paradigm that makes it possible to solve many different types of problems using parallel dataflow constructs.

Some problems seem, at first sight, to be inherently serial. Appending a sequential row number onto each row is one of those problems. After all, don't you have to process the previous row to get the number for the next row? And isn't that a hallmark of inherently serial problems?

The answers to these questions are "no" and "not always". The algorithm described here should scale to very large data sizes and very large clusters. For large volumes of data, it is much, much more efficient than the serial version, since almost all of the processing is done in parallel. The only "serial" part of the algorithm is the calculation of the offsets between the passes, but that involves such a small amount of data, relative to the whole, that its effect on overall efficiency is negligible.

The offsets are passed into the second pass through the job configuration (JobConf). There are other ways of passing this data. One method would be to use the distributed cache. However, I have not learned how to use this yet.
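For readers who want to explore that route, the 0.18-era distributed cache calls look roughly like this. This is an untested sketch, not part of RowNumberTwoPass.java, and the file path is illustrative.

....// Needs org.apache.hadoop.filecache.DistributedCache and java.net.URI.
....// In main(), after writing the offsets to a small HDFS file:
....DistributedCache.addCacheFile(new URI("/tmp/rownumber/offsets.txt"), finalconf);

....// In the second-pass mapper's configure(), retrieve the local copy:
....Path[] cached = DistributedCache.getLocalCacheFiles(conf);
....// ...then open cached[0] with ordinary java.io and parse the offsets.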

Another distribution method would be to do the calculations in the first pass reduce phase (by using only one reducer in this phase). The results would be in a file. This file could then be read by subsequent map tasks to extract the offset data. However, such an approach introduces a lot of contention, because suddenly there will be a host of tasks all trying to open the same file -- contention that can slow processing considerably.

2 comments:

  1. Good stuff... been looking for how to accomplish this exact task.

    Thanks!!!!!!

  2. @ "Another distribution method would be to do the calculations in the first pass reduce phase (by using only one reducer in this phase). The results would be in a file. This file could then be read by subsequent map tasks to extract the offset data. However, such an approach introduces a lot of contention, because suddenly there will be a host of tasks all trying to open the same file -- contention that can slow processing considerably. "

You can actually use the distributed cache to distribute this file's replicas across nodes. Just my 2 cents.

