Sunday, November 29, 2009

Hadoop and MapReduce: Switching to 0.20 and Cloudera

Recently, I decided to switch from Hadoop 0.18 to 0.20 for several reasons:
  1. I'm getting tired of using deprecated features -- it is time to learn the new interface.
  2. I would like to use some new features, specifically MultipleInputFormats.
  3. The Yahoo! Virtual Machine (which I recommended in my first post) is not maintained, whereas the Cloudera training machine is.
  4. And, for free software, I have so far found the Cloudera community support quite effective.
I chose the Cloudera Virtual Machine for a simple reason: it was recommended by Jeff, who works there and describes himself as "a big fan of [my data mining] books". I do not know if there are other VMs that are available, and I am quite happy with my Cloudera experience so far. Their community support provided answers to key questions, even over the Thanksgiving long weekend.

That said, there are a few downsides to the upgrade:
  • The virtual machine has the most recent version of Eclipse (called Ganymede), which does not work with Hadoop.
  • Hence, the virtual machine requires using command lines for compiling the java code.
  • I haven't managed to get the virtual machine to share disks with the host (instead, I send source files through gmail).
The rest of this post explains how I moved the code that assigns consecutive row numbers (from my previous post) to Hadoop 0.20. It starts with details about the new interface and then talks about updating to the Cloudera virtual machine.


Changes from Hadoop 0.18 to 0.20

The updated code with the Hadoop 0.20 API is in RowNumberTwoPass-0.20.java.

Perhaps the most noticeable change is the packages. Before 0.20, Hadoop used classes in a package called "mapred". Starting with 0.20, it uses classes in "mapreduce". These have a different interface, although it is pretty easy to switch from one to the other.

The reason for this change has to do with future development for Hadoop. This change will make it possible to separate releases of HDFS (the distributed file system) and releases of MapReduce. The following are packages that contain the new interface:

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.map.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

In the code itself, there are both subtle and major code differences. I have noticed the following changes in the Map and Reduce classes:
  • The classes no longer need the "implements" syntax.
  • The function called before the map/reduce is now called setup() rather than configure().
  • The function called after the map/reduce is called cleanup().
  • The functions all take an argument whose class is Context; this is used instead of Reporter and OutputCollector.
  • The map and reduce functions can also throw InterruptedException.
The driver function has more changes, caused by the fact that JobConf is no longer part of the interface. Instead, the work is set up using Job. Variables and values are passed into the Map and Reduce classes through Configuration rather than JobConf. Also, the code for the Map and Reduce classes is added in using the call Job.setJarByClass().

There are a few other minor coding differences. However, the code follows the same logic as in 0.18, and the code ran the first time after I made the changes.


The Cloudera Virtual Machine

First, I should point out that I have no connection to Cloudera, which is a company that makes money (or intends to make money) by providing support and training for Hadoop.

The Cloudera Virtual Machine is available here. It requires running a VMWare virtual machine, which is available here. Between the two, these are about 1.5 Gbytes, so have a good internet connection when you want to download them.

The machine looks different from the Yahoo! VM, because it runs X rather than just a terminal interface. The desktop is pre-configured with a terminal, Eclipse, Firefox, and perhaps some other stuff. When I start the VM, I open the terminal and run emacs in the background. Emacs is a text editor that I know well from my days as a software programmer (more years ago than I care to admit). To use the VM, I would suggest that you have some facility with either emacs or VI.

The version of Hadoop is 0.20.1. Note that as new versions are released, Cloudera will probably introduce new virtual machines. Any work you do on this machine will be lost when you replace the VM with a newer version. As I said, I am sending source files back and forth via gmail. Perhaps you can get the VM to share disks with the host machine. The libraries for Hadoop are in /usr/lib/hadoop-0.20.

Unfortunately, the version of Eclipse installed in the VM does not fully support Hadoop (if you want to see the bug reports, google something like "Hadoop Ganymede"). Fortunately, you can use Eclipse/Ganymede to write code, and it does full syntax checking. However, you'll have to compile and run the code outside the Eclipse environment. I believe this is a bug in this version of Eclipse, which will hopefully be fixed sometime in the near future.

I suppose that I could download the working version of Eclipse (Europa, which is, I think, version 3.3). But, that was too much of a bother. Instead, I learned to use the command line interface for compiling and running code.


Compiling and Running Programs

To compile and run programs you will need to use command line commands.

To build a new project, create a project in Eclipse by creating a new java project. The one thing it needs is a pointer to the Hadoop 0.20 libraries (actually "jars"). To install a pointer to this library, do the following after creating the project:
  • Right click on the project name and choose "Properties".
  • Click on "Java Build Path" and go to the "Libraries" tab.
  • Click on "Add External JARs".
  • Navigate to /usr/lib/hadoop-0.20 and choose hadoop-0.20.1+133-core.jar.
  • Click "OK" on the windows until you are out.
You'll see the new library in the project listing.

Second, you should create a package and then source code in the package.

After you have created the project, you can compile and run the code from a command line by doing the following.
  • Go to the project directory (~/workshop/).
  • Issue the following command: "javac -cp /usr/lib/hadoop-0.20/hadoop-0.20.1+133-core.jar -d bin/ src/*/*.java" [note: there is a space after "bin/"].
  • Create the jar: "cd bin; jar cvf ../<jarname>.jar */*; cd ..". So, for the RowNumberTwoPass code, I use: "cd bin; jar cvf ../RowNumberTwoPass.jar */*; cd ..".
  • Run the code using the command: "hadoop jar RowNumberTwoPass.jar RowNumberTwoPass/rownumbertwopass". The first argument after "hadoop jar" is the jar file with the code. The second is the class and package where the main() function is located.
Although this seems a little bit complicated, it is only cumbersome the first time you run it. After that, you have the commands and running them again is simple.

Wednesday, November 25, 2009

Hadoop and MapReduce: A Parallel Program to Assign Row Numbers

This post discusses (and solves) the problem of assigning consecutive row numbers to data, with no holes. Along the way, it also introduces some key aspects of the Hadoop framework:
  • Using the FileSystem package to access HDFS (a much better approach than in my previous posting).
  • Reading configuration parameters in the Map function.
  • Passing parameters from the main program to the Map and Reduce functions.
  • Writing out intermediate results from the Map function.
These are all important functionality for using the Hadoop framework. In addition, I plan on using this technique for assigning unique ids to values in various columns.


The "Typical" Approach

The "typical" approach is to serialize the problem, by creating a Reducer function that adds the row number. By limiting the framework to only a single reducer (using setNumReduceTasks(1) in the JobConf class), this outputs the row number.

There are several problems with this solution. The biggest issue is, perhaps, aesthetic. Shouldn't a parallel framework, such as Hadoop, be able to solve such a simple problem? Enforced serialization is highly inefficient, since the value of Hadoop is in the parallel programming capabilities enabled when multiple copies of maps and reduces are running.

Another issue is the output file. Without some manual coding, the output is a single file, which may perhaps be local to a single cluster node (depending on how the file system is configured). This can slow down subsequent map reduce tasks that use the file.


An Alternative Fully Parallel Approach


There is a better way, a fully parallel approach that uses two passes through the Map-Reduce framework. Actually, the full data is only passed once through the framework, so this is a much more efficient alternative to the first approach.

Let me describe the approach using three passes through the data, since this makes for a simpler explanation (the actual implementation combines the first two steps).

The first pass through the data consists of a Map phase that assigns a new key to each row and no Reduce phase. The key consists of two parts: the partition id and the row number within the partition.

The second pass counts the number of rows in each partition, by extracting the maximum row number with each partition key.

These counts are then combined to get cumulative sums of counts up to each partition. Although I could do this in the reduce step, I choose not to (which I'll explain below). Instead, I do the work in the main program.

The third pass adds the offset to the row number and outputs the results. Note that the number of map tasks in the first pass can be different from the number in subsequent passes, since the code always uses the original partition number for its calculations.
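
To make the arithmetic concrete, here is a minimal plain-Java sketch of the three conceptual passes. It is not the code from RowNumberTwoPass.java and uses no Hadoop classes. The class name RowNumberSketch, the in-memory lists standing in for partitions, and the choice to start local row numbers at 1 (so that the maximum equals the count) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of the row-numbering scheme; no Hadoop classes are used.
class RowNumberSketch {

    // Returns one "rowNumber<TAB>row" line per input row, numbered consecutively from 1.
    static List<String> number(List<List<String>> partitions) {
        int n = partitions.size();

        // Passes 1 and 2: each partition numbers its own rows 1, 2, 3, ...
        // and the row count is the maximum local row number seen.
        long[] counts = new long[n];
        for (int p = 0; p < n; p++) {
            for (long local = 1; local <= partitions.get(p).size(); local++) {
                counts[p] = Math.max(counts[p], local);
            }
        }

        // Main program: offsets[p] = total rows in partitions 0..p-1.
        long[] offsets = new long[n];
        long cumsum = 0;
        for (int p = 0; p < n; p++) {
            offsets[p] = cumsum;
            cumsum += counts[p];
        }

        // Pass 3: global row number = partition offset + local row number.
        List<String> numbered = new ArrayList<>();
        for (int p = 0; p < n; p++) {
            List<String> rows = partitions.get(p);
            for (int i = 0; i < rows.size(); i++) {
                long local = i + 1;
                numbered.add((offsets[p] + local) + "\t" + rows.get(i));
            }
        }
        return numbered;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("a", "b", "c"),
            Arrays.asList("d"),
            Arrays.asList("e", "f"));
        for (String line : number(partitions)) {
            System.out.println(line);
        }
    }
}
```

With partitions of 3, 1, and 2 rows, the offsets are 0, 3, and 4, so the rows come out numbered 1 through 6 with no holes. Only the offsets step looks at more than one partition, and it touches one number per partition rather than the data itself.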


More Detail on the Approach -- Pass 1

The code is available in this file RowNumberTwoPass.java. It contains one class with two Map phases and one Reduce phase. This code assumes that the data is stored in a text file. This assumption simplifies the code, because I do not have to introduce any auxiliary classes to read the data. However, the same technique would work for any data format.

The first map phase, NewKeyOutputMap, does two things. The simpler thing is to output the partition id and the row number within the partition for use in subsequent processing. The second is to save a copy of the data, with this key, for the second pass.

Assigning the Partition ID

How does any Map function figure out its partition id? The partition id is stored in the job configuration, and is accessed using the code:

....partitionid = conf.getInt("mapred.task.partition", 0);

In the version of Hadoop that I'm using (0.18.3, through the Yahoo virtual machine), the job configuration is only visible to a configuration function. This is an optional function that can be defined when implementing an instance of the MapReduceBase class. It gets called once to initialize the environment. The configuration function takes one argument, the job configuration. I just store the result in a static variable local to the NewOutputKeyMap class.

In more recent versions of Hadoop, the configuration is available in the context argument to the map function.

Using Sequence Files in the Map Phase

The second task is to save the original rows with the new key values. For this, I need a sequence file. Or, more specifically, I need a different sequence file for each Map task. Incorporating the partition id into the file name accomplishes this.

Sequence files are data stores specific to the Hadoop framework, which contain key-value pairs. At first, I found them a bit confusing: Did the term "sequence file" refer to a collection of files available to all map tasks or to a single instance of one of these files? In fact, the term refers to a single instance file. To continue processing, we will actually need a collection of sequence files, rather than a single "sequence file".

They are almost as simple to use as any other files, as the following code in the configure() function shows:

....FileSystem fs = FileSystem.get(conf);
....sfw = SequenceFile.createWriter(fs, conf,
........new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
........Text.class, Text.class);

The first statement simply retrieves the appropriate file system for creating the file. The second statement uses the SequenceFile.createWriter() function to open the file and save the writer in the sfw variable. There are several versions of this function, with various additional options. I've chosen the simplest version. The specific file will go in the directory referred to by the variable saverecordsdir. This will contain a series of files with the names "records#####" where ##### is a five-digit, zero-padded number.

This is all enclosed in try-catch logic to catch appropriate exceptions.
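
The file naming can be tried on its own, outside Hadoop. This is just a small sketch of the String.format() call used above. The class name RecordFileNames, the helper recordFileName(), and the directory name "saverecords" are made up for the example.

```java
// Sketch of the per-task file naming; the directory name used in main() is a made-up example.
class RecordFileNames {

    static String recordFileName(String saveRecordsDir, int partitionId) {
        // %05d pads the partition id with leading zeros to five digits.
        return saveRecordsDir + "/" + String.format("records%05d", partitionId);
    }

    public static void main(String[] args) {
        System.out.println(recordFileName("saverecords", 7));   // saverecords/records00007
    }
}
```

Because every map task has a different partition id, no two tasks ever write to the same file.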

Later in the code, the map() writes to the sequence file using the logic:

....sfw.append(outkey, value);

Very simple!

Pass1: Reduce and Combine Functions

The purpose of the reduce function is to count the number of rows in each partition. Instead of counting, the function actually takes the maximum of the partition row count. By taking this approach, I can use the same function for both reducing and combining.

For efficiency purposes, the combine phase is very important to this operation. The way the problem is structured, the combine output should be a single record for each map instance -- and sending this data around for the reduce phase should incur very little overhead.
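
Here is a small plain-Java sketch of why the maximum works in both places. It is only an illustration with made-up names (MaxCombineSketch, max()), not the reducer from the actual code: taking the maximum of partial maximums gives the same answer as taking the maximum of everything at once, so it does not matter how the framework batches the values for the combiner.

```java
import java.util.Arrays;
import java.util.List;

class MaxCombineSketch {

    // The same operation serves as combiner and reducer: the largest value seen.
    static long max(List<Long> values) {
        long best = Long.MIN_VALUE;
        for (long v : values) {
            best = Math.max(best, v);
        }
        return best;
    }

    public static void main(String[] args) {
        // Local row numbers from one map task, arbitrarily split into two batches.
        List<Long> batch1 = Arrays.asList(1L, 2L, 3L);
        List<Long> batch2 = Arrays.asList(4L, 5L);

        long combined = max(Arrays.asList(max(batch1), max(batch2)));  // combine, then reduce
        long direct = max(Arrays.asList(1L, 2L, 3L, 4L, 5L));          // reduce only

        System.out.println(combined + " " + direct);                   // 5 5
    }
}
```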


More Detail on the Approach -- Offset Calculation and Pass 2

At the end of the first phase, the summary key result files contain a single row for each partition, containing the number of rows in each partition. For instance, from my small test data, the data looks like:


0 2265

1 2236

2 3

The first column is the partition id, the second is the count. The offset is the cumulative sum of previous values. So, I want this to be:


0 2265 0

1 2236 2265

2 3 4501

To accomplish this, I read the data in the main loop, after running the first job. The following loop in main() gets the results, does the calculation, and saves the results as parameters in the job configuration:

....int numvals = 0;
....long cumsum = 0;
....FileStatus[] files = fs.globStatus(new Path(keysummaryoutput+ "/p*"));
....for (FileStatus fstat : files) {
........FSDataInputStream fsdis = fs.open(fstat.getPath());
........String line = "";
........while ((line = fsdis.readLine()) != null) {
............finalconf.set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
............String[] vals = line.split("\t");
............cumsum += Long.parseLong(vals[1]);
........}
....}
....finalconf.setInt(PARAMETER_cumsum_numvals, numvals);

Perhaps the most interesting part of this code is the use of the function fs.globStatus() to get a list of HDFS files that match wildcards (in this case, anything that starts with "p" in the keysummaryoutput directory).
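
The offset arithmetic can be separated from the HDFS reading. The following is a sketch under that assumption: it takes the summary lines as an in-memory list instead of reading them from files, and the names OffsetSketch and withOffsets() are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetSketch {

    // Appends the running total of earlier counts to each "partition<TAB>count" line.
    static List<String> withOffsets(List<String> summaryLines) {
        List<String> result = new ArrayList<>();
        long cumsum = 0;
        for (String line : summaryLines) {
            result.add(line + "\t" + cumsum);               // offset = rows in earlier lines
            cumsum += Long.parseLong(line.split("\t")[1]);  // second column is the count
        }
        return result;
    }
}
```

For the three lines from my test data, this produces offsets of 0, 2265, and 4501. The order in which the lines arrive only determines which partition gets which range of numbers. The numbering stays consecutive either way.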


Conclusion

Parallel Map-Reduce is a powerful programming paradigm that makes it possible to solve many different types of problems using parallel dataflow constructs.

Some problems seem, at first sight, to be inherently serial. Appending a sequential row number onto each row is one of those problems. After all, don't you have to process the previous row to get the number for the next row? And isn't this a hallmark of inherently serial problems?

The answers to these questions are "no" and "not always". The algorithm described here should scale to very large data sizes and very large machine sizes. For large volumes of data, it is much, much more efficient than the serial version, since all processing is in parallel. That is almost true. The only "serial" part of the algorithm is the calculation of the offsets between the passes. However, this is such a small amount of data, relative to the overall data, that its effect on overall efficiency is negligible.

The offsets are passed into the second pass using the JobConf structure. There are other ways of passing this data. One method would be to use the distributed data cache. However, I have not learned how to use this yet.

Another distribution method would be to do the calculations in the first pass reduce phase (by using only one reducer in this phase). The results would be in a file. This file could then be read by subsequent map tasks to extract the offset data. However, such an approach introduces a lot of contention, because suddenly there will be a host of tasks all trying to open the same file -- contention that can slow processing considerably.

Saturday, November 21, 2009

Hadoop and MapReduce: Controlling the Hadoop File System from the MapReduce Program

[This first comment explains that Hadoop really does have a supported interface to the hdfs file system, through the FileSystem package ("import org.apache.hadoop.fs.FileSystem"). Yeah! I knew such an interface should exist -- and even stumbled across it myself after this post. Unfortunately, there is not as simple an interface for the "cat" operation, but you can't have everything.]

In my previous post, I explained some of the challenges in getting a Hadoop environment up and running. Since then, I have succeeded in using Hadoop both on my home machine and on Amazon EC2.

In my opinion, one of the major shortcomings of the programming framework is the lack of access to the HDFS file system from MapReduce programs. More concretely, if you have attempted to run the WordCount program, you may have noticed that you can run it once without a problem. The second time you get an error saying that the output files already exist.

What do you do? You go over to the machine running HDFS -- which may or may not be your development machine -- and you delete the files using the "hadoop fs -rmr" command. Can't java do this?

You may also have noticed that you cannot see the output. Files get created, somewhere. What fun. To see them, you need to use the "hadoop fs -cat" command. Can't java do this?

Why can't we create a simple WordCount program that can be run multiple times in a row, without error, and that prints out the results? And, to further this question, I want to do all the work in java. I don't want to work with an additional scripting language, since I already feel that I've downloaded way too many tools on my machine to get all this to work.

By the way, I feel that both of these are very, very reasonable requests, and the hadoop framework should support them. It does not. For those who debate whether hadoop is better or worse than parallel databases, recognize that the master process in parallel databases typically supports functionality similar to what I'm asking for here.

Why is this not easy? Java, Hadoop, and the operating systems seem to conspire to prevent this. But I like a challenge. This posting, which will be rather long, is going to explain my solution. Hey, I'll even include some code so other people don't have to suffer through the effort.

I want to do this on the configuration I'm running from home. This configuration consists of:
  • Windows Vista, running Eclipse
  • Ubuntu Linux virtual machine, courtesy of Yahoo!, running Hadoop 0.18
However, I also want the method to be general and work regardless of platform. So, I want it to work if I write the code directly on my virtual machine, or if I write the code on Amazon EC2. Or, if I decide to use Karmasphere instead of Eclipse to write the code, or if I just write the code in a Java IDE. In all honesty, I've only gotten the system to work on my particular configuration, but I think it would not be difficult to get it to work on Unix.


Overview of Solution


The overview of the solution is simple enough. I am going to do the following:
  • Create a command file called "myhadoop.bat" that I can call from java.
  • Write a class in java that will run this bat file with the arguments to do what I want.
Boy, that's simple. NOT!

Here is a sample of the problems:
  • Java has to call the batch file without any path. This is because Windows uses the backslash to separate directories whereas Unix uses forward slashes. I lose platform independence if I use full paths.
  • The batch file has to connect to a remote machine. Windows Vista does not have a command to do this. Unix uses the command "rsh".
  • The java method for executing commands (Runtime.getRuntime().exec()) does not execute batch files easily.
  • The java method for executing commands hangs, after a few lines are output. And, the lines could be in either the standard output stream (stdout) or the error output stream (stderr), and it is not obvious how to read both of them at the same time.
This post is going to resolve these problems, step by step.


What You Need

To get started, you need to do a few things to your computer so everything will work.

First, install the program PuTTY (from here). Actually, choose the option for "A Windows installer for everything except PuTTYtel". You can accept all the defaults. As far as I know, this runs on all versions of Windows.

Next, you need to change the system path so it can find two things by default:
  • The PuTTY programs.
  • The batch file you are going to write.
The system path variable specifies where the operating system looks for executable files, when you have a command prompt, or when you execute a command from java.

Decide on the directory where you want the batch file. I chose "c:\users\gordon".

To change the system path, go to the "My Computer" or "Computer" icon on your desktop and right click to get "Properties" and then choose "Advanced System Settings". Click on the "Environment Variables" button. Then scroll down to find "Path" in the variables. Edit the "Path" variable.

BE VERY CAREFUL NOT TO DELETE THE PREVIOUS VALUES IN THE PATH VARIABLE!!! ONLY ADD ONTO THEM!!!

At the end of the path variable, I appended the following (without the double quotes): ";c:\Program Files (x86)\PuTTY\;c:\users\gordon". The part after the second semicolon should be where you want to put your batch file. The first part is where the putty commands are located (which may vary on different versions of Windows).

Then, I found that I had to reboot my machine in order for Eclipse to know about the new path. I speculate that this is because there is a java program running somewhere that picks up the path when it starts, and this is where Eclipse gets the path. If I'm correct, all that needs to be done is to restart that program. Rebooting the machine was easier than tracking down a simpler solution.


Test the Newly Installed Software

The equivalent of rsh in this environment is called plink. To see if things work, you need the following:
  • IP address of the other machine. On a Unix system, you can find this using "ifconfig" (the Windows equivalent is "ipconfig"). In my case, the IP address is 192.168.65.128. This is the address of the virtual machine, but this should work even if you are connecting to a real machine.
  • The user name to login as. In my case, this is "hadoop-user", which is provided by the virtual machine.
  • The password. In my case, this is "hadoop".
Here is a test command to see if you get to the right machine:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 hostname
If this works by returning the name of the machine you are connecting to, then everything is working correctly. In my case, it returns "hadoop-desk".

Since we are going to be connecting to the hadoop file system, we might as well test that as well. I noticed that the expected command:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 hadoop fs -ls
does not work. The remote shell is not initializing the Unix environment properly, so it cannot find the command. On the Yahoo! virtual machine, the initializations are in the ".profile" file. So, the correct command is:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 source .profile; hadoop fs -ls
Voila! That magically seems to work, indicating that we can, indeed, connect to another machine and run the hadoop commands.


Write the Batch File

I call the batch file "myhadoop.bat". This file contains the following line:

"c:\Program Files (x86)\PuTTY\plink.exe" -ssh -pw %3 %2@%1 source .profile; hadoop fs %4 %5 %6 %7 %8 %9

This file takes the following arguments in the following order:
  • host ip address (or hostname, if it can be resolved)
  • user name
  • password
  • commands to be executed (in arguments %4 through %9)
Yes, the password is in clear text. If this is a problem, learn about PuTTY ssh with security and encryption.

You can test this batch file in the same way you tested plink.


Write a Java Class to Run the Batch File

This is more complicated than it should be for two reasons. First, the available exec() command does not execute batch files. So, you need to use "cmd /q /c myhadoop.bat" to run it. This invokes a command interpreter to run the command (the purpose of the "/c" option). It also does not echo the commands being run, courtesy of the "/q" option.

The more painful part is the issue with stdout and stderr. Windows blocks a process when either of these buffers is full. What that means is that your code hangs, without explanation, rhyme, or reason. This problem, as well as others, is explained and solved in this excellent article, When Runtime.exec() won't.

The solution is to create separate threads to read each of the streams. With the example from the article, this isn't so hard. It is available in this file: HadoopFS.java.

Let me explain a bit how this works. The class HadoopFS has four fields:
  • command is the command that is run.
  • exitvalue is the integer code returned by the running process. Typically, processes return 0 when they are successful and an error code otherwise.
  • stdout is a list of strings containing the standard output.
  • stderr is a list of strings containing the standard error.
Constructing an object requires a string. This is the part of the hadoop command that appears after the "fs". So, for "hadoop fs -ls", this would be "-ls". As you can see, this could be easily modified to run any command, either under Windows or on the remote box, but I'm limiting it to Hadoop fs commands.
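
As a rough sketch of how the full command line might be assembled from that string, consider the following. This is not the code in HadoopFS.java. The class name FsCommandSketch, the build() helper, and passing the host, user, and password as separate parameters are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FsCommandSketch {

    // Builds the argument list that runs myhadoop.bat through the Windows command interpreter.
    static List<String> build(String host, String user, String password, String fsArgs) {
        List<String> cmd = new ArrayList<>(Arrays.asList(
            "cmd", "/q", "/c", "myhadoop.bat", host, user, password));
        // Split "-rmr output" into separate arguments (%4, %5, ...) for the batch file.
        cmd.addAll(Arrays.asList(fsArgs.trim().split("\\s+")));
        return cmd;
    }

    public static void main(String[] args) {
        System.out.println(build("192.168.65.128", "hadoop-user", "hadoop", "-rmr output"));
    }
}
```

Keeping the arguments as a list, rather than one long string, avoids having to quote them by hand. Note that the batch file only forwards %4 through %9, so at most six arguments reach "hadoop fs".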

This file also contains a private class called threadStreamReader. (Hmmm, I don't think I have the standard java capitalization down, since classes often start with capital letters.) This is quite similar to the StreamGobbler class in the above mentioned article. The difference is that my class stores the strings in a data structure instead of writing them to the console.
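
For readers who do not want to open the file, here is a minimal sketch of the idea, written from scratch rather than copied from HadoopFS.java. The names StreamCollector and ProcessRunnerSketch are invented, and I use ProcessBuilder here instead of Runtime.getRuntime().exec(), which is a substitution on my part. The point is only that each stream gets its own reading thread, and the lines are kept in a list.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

// Reads one stream to the end on its own thread and keeps the lines in memory.
class StreamCollector extends Thread {
    private final InputStream in;
    private final List<String> lines = new ArrayList<>();

    StreamCollector(InputStream in) {
        this.in = in;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            lines.add("read error: " + e.getMessage());
        }
    }

    // Call only after the thread has finished (for example, after join()).
    List<String> getLines() {
        return lines;
    }
}

class ProcessRunnerSketch {

    // Runs a command, draining stdout and stderr concurrently so neither buffer fills up.
    static int run(List<String> command, List<String> stdout, List<String> stderr)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).start();
        StreamCollector out = new StreamCollector(process.getInputStream());
        StreamCollector err = new StreamCollector(process.getErrorStream());
        out.start();
        err.start();
        int exitValue = process.waitFor();   // safe to wait: both streams are being read
        out.join();
        err.join();
        stdout.addAll(out.getLines());
        stderr.addAll(err.getLines());
        return exitValue;
    }
}
```

The two join() calls matter: the process can exit before the reader threads have consumed the last lines, so the lists are only complete once both threads have finished.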


Using the HadoopFS Class

At the beginning of this posting, I said that I wanted to do two things: (1) delete the output files before running the Hadoop job and (2) output the results. The full example for the WordCount driver class is in this file:
WordCount.java.

To delete the output files, I use the following code before the job is run:

....HadoopFS hdfs_rmr = new HadoopFS("-rmr "+outputname);
....hdfs_rmr.callCommand();

I've put the name of the output files in the string outputname.

To show the results, I use:

....HadoopFS hdfs_cat = new HadoopFS("-cat "+outputname+"/*");
....hdfs_cat.callCommand();
....for (String line : hdfs_cat.stdout) {
........System.out.println(line);
....}

These snippets are pretty simple and readable. More importantly, they seem to work.


Conclusion


The hadoop framework does not allow us to do some rather simple things. There are typically three computing environments when running parallel code -- the development environment, the master environment, and the grid environment. The master environment controls the grid, but does not provide useful functionality for the development environment. In particular, the master environment does not give the development environment critical access to the parallel distributed files.

I want to develop my code strictly in java, so I need more control over the environment. Fortunately, I can extend the environment to support the "hadoop fs" commands in the development environment. I believe this code could easily be extended for the Unix world (by writing appropriate "cmd" and "myhadoop.bat" files). This code would then be run in exactly the same way from the java MapReduce code.

This mechanism is going to prove much more powerful than merely affecting the aesthetics of the WordCount program. In the next post, I will probably explain how to use this method to return arbitrary data structures between MapReduce runs.

Wednesday, November 18, 2009

Getting Started with Hadoop and MapReduce

Over the past week, I have been teaching myself Hadoop -- a style of programming made popular by Google (who invented the parallel version of MapReduce), Yahoo! (who created the open source version called Hadoop), and Amazon (who provides cloud computing resources called EC2 for running the software).

My purpose is simple: one of our clients has a lot of web log data on Amazon S3 (which provides lots of cheap data storage). This data can be analyzed on EC2, using Hadoop. Of course, the people who put the data up there are busy running the web site. They are not analysts/data miners. Nor do they have much bandwidth to help a newbie get started. So, this has been a do-it-yourself effort.

I have some advice for anyone else who might be in a similar position. Over the past few days, I have managed to turn my Windows Vista laptop into a little Hadoop machine, where I can write code in an IDE (that means "integrated development environment", which is what you program in), run it on a one-node hadoop cluster (that is, my laptop), and actually get results.

There are several ways to get started with Hadoop. Perhaps you work at a company that has Hadoop running on one or more clusters or has a cluster in the cloud. You can just talk to people where you are and get started.

You can also endeavor to install Hadoop yourself. There is a good chance that I will attempt this one day. However, it is supposed to be a rather complicated process. And, all the configuration and installation is a long way from data.

My preferred method is to use a Hadoop virtual machine. And I found a nice one through the Yahoo Hadoop Tutorial (and there are others . . . I would like to find one running the most recent version of Hadoop).

I have some advice, corrections and expectations for anyone else who wants to try this.

(1) Programming languages.

Hopefully you already know some programming languages, preferably of the object-oriented sort. If you are a java programmer, kudos to you, since hadoop is written in java. However, I found that I can struggle through java with my rusty knowledge of C++ and C#. (This post is *not* about my complaints about java.)

For the purposes of this discussion, I do not consider SAS or SPSS to be worthy programming languages. You should be familiar with ideas such as classes, constructors, static and instance variables, functions, and class inheritance. You should also be willing to read through long run-time error messages, whose ultimate meaning is something like "I was expecting a constructor with no arguments" or "I received a Text type when I was expecting an IntWritable."


(2) Unix Familiarity

In the same way that Hadoop was developed using java, it was developed on a Unix environment. This means that you should be familiar with Unix shell commands (you know, "ls" versus "dir", "rm" versus "del", and so on). There is a rumor that a version of Hadoop will run under Windows. I am sure, though, that trying to install open source Unix-based software under Windows is going to be a major effort; so I chose the virtual machine route.

By the way, if you want to get Unix shell commands on your Windows box, then use cygwin! It provides the common Unix commands. Just download the most recent version from www.cygwin.com.


(3) Use Yahoo!'s Hadoop Tutorial (which is here)

This has proven invaluable in my efforts, although I have a few corrections and clarifications, which I'll describe below.


(4) Be Prepared to Load Software! (And have lots of disk space)

I have loaded the following software packages on my computer, to make this work:
  • VMWare Player 3.0 (from VM Ware). This is free software that allows any computer to run a virtual desktop in another operating system.
  • Hadoop VM Appliance (from Yahoo). This has the information for running a Hadoop virtual machine.
  • Eclipse SDE, version 3.3.2 (from the Eclipse project archives). Note that the Tutorial suggests version 3.3.1. Version 3.3.2 works. Anything more recent is called "Ganymede", and it just doesn't work with Hadoop.
  • Java Development Kit from Sun.
These are all multi-hundred megabyte zipped files.


(5) Getting VMWare to Share Folders between the host and virtual machine

In order to share folders (the easiest way to pass data back and forth), you need to install VMWare Tools. VMWare has instructions here. However, it took me a while to figure out what to do. The real steps are:

(a) Under VM-->Settings go to the Hardware tab. Set the CD/DVD to be "connected" and use the ISO image file. My path is "C:\Program Files (x86)\VMware\VMware Player\linux.iso", but I believe that it appeared automatically, after a pause. This makes the virtual machine think it is reading from a CD when it is really reading from this file.

(b) Run the commands to mount and extract files
Login or su to root and then run the following:
mount /cdrom
cd /tmp
tar zxf /cdrom/vmware-freebsd-tools.tar.gz
umount /cdrom

(c) Do the installation
I accepted all the defaults when it asked a question.

cd vmware-tools-distrib
./vmware-install.pl
vmware-config-tools.pl

(d) Set up a shared folder

Go to VM --> Settings... and go to the Options tab. Enable the shared folders. Choose an appropriate folder on the host machine (I chose "c:\temp") and give it a name on the virtual machine ("temp").

(e) The folder is available on the virtual machine at /mnt/hgfs/temp.


(6) Finding the Hadoop Plug-In for Eclipse

The Tutorial has the curious instructions in the third part "In the hadoop-0.18.0/contrib/eclipse-plugin directory on this CD, you will find a file named hadoop-0.18.0-eclipse-plugin.jar. Copy this into the plugins/ subdirectory of wherever you unzipped Eclipse." These are curious because there is no CD.

You will find this directory on the Virtual Machine. Go to it. Copy the jar file to the shared folder. Then go to the host machine and copy it to the described place.


(7) The code examples may not work in the Tutorial

I believe the following lines in the Tutorial's driver code are not correct:
   FileInputPath.addInputPath(conf, new Path("input"));
   FileOutputPath.addOutputPath(conf, new Path("output"));

The following works:
   conf.setInputPath(new Path("input"));
   conf.setOutputPath(new Path("output"));

(8) Thinking the Hadoop Way

Hadoop is a different style of programming, because it is distributed. Actually, there are three different "machines" that it uses.

The first is your host machine, which is where you develop code. Although you can develop software on the Hadoop machine, the tools are better on your desktop.

The second is the Hadoop machine, which is where you can issue the commands related to Hadoop. In particular, the command "hadoop" provides access to the parallel data. This machine has a shared drive with the host machine, which you can use to move files back and forth.

The third "machine" is where the data used by the programs lives: the Hadoop Distributed File System (HDFS). To move data between HDFS and the virtual machine, use the "hadoop fs" command. From the virtual machine, the shared folder lets you move the data to the host machine or anywhere else.
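
As a rough sketch of that last hop, the snippet below builds the "hadoop fs" commands for copying a file into and out of HDFS. The -put and -get options are standard "hadoop fs" options; the file names and the helper function are hypothetical, and /mnt/hgfs/temp is the shared folder from step (5).

```python
import subprocess


def hadoop_fs_command(action, source, destination):
    """Build a 'hadoop fs' command line as a list of arguments.

    action is "put" (virtual machine -> HDFS) or "get" (HDFS -> virtual machine).
    """
    flags = {"put": "-put", "get": "-get"}
    return ["hadoop", "fs", flags[action], source, destination]


def run(command):
    """Run the command on the Hadoop virtual machine; raises if it fails."""
    subprocess.run(command, check=True)


# Hypothetical file names, for illustration only.
to_hdfs = hadoop_fs_command("put", "/mnt/hgfs/temp/input.txt", "input")
from_hdfs = hadoop_fs_command("get", "output/part-00000", "/mnt/hgfs/temp")
```

Calling run(to_hdfs) only makes sense on a machine where the "hadoop" command is installed, which is why the sketch builds the commands without running them.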

--gordon

Friday, November 13, 2009

From Item Sets to Association Rules Using Chi-Square

In Data Analysis Using SQL and Excel, I introduce the chi-square metric for evaluating association rules. This posting extends that discussion, to explain how to use the chi-square metric for generating rules.

An association rule is of the form: left-hand-side --> right-hand-side (or, alternatively, LHS --> RHS). The left-hand side consists of one or more items, and the right-hand side consists of a single item. A typical example of an association rule is "graham crackers plus chocolate bars implies marshmallows", which many readers will recognize as the recipe for a childhood delight called s'mores.

Association rules are not only useful for retail analysis. They are also useful for web analysis, where we are trying to track the parts of a web page where people go. I have also seen them used in financial services and direct marketing.

The key to understanding how the chi-square metric fits in is to put the data into a contingency table. For this discussion, let's consider that we have a rule of the form LHS --> RHS, where each side consists of one item. In the following table, the numbers A, B, C, D represent counts:



              RHS-present   RHS-absent
LHS-present        A             B
LHS-absent         C             D

A is the count where the LHS and RHS items are both present. B has the LHS item but not the RHS item, and so on. Different rules have different contingency tables. We choose the best one using the chi-square metric (described in Chapter 3 of the above book). This tells us how unusual these counts are. In other words, the chi-square metric measures how unlikely it is that the observed counts are due to a random split of the data.
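
For readers who want to see the arithmetic, here is a minimal sketch of the chi-square calculation for such a table. It assumes the standard Pearson form (the sum over the four cells of (observed - expected)^2 / expected), which I take to be what the book describes; the function name is my own.

```python
def chi_square_2x2(a, b, c, d):
    """Pearson chi-square for the contingency table
           RHS-present  RHS-absent
    LHS-present   a         b
    LHS-absent    c         d
    Assumes every row and column total is greater than zero.
    """
    n = a + b + c + d
    row_totals = (a + b, c + d)
    col_totals = (a + c, b + d)
    observed = ((a, b), (c, d))
    chi2 = 0.0
    for i in range(2):
        for j in range(2):
            # Expected count if LHS and RHS were independent.
            expected = row_totals[i] * col_totals[j] / n
            chi2 += (observed[i][j] - expected) ** 2 / expected
    return chi2
```

A table with counts 25, 25, 25, 25 is exactly what independence predicts, so its chi-square is 0; the further the counts move from the expected values, the larger the value.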

Once we get a contingency table, though, we still do not have a rule. A contingency table really has four different rules:
  • LHS --> RHS
  • LHS --> not RHS
  • not LHS --> RHS
  • not LHS --> not RHS
(Or, another way of saying this is that these rules all generate the same contingency table.) How can we choose which rule is the best one?

In this case, we'll choose the rule based on how much better it does than just guessing. This is called the lift or improvement for a rule. So, the rule LHS --> RHS is correct for A/(A+B) of the records: the LHS is true for A+B records, and for A of these, the RHS is true as well.

Overall, simply guessing that RHS is true would be correct for (A+C)/(A+B+C+D) of the records. The ratio of these is the lift for the rule. A lift greater than 1 indicates that the rule does better than guessing; a lift less than 1 indicates that guessing is better.

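The following are the ratios for the four possible rules in the table. Each rule is compared against simply guessing its own right-hand side, so rules that predict "not RHS" are divided by (B+D)/(A+B+C+D):
  • LHS --> RHS: (A/(A+B))/((A+C)/(A+B+C+D))
  • LHS --> not RHS: (B/(A+B))/((B+D)/(A+B+C+D))
  • not LHS --> RHS: (C/(C+D))/((A+C)/(A+B+C+D))
  • not LHS --> not RHS: (D/(C+D))/((B+D)/(A+B+C+D))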
When choosing among these, choose the one with highest lift.
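
The choice among the four rules can be sketched in a few lines. This is only an illustration with made-up counts and my own function names; each rule's accuracy is divided by the accuracy of simply guessing that rule's right-hand side.

```python
def rule_lifts(a, b, c, d):
    """Lift of each of the four rules that share one contingency table."""
    n = a + b + c + d
    p_rhs = (a + c) / n        # accuracy of always guessing "RHS"
    p_not_rhs = (b + d) / n    # accuracy of always guessing "not RHS"
    return {
        "LHS --> RHS": (a / (a + b)) / p_rhs,
        "LHS --> not RHS": (b / (a + b)) / p_not_rhs,
        "not LHS --> RHS": (c / (c + d)) / p_rhs,
        "not LHS --> not RHS": (d / (c + d)) / p_not_rhs,
    }


def best_rule(a, b, c, d):
    """Name of the rule with the highest lift."""
    lifts = rule_lifts(a, b, c, d)
    return max(lifts, key=lifts.get)


# Made-up counts: A=40, B=10, C=20, D=130.
lifts = rule_lifts(40, 10, 20, 130)
winner = best_rule(40, 10, 20, 130)
```

With these counts, LHS --> RHS is right 80% of the time against a 30% baseline, so it has the highest lift of the four.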

The process for choosing rules is to choose the item sets based on the highest chi-square value, and then to choose the rules using the best lift.

This works well for rules with a single item on each side. What do we do for more complicated rules, particularly ones with more items in the left hand side? One method would be to extend the chi-square test to multiple dimensions. I am not a fan of the multidimensional chi-square test, as I've explained in another blog.

In this case, we just consider rules with a single item on the RHS. So, if an item set has four items, a, b, c, and d, then we would consider only the rules:
  • a+b+c --> d
  • a+b+d --> c
  • a+c+d --> b
  • b+c+d --> a
We are ignoring possibilities such as a+b-->c+d.

Each of these rules can now be evaluated using the chi-square metric, and then the best rule chosen using the lift of the rule.
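
Enumerating the candidate rules for an item set is mechanical: each item takes a turn as the RHS and the remaining items form the LHS. A small sketch (the function name is mine):

```python
def single_rhs_rules(itemset):
    """Return (lhs, rhs) pairs with exactly one item on the right-hand side."""
    items = sorted(itemset)
    return [(tuple(x for x in items if x != rhs), rhs) for rhs in items]


candidates = single_rhs_rules({"a", "b", "c", "d"})
```

For the four-item set this yields four candidates, one per possible RHS. Each candidate then gets its own contingency table, treating the whole LHS as a single "item" that is present only when all of its members are.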

Friday, November 6, 2009

Oversampling in General

Dear Data Miners,

I am trying to find out statistical reasons for balancing data sets when building models with binary targets, and nobody is able to intelligently describe why it is being done. In fact, there are mixed opinions on sampling when the response rate is low.

Based on literature and data mining professional opinions, here are a few versions (assume that the response rate is 1%):

1) As long as the number of responders is approximately equal to or greater than 10 times the number of variables included, no additional sampling is needed.

2) Oversample or undersample (based on the total number of observations) at least until the response rate = 10%.

3) Oversample or undersample (based on the total number of observations) until the response rate = 50%.

4) Undersampling is useful only for cutting down on processing time; really no good reason to do it statistically as long as the number of observations for responders is "sufficient" (% does not matter).


Having an advanced degree in mathematics but not being a statistician, I would like to understand whether there really is any statistical benefit in doing that.

I appreciate your time answering this.

Sincerely,

Your fellow data miner

Many years ago, I was doing a churn model for SK Telecom (in South Korea) using SAS Enterprise Miner. A friend of mine at SAS, Anne Milley, had suggested that having a 50% density for a binary response model would produce optimal models. Her reasoning was that with a 50% density of each target value, the contrast between the two values would be maximized, making it easier to pick out patterns in the data.

I spent some time testing decision trees with all sorts of different densities. To my surprise, the decision trees with more than 30% density performed better than trees with lower densities, regardless of the splitting criterion and other factors. This convinced me that 50% is not a bad idea.

There is a reason why decision trees perform better on balanced samples. The standard pruning algorithm for decision trees uses classification as the metric for choosing subtrees. That is, a leaf chooses its dominant class -- the one in excess of 50% for two classes. This works best when the classes are evenly distributed in the data. (Why data mining software implementing trees doesn't take the original density into account is beyond me.)

In addition, the splitting criteria may be more sensitive to deviations around 50% than around other values.

Standard statistical techniques are insensitive to the original density of the data. So, a logistic regression run on oversampled data should produce essentially the same model as on the original data. It turns out that the confidence intervals on the coefficients do vary, but the model remains basically the same.

Hmmm, as I think about it, I wonder if the oversampling rate would affect stepwise or forward selection of variables. I could imagine that, when testing each variable, the variance in results using a rare target would be larger than the variance using a balanced model set. This, in turn, might lead to a poorer choice of variables. But I don't know if this is the case.

For neural networks, the situation is more complicated. Oversampling does not necessarily improve the neural network -- there is no theoretical reason why it should. However, it does allow the network to run on a smaller set of data, which makes convergence faster. This, in turn, allows the modeler to experiment with different models. Faster convergence is a benefit in other ways as well.

Some other techniques such as k-means clustering and nearest neighbor approaches probably do benefit from oversampling. However, I have not investigated these situations in detail.

Because I am quite fond of decision trees, I prefer a simple rule, such as "oversample to 50%", since this works under the maximum number of circumstances.

In response to your specific questions, I don't think that 10% is a sufficient density. If you are going to oversample, you might as well go to 50% -- there is at least an elegant reason why (the contrast idea between the two response values). If you don't have enough data, then use weights instead of oversampling to get the same effect.
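
To make the weighting idea concrete, here is a minimal sketch of how the weight might be calculated. It assumes the modeling tool accepts a per-row weight, leaves non-responders at weight 1, and scales responders until they make up the target share of the total weight. The function name and the counts are hypothetical.

```python
def balancing_weights(num_responders, num_nonresponders, target_density=0.5):
    """Return (responder_weight, nonresponder_weight).

    Solves w * n1 / (w * n1 + n0) = target_density for w,
    where n1 and n0 are the responder and non-responder counts.
    """
    w = (target_density * num_nonresponders) / (
        (1 - target_density) * num_responders
    )
    return w, 1.0


# A 1% response rate: 1,000 responders among 100,000 rows.
w_resp, w_non = balancing_weights(1000, 99000)
effective_density = (w_resp * 1000) / (w_resp * 1000 + w_non * 99000)
```

Here each responder gets a weight of 99, which has the same effect on the density as oversampling to 50% without throwing away any of the non-responders.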

In the end, though, if you have the data and you have the software, try out different oversampling rates and see what produces the best models!

Wednesday, November 4, 2009

Scoring Association Rules

At M2009 (SAS's data mining conference), I was approached with the question of scoring association rules for customers. This is not a topic I have thought about very much. More typically, association rules are used qualitatively or to understand products. I hadn't thought about assigning the "best" rule (or rules) back to customers.

As a reminder, association rules provide information about items that are purchased at the same time. For example, we might find that marshmallows and chocolate bars imply graham crackers. The "marshmallows" and "chocolate bars" are the left hand side of the rule (LHS) and the graham crackers are the right hand side (RHS). The presumption is that when a shopper's basket has the LHS items but the graham crackers are missing, then they should be there.

Most data mining software, such as SAS Enterprise Miner, SQL Server Data Mining, and SPSS Clementine, can be used to generate association rules. I prefer to calculate the rules myself using database technology, using code similar to that in Data Analysis Using SQL and Excel.

However, data mining tools do not provide the ability to score association rules for individual customers. Nor is this a topic that I discuss in my book. My goal here is to discuss scoring rules in databases. This is because scoring is computationally expensive. Because databases can take advantage of indexing and parallel processing, they offer scope to make the scoring more efficient.

Hmmm, what does scoring association rules even mean? Scoring is the process of finding the best rule that a customer matches, either for a single RHS or for all possible RHSs. In the former case, the result is one rule. In the latter, it is an array of rules, for each possible RHS.

An association rule is traditionally defined by three metrics: support, confidence, and lift (as well as a fourth, the chi-square metric, which I prefer). For the purposes of this discussion, the best rule is the one with the highest confidence.

The simplistic way of doing such scoring is by considering each rule for each customer, to determine which rules apply to each customer. From the set that do apply, do some work to find the best one.

Imagine that we have a table, rules, with the following columns:
  • The rule id and the confidence of the rule;
  • The number of LHS items (we assume there is 1 RHS item);
  • The RHS item; and,
  • The LHS items, as a string: "item1;item2;..."
There is another table, custitem, containing each customer and each item as a separate row.

The following query finds all matching rules for each customer in the innermost subquery, by counting the number of items matched on the left hand side. The outer query then finds the rule (for each RHS) that has the maximum confidence, using SQL window functions.

SELECT cr.*
FROM (SELECT matchrules.customerid, r.rhs, r.ruleid, r.confidence,
             MAX(r.confidence) OVER (PARTITION BY matchrules.customerid, r.rhs
                                    ) as maxconfidence
      FROM (-- rules whose LHS items all appear among the customer's items
            SELECT ci.customerid, r.rhs, r.ruleid,
                   COUNT(*) as nummatches
            FROM custitem ci CROSS JOIN
                 rules r
            -- leading and trailing ';' so that only whole item names match
            WHERE CHARINDEX(';'||ci.item||';', ';'||r.lhs) > 0
            GROUP BY ci.customerid, r.rhs, r.ruleid
            HAVING COUNT(*) = MAX(r.numlhs)
           ) matchrules JOIN
           rules r
           ON matchrules.ruleid = r.ruleid
     ) cr
WHERE confidence = maxconfidence

This query is expensive, as you might guess from the use of CROSS JOIN. And its running time grows particularly quickly as the number of rules gets larger (and presumably the number of customers is larger still).

It is possible to make it more efficient, by doing tricks, such as:
  • If there are only a few items, then the LHS could be encoded using bits. This eliminates the need for string matching.
  • The rules can be pruned, so only the rules with the highest confidence are kept.
And, although this cannot be done in SQL, the rules could be ordered by confidence (for each RHS) from highest to lowest. The first match would then stop the search.
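
Outside the database, the bit encoding and the "stop at the first match" idea might look something like the following sketch. It assumes all the rules passed in share one RHS and that every item appearing in a rule has been assigned a bit position; the names and the confidences are made up.

```python
def encode(items, item_bits):
    """Encode a collection of items as an integer bit mask."""
    mask = 0
    for item in items:
        mask |= 1 << item_bits[item]
    return mask


def best_matching_rule(customer_items, rules, item_bits):
    """rules is a list of (lhs_items, rhs, confidence) tuples.

    Returns the matching rule with the highest confidence, or None.
    """
    # Items that appear in no rule cannot affect matching, so ignore them.
    known = [item for item in customer_items if item in item_bits]
    cust_mask = encode(known, item_bits)
    # Highest confidence first, so the first match ends the search.
    for lhs, rhs, confidence in sorted(rules, key=lambda r: r[2], reverse=True):
        lhs_mask = encode(lhs, item_bits)
        if (cust_mask & lhs_mask) == lhs_mask:
            return lhs, rhs, confidence
    return None


item_bits = {"marshmallows": 0, "chocolate bars": 1}
rules = [
    (("marshmallows", "chocolate bars"), "graham crackers", 0.9),
    (("marshmallows",), "graham crackers", 0.6),
]
```

A rule matches when every bit in its LHS mask is also set in the customer's mask, which replaces the string search with a single AND. In practice the rule masks would be computed once and stored rather than rebuilt for every customer.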

An alternative method requires storing the rules in two tables. The first is rules, containing descriptive information about each rule, such as:
  • ruleid;
  • rhs; and,
  • numlhs.
The second is ruleitem, which contains each item in the rules. Incidentally, this is more in keeping with the spirit of normalization in relational databases.

The subquery for the scoring now changes to a join. This is useful, because it means that we can use database mechanisms -- such as indexing and table partitioning -- to speed it up.

SELECT cr.*
FROM (SELECT matchrules.customerid, r.rhs, r.ruleid, r.confidence,
             MAX(r.confidence) OVER (PARTITION BY matchrules.customerid, r.rhs
                                    ) as maxconfidence
      FROM (SELECT ci.customerid, r.rhs, r.ruleid,
                   COUNT(*) as nummatches, MAX(r.numlhs) as numlhs
            FROM custitem ci JOIN
                 ruleitem ri
                 ON ci.item = ri.item JOIN
                 rules r
                 ON ri.ruleid = r.ruleid
            GROUP BY ci.customerid, r.rhs, r.ruleid
            HAVING COUNT(*) = MAX(r.numlhs)
           ) matchrules JOIN
           rules r
           ON matchrules.ruleid = r.ruleid
     ) cr
WHERE confidence = maxconfidence

Of course, such an approach makes a difference only when you need to score many customers and you have many rules. This same approach can be used for looking at a single product in the RHS or at several at one time. The latter would require summarizing the multiple products at the customer level in order to append the desired information on the customer record.
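
For anyone who wants to experiment with the join-based scoring without a database server, here is a small sketch using SQLite from Python. The table layout follows the description above (rules, ruleitem, custitem); the sample rows, the index and the function names are made up for illustration. It assumes each customer/item pair appears only once in custitem, and it needs a SQLite build with window functions (version 3.25 or later).

```python
import sqlite3

SCORING_QUERY = """
SELECT customerid, rhs, ruleid, confidence
FROM (SELECT m.customerid, r.rhs, r.ruleid, r.confidence,
             MAX(r.confidence) OVER (PARTITION BY m.customerid, r.rhs
                                    ) AS maxconfidence
      FROM (SELECT ci.customerid, ri.ruleid
            FROM custitem ci JOIN
                 ruleitem ri ON ci.item = ri.item JOIN
                 rules r ON r.ruleid = ri.ruleid
            GROUP BY ci.customerid, ri.ruleid
            HAVING COUNT(*) = MAX(r.numlhs)
           ) m JOIN
           rules r ON m.ruleid = r.ruleid
     ) cr
WHERE confidence = maxconfidence
ORDER BY customerid, rhs
"""


def build_demo_db():
    """Create an in-memory database with a few made-up rules and baskets."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE rules (ruleid INTEGER PRIMARY KEY, rhs TEXT,
                            numlhs INTEGER, confidence REAL);
        CREATE TABLE ruleitem (ruleid INTEGER, item TEXT);
        CREATE TABLE custitem (customerid INTEGER, item TEXT);
        CREATE INDEX ruleitem_item ON ruleitem (item);
    """)
    conn.executemany("INSERT INTO rules VALUES (?, ?, ?, ?)", [
        (1, "graham crackers", 2, 0.9),
        (2, "graham crackers", 1, 0.6),
        (3, "milk", 1, 0.5),
    ])
    conn.executemany("INSERT INTO ruleitem VALUES (?, ?)", [
        (1, "marshmallows"), (1, "chocolate bars"),
        (2, "marshmallows"), (3, "cereal"),
    ])
    conn.executemany("INSERT INTO custitem VALUES (?, ?)", [
        (100, "marshmallows"), (100, "chocolate bars"), (100, "cereal"),
        (200, "marshmallows"),
    ])
    return conn


def score_customers(conn):
    """Best rule per customer and RHS, as (customerid, rhs, ruleid, confidence)."""
    return conn.execute(SCORING_QUERY).fetchall()


scores = score_customers(build_demo_db())
```

Customer 100 has both marshmallows and chocolate bars, so the two-item rule wins for graham crackers; customer 200 has only marshmallows and falls back to the one-item rule.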