Friday, January 25, 2008

MapReduce and SQL Aggregations

This is another post discussing the article on MapReduce written by Professors Michael Stonebraker and David DeWitt (available here).

One of the claims that they make is:

To draw an analogy to SQL, map is like the group-by clause of an aggregate query. Reduce is analogous to the aggregate function (e.g., average) that is computed over all the rows with the same group-by attribute.

This claim is worth discussing in more detail because it is a very powerful and intuitive analogy. And, it is simply incorrect. MapReduce is much more powerful than SQL aggregations.

Another reason I find their claim interesting is that I use the same analogy to describe MapReduce to people familiar with databases. Let me explain this in a bit more detail. Consider the following SQL query to count the number of customers who start in each month:

SELECT MONTH(c.start_date), COUNT(*)
FROM customer c
GROUP BY MONTH(c.start_date)

Now, let's consider how MapReduce would "implement" this query. Of course, MapReduce is a programming framework, not a query interpreter, but we can still use it to solve the same problem. The MapReduce framework solves this problem in the following way:

First, the map phase would read records from the customer table and produce an output record with two parts. The first part is called the "key", which is populated with MONTH(c.start_date). The second is the "value", which can be arbitrarily complicated. In this case, it is as simple as it gets. The value part simply contains "1".

The reduce phase then reads the key-value pairs, and aggregates them. The MapReduce framework sorts the data so records with the same key always occur together. This makes it easy for the reduce phase to add together all the "1"s to get the count for each key (which is the month number). The result is a count for each key.
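
To make the two phases concrete, here is a minimal Python sketch of the same count. It is only an illustration, not how any real MapReduce framework is written: the sample records and the function names (map_customer, shuffle, reduce_count) are my own assumptions, and the shuffle function stands in for the sorting that the framework would do.

```python
from collections import defaultdict
from datetime import date

# Hypothetical customer records: (customer_id, start_date).
CUSTOMERS = [
    (1, date(2007, 1, 15)),
    (2, date(2007, 1, 30)),
    (3, date(2007, 3, 2)),
]

def map_customer(record):
    """Map phase: emit one (key, value) pair per customer record."""
    _customer_id, start_date = record
    # Key is the start month; the value is simply 1.
    yield start_date.month, 1

def shuffle(pairs):
    """Stand-in for the framework's sort: bring values with the same key together."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())

def reduce_count(key, values):
    """Reduce phase: add together all the 1s for a single key."""
    return key, sum(values)

def count_starts_by_month(customers):
    pairs = (pair for record in customers for pair in map_customer(record))
    return [reduce_count(key, values) for key, values in shuffle(pairs)]

print(count_starts_by_month(CUSTOMERS))  # [(1, 2), (3, 1)]
```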

I have intentionally oversimplified this process by describing it at a high level. The first simplification is leaving out all the C++ or Java overhead for producing the programs (although there are attempts at interpreted languages to greatly simplify this process). Another is not describing the parallel processing aspects. And yet another oversimplification is leaving out the "combine" step. The above algorithm can be made more efficient by first "reducing" the values locally on each processor to get subtotals, and then "reducing" these again. This post, however, is not about computational efficiency.
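
For readers curious about the "combine" step, the idea can be sketched in a few lines of Python. This is a toy under stated assumptions: each inner list stands for the month keys held by one hypothetical processor, and the names combine and reduce_subtotals are mine rather than any framework's API.

```python
from collections import Counter

def combine(months):
    """Combine step: subtotal the counts for the records held by one processor."""
    return Counter(months)

def reduce_subtotals(subtotals):
    """Reduce step: add the per-processor subtotals into final counts."""
    total = Counter()
    for subtotal in subtotals:
        total.update(subtotal)  # Counter.update adds counts, it does not overwrite
    return dict(sorted(total.items()))

# Hypothetical start months, already split across three processors.
partitions = [[1, 1, 3], [1, 2], [3]]
result = reduce_subtotals(combine(p) for p in partitions)
print(result)  # {1: 3, 2: 1, 3: 2}
```

The final reduce sees one subtotal per key per processor instead of one "1" per customer, which is where the savings come from.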

The important thing to note is the following three correspondences between MapReduce and SQL aggregates.
  1. First, MapReduce uses a "key". This key is the GROUP BY expression in a SQL aggregation statement.
  2. Second, MapReduce has a "map" function. This is the expression inside the parentheses. This can be an arbitrary function or CASE statement in SQL. In databases that support user defined functions, this can be arbitrarily complicated, as with the "map" function in MapReduce.
  3. Third, MapReduce has a "reduce" function. This is the aggregation function. In SQL, this is traditionally one of a handful of functions (SUM(), AVG(), MIN(), MAX()). However, some databases support user defined aggregation functions, which can be arbitrarily complicated.
So, it seems that SQL and MapReduce are equivalent, particularly in an environment where SQL supports user defined functions written in an external language (such as C++, Java, or C-Sharp).

Wrong!

The next example extends the previous one by asking how many customers start and how many stop in each month. There are several ways of approaching this. The following shows one approach using a FULL OUTER JOIN:

SELECT COALESCE(start.m, stop.m) as m, ISNULL(numstarts, 0), ISNULL(numstops, 0)
FROM (SELECT MONTH(start_date) as m, COUNT(*) as numstarts
      FROM customer c
      GROUP BY MONTH(start_date)
     ) start FULL OUTER JOIN
     (SELECT MONTH(stop_date) as m, COUNT(*) as numstops
      FROM customer c
      GROUP BY MONTH(stop_date)
     ) stop
     ON start.m = stop.m

Another formulation might use an aggregation and UNION:

SELECT m, SUM(isstart), SUM(isstop)
FROM ((SELECT MONTH(start_date) as m, 1 as isstart, 0 as isstop
       FROM customer c)
      UNION ALL
      (SELECT MONTH(stop_date) as m, 0 as isstart, 1 as isstop
       FROM customer c)) a
GROUP BY m

Now, in both of these, there are two pieces of processing, one for the starts and one for the stops. In almost any database optimizer that I can think of, both these queries (and other, similar queries) require two passes through the data, one pass for the starts and one pass for the stops. And, regardless of the optimizer, the SQL statements describe two passes through the data.

The MapReduce framework has a more efficient, and perhaps even more intuitive, solution. The map phase can produce two output records, each with its own key, for each input record:
  • The first has a key that is MONTH(start_date) and the value is a structure containing isstart and isstop with values of 1 and 0 respectively.
  • The second has a key that is MONTH(stop_date) and the values are 0 and 1 respectively.
What is important about this example is not the details, simply the fact that the processing is quite different. The SQL methods describe two passes through the data. The MapReduce method has only one pass through the data. In short, MapReduce can be more efficient than SQL aggregations.
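
The one-pass idea can be sketched in Python as follows. Again, this is an illustration under assumptions rather than real framework code: the sample records and function names are hypothetical, the grouping dictionary stands in for the framework's sort, and I have chosen to skip customers with no stop date (the SQL versions would instead put them in a NULL month).

```python
from collections import defaultdict
from datetime import date

# Hypothetical customer records: (start_date, stop_date); None means still active.
CUSTOMERS = [
    (date(2007, 1, 15), date(2007, 3, 1)),
    (date(2007, 1, 30), None),
    (date(2007, 3, 2), date(2007, 3, 20)),
]

def map_customer(record):
    """Map phase: emit up to two keyed pairs per record, in a single read."""
    start_date, stop_date = record
    yield start_date.month, (1, 0)      # (isstart, isstop) for the start month
    if stop_date is not None:           # assumption: active customers emit no stop
        yield stop_date.month, (0, 1)   # (isstart, isstop) for the stop month

def reduce_month(key, values):
    """Reduce phase: sum the isstart and isstop flags for one month."""
    numstarts = sum(isstart for isstart, _ in values)
    numstops = sum(isstop for _, isstop in values)
    return key, numstarts, numstops

def starts_and_stops(customers):
    grouped = defaultdict(list)         # stands in for the framework's sort
    for record in customers:            # the only pass over the customer data
        for key, value in map_customer(record):
            grouped[key].append(value)
    return [reduce_month(key, grouped[key]) for key in sorted(grouped)]

print(starts_and_stops(CUSTOMERS))  # [(1, 2, 0), (3, 1, 2)]
```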

How much better becomes apparent when we look in more detail at what is happening. When processing data, SQL limits us to one key per record for aggregation. MapReduce does not have this limitation. We can have as many keys as we like for each record.

This difference is particularly important when analyzing complex data structures to extract features -- of which processing text from web pages is an obvious example. To take another example, one way of doing content filtering of email for spam involves looking for suspicious words in the text and then building a scoring function based on those words (naive Bayesian models would be a typical approach).

Attempting to do this in MapReduce is quite simple. The map phase looks for each word and spits out a key value pair for that word (in this case, the key is actually the email id combined with the word). The reduce phase counts them all up. Either the reduce phase or a separate program can then apply the particular scoring code. Extending such a program to include more suspicious words is pretty simple.
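
As a rough Python sketch of that map and reduce, with the caveat that everything specific here is made up for illustration: the word list, the sample emails, and the function names are assumptions, and the scoring step is left out entirely.

```python
from collections import Counter

# Hypothetical list of suspicious words; extending it is a one-line change.
SUSPICIOUS_WORDS = {"free", "winner", "prize"}

# Hypothetical emails keyed by email id.
EMAILS = {
    "email-1": "You are a winner claim your free prize free",
    "email-2": "Meeting notes attached",
}

def map_email(email_id, text):
    """Map phase: emit one pair per suspicious word found, keyed by (email id, word)."""
    for word in text.lower().split():
        if word in SUSPICIOUS_WORDS:
            yield (email_id, word), 1

def count_suspicious_words(emails):
    """Reduce phase: count the pairs for each (email id, word) key."""
    counts = Counter()
    for email_id, text in emails.items():
        for key, value in map_email(email_id, text):
            counts[key] += value
    return dict(counts)

print(count_suspicious_words(EMAILS))
```

Note that a single email produces as many keys as it has suspicious words, which is exactly the "many keys per record" property discussed above.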

Attempting to do this in SQL . . . well, I wouldn't do it in the base language. It would be an obfuscated mess of CASE statements or a non-equi JOIN on a separate word table. The MapReduce approach is simpler and more elegant in this case. Ironically, I usually describe the problem as "SQL is not good in handling text." However, I think the real problem is that "SQL aggregations are limited to one key per record."

SQL has many capabilities that MapReduce does not have. Over time, MapReduce may incorporate many of the positive features of SQL for analytic capabilities (and hopefully not incorporate unneeded overhead for transactions and the like). Today, SQL remains superior for most analytic needs -- and it can take advantage of parallelism without programming. I hope SQL will also evolve, bringing together the enhanced functionality from other approaches.

--gordon

6 comments:

  1. There's a one-pass equivalent of the UNION ALL example in standard SQL:

    select m, SUM(isstart), sum(isstop)
    FROM
    customer c,
    lateral(
    select month(c.start_date),1,0
    union all
    select month(c.stop_date),0,1
    ) dt(m,isstart,isstop)
    group by m

    The syntax for a "lateral derived table" varies from system to system. For example, the equivalent formulation in SQL Server would use a CROSS APPLY. It's also possible to put an arbitrary user-defined table function in the lateral, and therefore break out each row of c into however many rows you want.

  2. Using the LATERAL keyword is indeed a clever method for possibly reducing the need for two passes.

    The LATERAL keyword basically allows a subquery in a FROM clause to reference earlier tables in the same clause. However, using the keyword makes no specification about how the query is run. That is left to the optimizer.

    In this case, an optimizer could choose a one-pass method (as the commenter suggests). However, the optimizer could also rewrite the query as joins and unions that use two passes.

    User-defined table functions are another approach. Besides being highly database-specific, such an approach basically turns SQL from a declarative language into an imperative language. That is, SQL statements do not specify how to execute the query; they specify the desired result. User-defined table functions put the important data processing logic into the function, rather than in the database. One might ask if this approach is really better than MapReduce, since it is highly database specific and outside the scope of "typical" SQL functionality.

  3. Passing twice isn't necessary.

    with data as
    (
    select to_char(start_date,'yyyymm') start_month,to_char(stop_date,'yyyymm') stop_month, count(*) custs
    from customer
    group by cube (to_char(start_date,'yyyymm'),to_char(stop_date,'yyyymm'))
    )
    select nvl(d1.start_month,d2.stop_month) month, nvl(d1.custs,0) started_customers, nvl(d2.custs,0) stopped_customers
    from data d1 full outer join data d2
    on d1.start_month = d2.stop_month
    where d1.stop_month is null
    and d2.start_month is null
    and not (d1.start_month is null and d2.stop_month is null)
    order by month
    /

    (Oracle SQL, but other databases implement cube too)

  4. It is more elegant to do it with 'group by grouping sets' in SQL instead of cube.

    Cube, and especially grouping sets, make it possible to do a group by on more than one key without passing through the data more than once. So (Oracle) SQL can produce two or more keys per record too.

    with data as
    (
    select to_char(start_date,'yyyymm') start_month
    , to_char(stop_date,'yyyymm') stop_month
    , count(*) custs
    from customer
    group by grouping sets (to_char(start_date,'yyyymm'),to_char(stop_date,'yyyymm'))
    )
    select nvl(d1.start_month,d2.stop_month) month
    , nvl(d1.custs,0) started_customers
    , nvl(d2.custs,0) stopped_customers
    from data d1 full outer join data d2
    on d1.start_month = d2.stop_month
    where nvl(d1.start_month,d2.stop_month) is not null
    order by 1
    /

    RC

  5. The previous poster made an excellent comment. GROUPING SETS provides much more powerful functionality for SQL than either basic aggregations or CUBE (from a performance perspective). It is much more akin to MapReduce. I discuss this in another blog entry (http://www.data-miners.com/blog/2008/12/mapreduce-and-sql-aggregations-using.html).

  6. Thanks for the Grouping Sets suggestion. I will try it out soon.
