Wednesday, February 6, 2008

Using SQL to Emulate MapReduce Functionality

My previous blog entry explained two ways in which MapReduce functionality is more powerful than SQL aggregations:


  • MapReduce implements functions using a full-fledged programming language. This is more powerful than the functions permitted in SQL.
  • MapReduce allows one row to be part of more than one aggregation group.
In fact, SQL can emulate both, bringing it much closer to MapReduce's capabilities. This post shows how, and then discusses why doing so might not be a good idea. (This discussion was inspired by a rather inflammatory and inaccurate post elsewhere.)

First, let me note that the first limitation is not serious, because I assume that SQL can be extended by adding new scalar and aggregation user-defined functions. Although more cumbersome than built-in programming constructs, user-defined functions make it possible to add a broad range of functionality.
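For instance, here is a sketch of a user-defined aggregate in PostgreSQL syntax (my assumption; every database has its own mechanism for this) that multiplies values together, something no built-in SQL aggregate does:

-- A sketch in PostgreSQL syntax (assumed dialect; others differ).
-- A user-defined aggregate that multiplies values together.
CREATE FUNCTION mul(numeric, numeric) RETURNS numeric AS $$
    SELECT $1 * $2;
$$ LANGUAGE SQL IMMUTABLE;

CREATE AGGREGATE product_of (numeric) (
    SFUNC = mul,       -- state-transition function
    STYPE = numeric,   -- type of the running state
    INITCOND = '1'     -- starting value
);

-- Hypothetical usage: SELECT product_of(monthly_rate) FROM rates;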

The second strength can be emulated by assuming the existence of a table, which I'll call enumerate, with a single column, i, containing the integers starting at 1.
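Such a table is easy to materialize. Here is one sketch, using a recursive common table expression (standard SQL, though support varies by database; the one-million-row cap is an arbitrary assumption):

-- A sketch of the enumerate table; the cap of 1,000,000 is arbitrary.
CREATE TABLE enumerate (i INTEGER NOT NULL PRIMARY KEY);

INSERT INTO enumerate (i)
WITH RECURSIVE n(i) AS (
    SELECT 1
    UNION ALL
    SELECT i + 1 FROM n WHERE i < 1000000
)
SELECT i FROM n;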

How does such a table help us? Consider a table that has a start date and a stop date for each customer. The SQL code to count up the starts and the stops might look like:

SELECT thedate, SUM(isstart) as numstarts, SUM(isstop) as numstops
FROM ((SELECT start_date as thedate, 1 as isstart, 0 as isstop
       FROM customer c) UNION ALL
      (SELECT stop_date as thedate, 0 as isstart, 1 as isstop
       FROM customer c)) a
GROUP BY thedate
ORDER BY 1

This is inelegant, particularly as the expressions for the tables get more complicated -- imagine what happens if the customer table is actually a complicated set of joins and aggregations. In addition, the way the SQL is written suggests that two full passes through the table are needed. Yuck!

Let's assume that we have the enumerate table. In this case, the same query could be expressed as:

SELECT (CASE WHEN e.i = 1 THEN start_date ELSE stop_date END) as thedate,
       SUM(CASE WHEN e.i = 1 THEN 1 ELSE 0 END) as numstarts,
       SUM(CASE WHEN e.i = 2 THEN 1 ELSE 0 END) as numstops
FROM customer c CROSS JOIN
     (SELECT * FROM enumerate WHERE i <= 2) e
GROUP BY (CASE WHEN e.i = 1 THEN start_date ELSE stop_date END)
ORDER BY 1

This query is setting up a counter that lets us, conceptually, loop through each row in the table. The counter is set to take on two values. On the first pass through the loop, the query uses the start date; on the second, it uses the stop date. The result is the same as for the previous query. The SQL, though, is different because it does not express two passes through the data.

This example is simple, and it is easy to see how to extend it, for instance, if more dates were stored in each row, as the sketch below suggests. It should also be obvious how this can be expressed as map/reduce functions.
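If each customer row also carried a third date, say a hypothetical upgrade_date column, the counter would simply run to 3:

SELECT (CASE WHEN e.i = 1 THEN start_date
             WHEN e.i = 2 THEN stop_date
             ELSE upgrade_date END) as thedate,  -- upgrade_date is hypothetical
       SUM(CASE WHEN e.i = 1 THEN 1 ELSE 0 END) as numstarts,
       SUM(CASE WHEN e.i = 2 THEN 1 ELSE 0 END) as numstops,
       SUM(CASE WHEN e.i = 3 THEN 1 ELSE 0 END) as numupgrades
FROM customer c CROSS JOIN
     (SELECT * FROM enumerate WHERE i <= 3) e
GROUP BY (CASE WHEN e.i = 1 THEN start_date
               WHEN e.i = 2 THEN stop_date
               ELSE upgrade_date END)
ORDER BY 1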

One of the most common places where MapReduce is used is for parsing text strings. Say we have a list of product descriptions such as:

  • "green,big,square"
  • "red,small,square"
  • "grey"
  • "medium,smelly,cube,cotton"
The idea here is that the description strings consist of any number of comma-separated values. Now, let's say that our goal is to count the number of times each keyword appears across a set of products. The first thought is that something like this really cannot be done in SQL. So, to help, let's assume that there are two helper functions:

  • NumWords(string, sepchar): This function takes a string and a separator character and returns the number of words in the string.
  • GetWord(string, sepchar, i): This function takes a string, a separator character, and a word number i and returns the i-th word in the string.
For instance, for the examples above, using comma as the separator, NumWords() returns the count and GetWord() returns the words when called with 1, 2, 3, and so on:


  • 3 and "green", "big", and "square"
  • 3 and "red", "small", "square"
  • 1 and "grey"
  • 4 and "medium", "smelly", "cube", "cotton"

These functions are not difficult to write in a procedural programming language.
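Indeed, in a database that allows SQL-bodied functions, they can be one-liners. Here is a sketch in PostgreSQL syntax (my assumption; the post does not depend on any particular dialect), leaning on built-in string functions:

-- Sketches in PostgreSQL syntax (assumed dialect); built-ins do the work.
CREATE FUNCTION NumWords(str text, sepchar text) RETURNS integer AS $$
    SELECT array_length(string_to_array(str, sepchar), 1);
$$ LANGUAGE SQL IMMUTABLE;

CREATE FUNCTION GetWord(str text, sepchar text, i integer) RETURNS text AS $$
    -- split_part returns the i-th field of str when split on sepchar
    SELECT split_part(str, sepchar, i);
$$ LANGUAGE SQL IMMUTABLE;

-- For example: NumWords('green,big,square', ',') returns 3
--              GetWord('green,big,square', ',', 2) returns 'big'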

With such functions, the SQL to count up the attributes in our products looks like:

SELECT GetWord(p.description, ',', e.i) as attribute, COUNT(*)
FROM product p JOIN
     enumerate e
     ON e.i <= NumWords(p.description, ',')
GROUP BY GetWord(p.description, ',', e.i)
ORDER BY 2 DESC

The structure of this query is very similar to the previous one. The one difference is that each row has a different loop count, because different product descriptions contain different numbers of words. Hence, the two tables are joined using a standard inner join, rather than a cross join.

An enumerate table, in conjunction with user defined functions, can give SQL much of the functionality of MapReduce.

One objection to the previous example is that such a database structure violates many rules of good database design. Such a product description string is not normalized, for instance, and packing values into strings is not good design practice. My first reaction is that the real world is filled with similar examples, so regardless of what constitutes good design, we still need to deal with them.

A more honest answer is that the world is filled with strings that contain useful information -- description strings, URLs, and so on. SQL cannot just ignore such data, or dismiss it as not being normalized.

There is a problem: performance. One way to do the join is to create an intermediate table that is the cross product (or a large subset of the cross product) of the two tables. Of course, such an intermediate table is equivalent to reading the first table twice, so we have not gained anything.

This is likely to happen in the first case. Without more information, SQL is likely to do a nested loop join. If the customer table is the outer loop, then each customer row is read and duplicated, once with i=1 and again with i=2. This is not actually so bad: the original row is read once and then processed in memory once for each value of i.

Of course, there is no guarantee that the SQL engine won't put the enumerate table as the outer loop, which requires two full reads of the customer table.

The situation becomes worse if the data is partitioned in a parallel environment. This is important, because MapReduce's advantage is that it always runs in parallel.

The SQL engine is likely to run a nested loop join on a single processor, even if the customer table is partitioned (or if the database is configured to be "multithreaded" or "multiprocessor"). Only a very smart optimizer would figure out that the enumerate table could be duplicated and distributed so the nested loop join could run in parallel.

The optimization problem is even worse in the second case, because the number of rows needed from enumerate varies from product to product. Of course, database directives were invented to tell SQL optimizers how to do joins. I would prefer that the database have the enumerate table built in, so the optimizer can take full advantage of it.
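For instance, Oracle-style optimizer hints could be attached to the keyword query (a sketch only; hint syntax and effectiveness vary widely across databases):

-- A sketch with Oracle-style hints (assumed syntax; other databases differ).
-- PARALLEL asks for a parallel scan of product; USE_NL requests a nested
-- loop join with enumerate as the inner table.
SELECT /*+ PARALLEL(p) USE_NL(e) */
       GetWord(p.description, ',', e.i) as attribute, COUNT(*)
FROM product p JOIN
     enumerate e
     ON e.i <= NumWords(p.description, ',')
GROUP BY GetWord(p.description, ',', e.i)
ORDER BY 2 DESC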

Much of MapReduce's functionality comes from the ability to give a single row multiple aggregation keys, while running in parallel. Even on large datasets, we can set up SQL to solve many of the problems that MapReduce solves, by combining user-defined functions, the enumerate table, and appropriate compiler directives so that the large joins are done in parallel.

--gordon

2 comments:

  1. Hi Gordon,

    Great post! This is one of the most interesting observations I've seen on the Web in many months.

    I think it can be done with SSIS/UDFs in 2008 and sparse tables.

  2. One can do it in Oracle SQL this way:

    create table productx
    (id number, description clob)
    /

    create or replace type t_vc2_50 as table of varchar2(50)
    /

    select b.column_value token, count(*)
    from (select ',' || description || ',' txt
          from productx) t,
         table(cast(multiset(
             select substr(txt,
                           instr(txt, ',', 1, level) + 1,
                           instr(txt, ',', 1, level + 1)
                             - instr(txt, ',', 1, level) - 1) token
             from dual
             connect by level <= length(t.txt)
                                 - length(replace(t.txt, ',', '')) - 1
         ) as t_vc2_50)) b
    group by b.column_value
    /


    The connect by level <= ... is a way to generate a table with the numbers from 1 to n (the enumerate table).

    One could also use user-defined functions.

    RC

