Saturday, November 21, 2009

Hadoop and MapReduce: Controlling the Hadoop File System from the MapReduce Program

[The first comment below explains that Hadoop really does have a supported interface to the HDFS file system, through the FileSystem class ("import org.apache.hadoop.fs.FileSystem"). Yeah! I knew such an interface should exist -- and even stumbled across it myself after writing this post. Unfortunately, there is not as simple an interface for the "cat" operation, but you can't have everything.]

In my previous post, I explained some of the challenges in getting a Hadoop environment up and running. Since then, I have succeeded in using Hadoop both on my home machine and on Amazon EC2.

In my opinion, one of the major shortcomings of the programming framework is the lack of access to the HDFS file system from MapReduce programs. More concretely, if you have attempted to run the WordCount program, you may have noticed that you can run it once without a problem. The second time you get an error saying that the output files already exist.

What do you do? You go over to the machine running HDFS -- which may or may not be your development machine -- and you delete the files using the "hadoop fs -rmr" command. Can't Java do this?

You may also have noticed that you cannot see the output. Files get created, somewhere. What fun. To see them, you need to use the "hadoop fs -cat" command. Can't Java do this?

Why can't we create a simple WordCount program that can be run multiple times in a row, without error, and that prints out the results? And, to push the question further, I want to do all the work in Java. I don't want to work with an additional scripting language, since I already feel that I've downloaded way too many tools onto my machine to get all this to work.

By the way, I feel that both of these are very, very reasonable requests, and the Hadoop framework should support them. It does not. For those who debate whether Hadoop is better or worse than parallel databases, recognize that the master process in parallel databases typically supports functionality similar to what I'm asking for here.

Why is this not easy? Java, Hadoop, and the operating systems seem to conspire to prevent this. But I like a challenge. This posting, which will be rather long, is going to explain my solution. Hey, I'll even include some code so other people don't have to suffer through the effort.

I want to do this on the configuration I'm running from home. This configuration consists of:
  • Windows Vista, running Eclipse
  • Ubuntu Linux virtual machine, courtesy of Yahoo!, running Hadoop 0.18
However, I also want the method to be general and work regardless of platform. So, I want it to work if I write the code directly on my virtual machine, or if I write the code on Amazon EC2. Or, if I decide to use Karmasphere instead of Eclipse to write the code, or if I just write the code in a Java IDE. In all honesty, I've only gotten the system to work on my particular configuration, but I think it would not be difficult to get it to work on Unix.


Overview of Solution


The overview of the solution is simple enough. I am going to do the following:
  • Create a command file called "myhadoop.bat" that I can call from Java.
  • Write a Java class that runs this batch file with the arguments to do what I want.
Boy, that's simple. NOT!

Here is a sample of the problems:
  • Java has to call the batch file without any path. This is because Windows uses the backslash to separate directories whereas Unix uses forward slashes. I lose platform independence if I use full paths.
  • The batch file has to connect to a remote machine. Windows Vista does not have a command to do this. Unix uses the command "rsh".
  • The Java method for executing commands (Runtime.getRuntime().exec()) does not execute batch files easily.
  • The Java method for executing commands hangs after a few lines are output. And the lines could be in either the standard output stream (stdout) or the standard error stream (stderr), and it is not obvious how to read both of them at the same time.
This post is going to resolve these problems, step by step.


What You Need

To get started, you need to do a few things to your computer so everything will work.

First, install the program PuTTY (from here). Actually, choose the option for "A Windows installer for everything except PuTTYtel". You can accept all the defaults. As far as I know, this runs on all versions of Windows.

Next, you need to change the system path so it can find two things by default:
  • The PuTTY programs.
  • The batch file you are going to write.
The system path variable specifies where the operating system looks for executable files, whether you are at a command prompt or executing a command from Java.

Decide on the directory where you want the batch file. I chose "c:\users\gordon".

To change the system path, go to the "My Computer" or "Computer" icon on your desktop, right-click and choose "Properties", and then choose "Advanced System Settings". Click on the "Environment Variables" button and scroll down to find "Path" in the variables. Edit the "Path" variable.

BE VERY CAREFUL NOT TO DELETE THE PREVIOUS VALUES IN THE PATH VARIABLE!!! ONLY ADD ONTO THEM!!!

At the end of the path variable, I appended the following (without the double quotes): ";c:\Program Files (x86)\PuTTY\;c:\users\gordon". The part after the second semicolon should be where you want to put your batch file. The first part is where the putty commands are located (which may vary on different versions of Windows).

Then, I found that I had to reboot my machine in order for Eclipse to know about the new path. I speculate that this is because there is a Java program running somewhere that picks up the path when it starts, and this is where Eclipse gets the path. If I'm correct, all that needs to be done is to restart that program. Rebooting the machine was easier than tracking down a simpler solution.
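If you want to check whether Eclipse has actually picked up the new path, a quick sanity check is to print the PATH from a throwaway Java program and look for the PuTTY and batch-file directories. Something like this works (the class name is just for illustration):

public class PathCheck {
    public static void main(String[] args) {
        // Print each entry of the PATH exactly as the JVM (and hence Eclipse) sees it.
        String path = System.getenv("PATH");
        for (String entry : path.split(java.io.File.pathSeparator)) {
            System.out.println(entry);
        }
    }
}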


Test the Newly Installed Software

The equivalent of rsh in this environment is called plink. To see if things work, you need the following:
  • IP address of the other machine. On the Unix side, you can find this using "ifconfig" (the Windows equivalent is "ipconfig"). In my case, the IP address is 192.168.65.128. This is the address of the virtual machine, but this should work even if you are connecting to a real machine.
  • The user name to login as. In my case, this is "hadoop-user", which is provided by the virtual machine.
  • The password. In my case, this is "hadoop".
Here is a test command to see if you get to the right machine:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 hostname
If this works by returning the name of the machine you are connecting to, then everything is working correctly. In my case, it returns "hadoop-desk".

Since we are going to be connecting to the Hadoop file system, we might as well test that as well. I noticed that the expected command:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 hadoop fs -ls
does not work. This is because the remote shell does not initialize its environment properly, so it cannot find the hadoop command. On the Yahoo! virtual machine, the initializations are in the ".profile" file. So, the correct command is:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 source .profile; hadoop fs -ls
Voila! That magically seems to work, indicating that we can, indeed, connect to another machine and run the hadoop commands.


Write the Batch File

I call the batch file "myhadoop.bat". This file contains the following line:

"c:\Program Files (x86)\PuTTY\plink.exe" -ssh -pw %3 %2@%1 source .profile; hadoop fs %4 %5 %6 %7 %8 %9

This file takes the following arguments in the following order:
  • host ip address (or hostname, if it can be resolved)
  • user name
  • password
  • commands to be executed (in arguments %4 through %9)
Yes, the password is in clear text. If this is a problem, learn about PuTTY's support for key-based ssh authentication.

You can test this batch file in the same way you tested plink, for example: myhadoop.bat 192.168.65.128 hadoop-user hadoop -ls


Write a Java Class to Run the Batch File

This is more complicated than it should be for two reasons. First, the available exec() command does not execute batch files. So, you need to use "cmd /q /c myhadoop.bat" to run it. This invokes a command interpreter to run the command (the purpose of the "/c" option). It also does not echo the commands being run, courtesy of the "/q" option.
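As a rough sketch, the call ends up looking something like this (the host, user, and password are the values from my setup, and the class name is just for illustration; the real HadoopFS class wraps this up more carefully):

import java.io.IOException;

public class RunMyHadoop {
    public static void main(String[] args) throws IOException {
        // "/c" tells the command interpreter to run the command and then exit;
        // "/q" suppresses echoing of the commands being run.
        String fsArgs = "-ls";   // the part of the command after "hadoop fs"
        Runtime.getRuntime().exec(
                "cmd /q /c myhadoop.bat 192.168.65.128 hadoop-user hadoop " + fsArgs);
        // Beware: reading the output of this process naively can hang,
        // which is exactly the problem described next.
    }
}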

The more painful part is the issue with stdout and stderr. Windows blocks the process when either of these buffers is full. What that means is that your code hangs, without explanation, rhyme, or reason. This problem, as well as others, is explained and solved in this excellent article, When Runtime.exec() won't.

The solution is to create separate threads to read each of the streams. With the example from the article, this isn't so hard. It is available in this file: HadoopFS.java.

Let me explain a bit how this works. The class HadoopFS has four fields:
  • command is the command that is run.
  • exitvalue is the integer code returned by the running process. Typically, processes return 0 when they are successful and an error code otherwise.
  • stdout is a list of strings containing the standard output.
  • stderr is a list of strings containing the standard error.
Constructing an object requires a string. This is the part of the hadoop command that appears after the "fs". So, for "hadoop fs -ls", this would be "-ls". As you can see, this could be easily modified to run any command, either under Windows or on the remote box, but I'm limiting it to Hadoop fs commands.
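The full details are in the linked HadoopFS.java file, but in outline the class looks roughly like this (the host, user, and password are hard-coded to my values just to keep the sketch short):

import java.util.ArrayList;
import java.util.List;

public class HadoopFS {
    public String command;       // the part of the hadoop command after "fs", e.g. "-ls"
    public int exitvalue;        // 0 on success, an error code otherwise
    public List<String> stdout = new ArrayList<String>();   // captured standard output
    public List<String> stderr = new ArrayList<String>();   // captured standard error

    public HadoopFS(String command) {
        this.command = command;
    }

    public void callCommand() throws Exception {
        // Wrap the batch file in "cmd /q /c" and start the process.
        Process p = Runtime.getRuntime().exec(
                "cmd /q /c myhadoop.bat 192.168.65.128 hadoop-user hadoop " + command);
        // Drain stdout and stderr on separate threads (threadStreamReader is
        // described below); otherwise the process can block when a buffer fills.
        threadStreamReader out = new threadStreamReader(p.getInputStream(), stdout);
        threadStreamReader err = new threadStreamReader(p.getErrorStream(), stderr);
        out.start();
        err.start();
        exitvalue = p.waitFor();
        out.join();
        err.join();
    }
}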

This file also contains a private class called threadStreamReader. (Hmmm, I don't think I have the standard Java capitalization down, since class names conventionally start with capital letters.) This is quite similar to the StreamGobbler class in the above-mentioned article. The difference is that my class stores the strings in a data structure instead of writing them to the console.
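For completeness, a minimal version of the reader thread looks something like this (error handling trimmed; see the linked file for the real thing):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;

class threadStreamReader extends Thread {
    private InputStream stream;      // stdout or stderr of the child process
    private List<String> lines;      // where the captured lines are stored

    threadStreamReader(InputStream stream, List<String> lines) {
        this.stream = stream;
        this.lines = lines;
    }

    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);     // store the line instead of printing it
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}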


Using the HadoopFS Class

At the beginning of this posting, I said that I wanted to do two things: (1) delete the output files before running the Hadoop job and (2) output the results. The full example for the WordCount driver class is in this file: WordCount.java.

To delete the output files, I use the following code before the job is run:

HadoopFS hdfs_rmr = new HadoopFS("-rmr " + outputname);
hdfs_rmr.callCommand();

I've put the name of the output directory in the string outputname.

To show the results, I use:

HadoopFS hdfs_cat = new HadoopFS("-cat " + outputname + "/*");
hdfs_cat.callCommand();
for (String line : hdfs_cat.stdout) {
    System.out.println(line);
}

This is pretty simple and readable. More importantly, it seems to work.


Conclusion


The Hadoop framework does not make some rather simple things easy. There are typically three computing environments when running parallel code -- the development environment, the master environment, and the grid environment. The master environment controls the grid, but does not provide useful functionality for the development environment. In particular, the master environment does not give the development environment critical access to the parallel distributed files.

I want to develop my code strictly in Java, so I need more control over the environment. Fortunately, I can extend the environment to support the "hadoop fs" commands in the development environment. I believe this code could easily be extended to the Unix world (by replacing the "cmd" invocation and "myhadoop.bat" with an equivalent shell script). This code would then be run in exactly the same way from the Java MapReduce code.

This mechanism is going to prove much more powerful than merely affecting the aesthetics of the WordCount program. In the next post, I will probably explain how to use this method to return arbitrary data structures between MapReduce runs.

3 comments:

  1. Hadoop does provide access to HDFS via the FileSystem API (http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/fs/FileSystem.html).

    So, to take the WordCount example from the Hadoop homepage, to clear the output directory you can modify your main program to run this:

    // this is taken from the WordCount example
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);

        // delete contents of output dir
        FileSystem fs = FileSystem.get(conf);
        Path outputDir = new Path(args[1]);
        fs.delete(outputDir, true);

        // do the rest of the WordCount example
    }

    Similarly, you run "cat" using FileSystem.open() to retrieve an InputStream, and then use System.out.println() to print out the contents of the file.
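
    For example, something along these lines (just a sketch, and it needs the usual org.apache.hadoop.fs and java.io imports):

    // Sketch: print every part file in the job's output directory.
    FileSystem fs = FileSystem.get(conf);
    for (FileStatus status : fs.listStatus(new Path(args[1]))) {
        if (status.isDir()) continue;   // skip subdirectories such as _logs
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
    }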

  2. where is the mapper and reduce class?

  3. What the Hadoop software does is bust that data into pieces that it then spreads across your different servers.


