Saturday, January 9, 2010

Hadoop and Parallel Dataflow Programming

Over the past three months, I have been teaching myself enough Hadoop to get comfortable with using the environment for analytic purposes.

There has been a lot of commentary about Hadoop/MapReduce versus relational databases (such as the articles referenced in my previous post on the subject). I actually think this discussion is misplaced because comparing open-source software with commercial software aligns people on "religious" grounds. Some people will like anything that is open-source. Some people will attack anything that is open-source (especially people who work for commercial software vendors). And, the merits of real differences get lost. Both Hadoop and relational databases are powerful systems for analyzing data, and each has its own distinct set of advantages and disadvantages.

Instead, I think that Hadoop should be compared to a parallel dataflow style of programming. What is a dataflow style of programming? It is a style where we watch the data flow through different operations, forking and combining along the way, to achieve the desired goal. Not only is a dataflow a good way to understand relational databases (which is why I introduce it in Chapter 1 of Data Analysis Using SQL and Excel), but the underlying engines that run SQL queries are dataflow engines.

Parallel dataflows extend dataflow processing to grid computing. To my knowledge, the first commercial tool that implements parallel dataflows was developed by Ab Initio. This company was a spin-off from a bleeding-edge parallel supercomputer vendor called Thinking Machines that went bankrupt in 1994. As a matter of full disclosure: Ab Initio was actually formed from the group that I worked for at Thinking Machines. Although they are very, very, very resistant to sharing information about their technology, I am rather familiar with it. I believe that the only publicly available information about them (including screen shots) is published in our book Mastering Data Mining: The Art and Science of Customer Relationship Management.

I am confident that Apache has at least one dataflow project, since when I google "dataflow apache" I get a pointer to the Dapper project. My wish, however, is that Hadoop were the parallel dataflow project.

Much of what Hadoop does goes unheralded by the typical MapReduce user. On a massively parallel system, Hadoop keeps track of the different parts of an HDFS file and, when the file is being processed, does its darndest to schedule the work on the nodes where each file part resides. This is great, since data locality is key to achieving good performance.

Hadoop also keeps track of which processors and disk systems are working. When there is a failure, Hadoop tries again, insulating the user from sporadic hardware faults.

Hadoop also does a pretty good job of shuffling data around between the map and reduce operations. The shuffling method -- sort, send, and sort again -- may not be the most efficient, but it is quite general.

Alas, there are several things that Hadoop does not do, at least when accessed through the MapReduce interface. Supporting these features would allow it to move beyond the MapReduce paradigm, giving it the power to support more general parallel dataflow constructs.

The first thing that bothers me about Hadoop is that I cannot easily take a text file and just copy it with the MapReduce primitives. Copying a file seems like something that should be easy. The problem is that map processing always generates a key, so the original data gets written out with a key prepended -- unless I do extra work to parse out the first field and use it as the key.

Could the context.write() function be overloaded with a version that does not output a key? Perhaps this would only be possible in the reduce phase, since I understand the importance of the key for going from map to reduce.
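One workaround under the current API is a map-only job that emits NullWritable keys: TextOutputFormat skips a NullWritable key entirely, so the output lines come out with no key and no tab prepended. A minimal sketch (the class name and path handling are illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CopyFile {

    // Identity-style mapper: drop the byte-offset key and pass each line through.
    public static class CopyMapper
            extends Mapper<LongWritable, Text, NullWritable, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // NullWritable keys are skipped by TextOutputFormat, so each output
            // line looks exactly like the input line.
            context.write(NullWritable.get(), line);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "copy file");
        job.setJarByClass(CopyFile.class);
        job.setMapperClass(CopyMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);        // map-only: no shuffle, no sort
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

The output still lands as one part file per map task, but each line is written exactly as it was read.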

A performance issue with Hadoop is the shuffle phase between the map and the reduce. As I mentioned earlier, the sort-send-sort process is quite general. Alas, though, it requires a lot of work. An alternative that often works well is simply hashing. To maintain the semantics of map-reduce, I think this would be hash-send-combine or hash-send-sort. The beauty of using hashing is that the data can be sent to its destination while the map is still processing it, overlapping computation with network transfer.
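To be fair, Hadoop already hashes for the "send" part: the default HashPartitioner decides which reducer receives each key. What it does not do is replace the sorts with hash aggregation. The closest approximation within the current framework is the in-mapper combining pattern -- accumulate partial results in a hash table and emit each key once, in cleanup() -- which at least shrinks what has to be sorted and sent. A sketch for word counting (my own illustrative class):

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// In-mapper hash aggregation: keep partial counts in a hash table and emit
// them once per key in cleanup(), instead of writing one record per word.
public class HashCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    protected void map(LongWritable offset, Text line, Context context) {
        for (String word : line.toString().split("\\s+")) {
            if (word.length() == 0) continue;
            Integer n = counts.get(word);
            counts.put(word, n == null ? 1 : n + 1);
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        Text key = new Text();
        IntWritable value = new IntWritable();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            key.set(e.getKey());
            value.set(e.getValue());
            context.write(key, value);
        }
    }
}
```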

And, speaking of performance, why does the key have to go before the data? Why can't I just point to a sequence of bytes and use that for the key? This would enable a programming style that doesn't spend so much time parsing keys and duplicating information between values and keys.

Perhaps the most frustrating aspect of Hadoop is the MapReduce framework itself. The current version allows processing like (M+)(R)(M*). What this notation means is that the processing starts with one or more map jobs, goes to a reduce, and continues with zero or more map jobs.
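For reference, this (M+)(R)(M*) shape is exactly what ChainMapper and ChainReducer expose in the older mapred API: extra calls to ChainMapper.addMapper() supply the M+, ChainReducer.setReducer() supplies the single R, and calls to ChainReducer.addMapper() supply the M*. A sketch with placeholder map and reduce classes of my own:

```java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

public class ChainedJob {

    // Placeholder map step: lower-case each line and key it by its first field.
    public static class AMap extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable k, Text v, OutputCollector<Text, Text> out, Reporter r)
                throws IOException {
            String line = v.toString().toLowerCase();
            out.collect(new Text(line.split("\t")[0]), new Text(line));
        }
    }

    // Placeholder reduce step: count the lines for each key.
    public static class TheReduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> vals, OutputCollector<Text, Text> out, Reporter r)
                throws IOException {
            int n = 0;
            while (vals.hasNext()) { vals.next(); n++; }
            out.collect(key, new Text(Integer.toString(n)));
        }
    }

    // Placeholder post-reduce map step: reformat the reduce output.
    public static class CMap extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {
        public void map(Text k, Text v, OutputCollector<Text, Text> out, Reporter r)
                throws IOException {
            out.collect(k, new Text("count=" + v.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(new Configuration(), ChainedJob.class);
        conf.setJobName("chained (M+)(R)(M*) job");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // M+ : one or more map steps chained inside the map task
        ChainMapper.addMapper(conf, AMap.class,
                LongWritable.class, Text.class, Text.class, Text.class,
                true, new JobConf(false));

        // R : the single reduce step
        ChainReducer.setReducer(conf, TheReduce.class,
                Text.class, Text.class, Text.class, Text.class,
                true, new JobConf(false));

        // M* : zero or more map steps chained after the reduce, inside the reduce task
        ChainReducer.addMapper(conf, CMap.class,
                Text.class, Text.class, Text.class, Text.class,
                true, new JobConf(false));

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
```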

THIS IS NOT GENERAL ENOUGH! I would like to have an arbitrary number of maps and reduces connected however I like. So, one map could feed two different reduces, each having different keys. At the same time, one of the reduces could feed another reduce without having to go through an intermediate map phase.

This would be a big step toward parallel dataflow programming, since Map and Reduce are two very powerful primitives for this purpose.

There are some other primitives that might be useful. One would be broadcast. This would take the output from one processing node during one phase and send it to all the other nodes (in the next phase). Let's just say that using broadcast, it would be much easier to send variables around for processing. No more defining weird variables using "set" in the main program, and then parsing them in setup() functions. No more setting up temporary storage space, shared by all the processors. No more using HDFS to store small serial files, local to only one node. Just send data through a broadcast, and it goes everywhere. (If the broadcast is running on more than one node, then the results would be concatenated together, everywhere.)
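To make the complaint concrete, here is the workaround as it stands today: the driver stuffs the small value into the job Configuration with set(), and every task re-reads it in setup(). The parameter name cutoff.date and the filtering logic below are illustrative only:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BroadcastByConfiguration {

    public static class FilterMapper
            extends Mapper<LongWritable, Text, NullWritable, Text> {
        private String cutoffDate;

        @Override
        protected void setup(Context context) {
            // Every task re-parses the "broadcast" value out of the job configuration.
            cutoffDate = context.getConfiguration().get("cutoff.date");
        }

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // Illustrative use: keep rows whose first field is on or after the cutoff.
            String firstField = line.toString().split("\t")[0];
            if (firstField.compareTo(cutoffDate) >= 0) {
                context.write(NullWritable.get(), line);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "broadcast via configuration");
        job.setJarByClass(BroadcastByConfiguration.class);
        // Driver side: stash the small value in the configuration before submitting.
        job.getConfiguration().set("cutoff.date", args[2]);
        job.setMapperClass(FilterMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```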

And, if I had a broadcast, then my two-pass row number code (here) would only require one pass.

I think Hadoop already supports feeding multiple different input files into one reduce operator. This is quite powerful, and a far superior way of handling join processing.
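The mechanism is MultipleInputs, which lets each input path have its own mapper feeding a common reduce. A sketch of a reduce-side join of customers and orders (the file layouts and class names are assumptions of mine; older Hadoop releases have the equivalent class under org.apache.hadoop.mapred.lib rather than mapreduce.lib.input):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceSideJoin {

    // Both inputs are assumed tab-delimited with the customer id in the first field.
    // Customer rows are tagged "C", order rows "O", so the reducer can tell them apart.
    public static class CustomerMapper extends Mapper<LongWritable, Text, Text, Text> {
        protected void map(LongWritable offset, Text line, Context ctx)
                throws IOException, InterruptedException {
            String[] fields = line.toString().split("\t", 2);
            if (fields.length < 2) return;
            ctx.write(new Text(fields[0]), new Text("C\t" + fields[1]));
        }
    }

    public static class OrderMapper extends Mapper<LongWritable, Text, Text, Text> {
        protected void map(LongWritable offset, Text line, Context ctx)
                throws IOException, InterruptedException {
            String[] fields = line.toString().split("\t", 2);
            if (fields.length < 2) return;
            ctx.write(new Text(fields[0]), new Text("O\t" + fields[1]));
        }
    }

    // All rows for one customer id arrive at the same reduce call,
    // so the join is just a loop over the tagged values.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text custId, Iterable<Text> values, Context ctx)
                throws IOException, InterruptedException {
            String customer = null;
            List<String> orders = new ArrayList<String>();
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("C\t")) customer = s.substring(2);
                else orders.add(s.substring(2));
            }
            if (customer == null) return;          // unmatched orders are dropped
            for (String order : orders) {
                ctx.write(custId, new Text(customer + "\t" + order));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "reduce-side join");
        job.setJarByClass(ReduceSideJoin.class);
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustomerMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, OrderMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```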

It would also be nice to have a final sort operator. In the real world, people often do want sorted results.
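The usual workaround is to funnel everything through a single reduce task, since the framework already sorts the reduce input by key:

```java
// With one reducer, the framework's sort of the map output becomes a total sort.
// Fine for modest result sets; it does serialize the final phase on one node.
job.setNumReduceTasks(1);
// For large outputs, TotalOrderPartitioner (the approach used by TeraSort) samples
// the key space so that many reducers together produce globally sorted part files.
```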

In conclusion, parallel dataflows are a very powerful, expressive, and efficient way of implementing complex data processing tasks. Relational databases use dataflow engines for their processing. With a non-procedural language such as SQL, the power of the dataflow is hidden from the user -- and some relatively simple dataflow constructs can be quite difficult to express in SQL.

Hadoop is a powerful system that emulates parallel dataflow programming. Any step in a dataflow can be implemented using a MapReduce pass -- but this requires reading, writing, sorting, and sending the data multiple times. With a few more features, Hadoop could efficiently implement parallel dataflows. I feel this would be a big boost to both performance and utility, and it would leverage the power already provided by the Hadoop framework.

5 comments:

  1. Have you seen Cascading (http://www.cascading.org/)? There's also a simpler product - http://www.pervasivedatarush.com.

    Of course, I haven't used either of them, but it would be interesting to hear what others think.

  2. Final sort can be enforced by setting the # of reducers to 1.

  3. Hi,

    We are looking at Hadoop as a distributed analytics tool.

    However, we are getting pretty bad performance in comparison to traditional databases, even in a one-node configuration. Before moving to a cluster, we decided to test a one-node pseudo-cluster, hoping that we could extrapolate our results.

    On a 2.5GB flat file, a two-key aggregation (a GROUP BY analog) takes about 180 seconds, with all the time being CPU time and almost no I/O. Is this kind of performance something to be expected?

    Thanks

    VJ

  4. Its current license is probably much too restrictive to be useful to you (not to mention tied to other proprietary Microsoft software) but Dryad sounds like what you're asking for. It treats the distributed computation as a DAG. Google "dryadlinq" for the easiest way to find it.

  5. Very informative post indeed!
    Being a Hadoop evangelist, I am interested in knowing more about MapReduce and large-data analytics.
    This one resource looks great... High Performance Analytics with Hadoop http://www.impetus.com/featured_webinar?eventid=16

