One of the claims made in this particular presentation is that "it can be necessary to send tons of data to each Mapper Node. Depending on your bandwidth and memory available, this could be impossible." This claim is false, which in turn removes much of the motivation for the alternative algorithm, which called "canopy clustering".

The K-Means Clustering Algorithm

There are many good introductions to k-means clustering available, including our book Data Mining Techniques for Marketing, Sales, and Customer Support. The Google presentation mentioned above provides a very brief introduction.

Let's review the k-means clustering algorithm. Given a data set where all the columns are numeric, the algorithm for k-means clustering is basically the following:

(1) Start with k cluster centers (chosen randomly or according to some specific procedure).

(2) Assign each row in the data to its nearest cluster center.

(3) Re-calculate the cluster centers as the "average" of the rows in (2).

(4) Repeat, until the cluster centers no longer change or some other stopping criterion has been met.

In the end, the k-means algorithm "colors" all the rows in the data set, so similar rows have the same color.

K-Means in a Parallel World

To run this algorithm, it seems, at first, as though all the rows assigned to each cluster in Step (2) need to be brought together to recalculate the cluster centers.

However, this is not true. K-Means clustering is an example of an embarrassingly parallel algorithm, meaning that that it is very well suited to parallel implementations. In fact, it is quite adaptable to both SQL and to MapReduce, with efficient algorithms. By "efficient", I mean that large amounts of data do not need to be sent around processors and that the processors have minimum amounts of communication. It is true that the entire data set does need to be read by the processors for each iteration of the algorithm, but each row only needs to be read by one processor.

A parallel version of the k-means algorithm was incorporated into the Darwin data mining package, developed by Thinking Machines Corporation in the early 1990s. I do not know if this was the first parallel implementation of the algorithm. Darwin was later purchased by Oracle, and became the basis for Oracle Data Mining.

How does the parallel version work? The data can be partitioned among multiple processors (or streams or threads). Each processor can read the previous iteration's cluster centers and assign the rows on the processor to clusters. Each processor then calculates new centers for its of data. Each actual cluster center (for the data across all processors) is then the weighted average of the centers on each processor.

In other words, the rows of data do not need to be combined globally. They can be combined locally, with the reduced set of results combined across all processors. In fact, MapReduce even contains a "combine" method for just this type of algorithm.

All that remains is figuring out how to handle the cluster center information. Let us postulate a shared file that has the centroids as calculated for each processor. This file contains:

- The iteration number.
- The cluster id.
- The cluster coordinates.
- The number of rows assigned to the cluster.

There are two ways to do this in the MapReduce framework. The first uses map, combine, and reduce. The second only uses map and reduce.

K-Means Using Map, Combine, Reduce

Before begining, a file is created accessible to all processors that contains initial centers for all clusters. This file contains the cluster centers for each iteration.

The Map function reads this file to get the centers from the last finished iteration. It then reads the input rows (the data) and calculates the distance to each center. For each row, it produces an output pair with:

- key -- cluster id;
- value -- coordinates of row.

- key is cluster
- value is number of records and average values of the coordinates.

The Reduce function (and one of these is probably sufficient for this problem regardless of data size and the number of Maps) calcualtes the weighted average of its input. Its output should be written to a file, and contain:

- the iteration number;
- the cluster id;
- the cluster center coordinates;
- the size of the cluster.

K-Means Using Just Map and Reduce

Using just Map and Reduce, it is possible to do the same things. In this case, the Map and Combine functions described above are combined into a single function.

So, the Map function does the following:

- Initializes itself with the cluster centers from the previous iteration;
- Keeps information about each cluster in memory. This information is the total number of records assigned to the cluster in the processor and the total of each coordinate.
- For each record, it updates the information in memory.
- It then outputs the key-value pairs for the Combine function described above.

K-Means Using SQL

Of course, one of my purposes in discussing MapReduce has been to understand whether and how it is more powerful than SQL. For fifteen years, databases have been the only data-parallel application readily available. The parallelism is hidden underneath the SQL language, so many people using SQL do not fully appreciate the power they are using.

An iteration of k-means looks like:

SELECT @iteration+1, cluster_id,

.......AVERAGE(d.data) as center

FROM (SELECT d.data, cc.cluster_id,

.............ROW_NUMBER() OVER (PARTITION BY d.data

................................ORDER BY DISTANCE(d.data, cc.center) as ranking

......FROM data d CROSS JOIN

.....(SELECT *

......FROM cluster_centers cc

......WHERE iteration = @iteration) cc

.....) a

WHERE ranking = 1

GROUP BY cluster_id

This code assumes the existence of functions or code for the AVERAGE() and DISTANCE() functions. These are placeholders for the correct functions. Also, it uses analytic functions. (If you are not familiar with these, I recommend my book Data Analysis Using SQL and Excel.)

The efficiency of the SQL code is determined, to a large extent, by the analytic function that ranks all the cluster centers. We hope that a powerful parallel engine will recognize that the data is all in one place, and hence that this function will be quite efficient.

A Final Note About K-Means Clustering

The K-Means clustering algorithm does require reading through all the data for each iteration through the algorithm. In general, it tends to converge rather quickly (tens of iterations), so this may not be an issue. Also, the I/O for reading the data can all be local I/O, rather than sending large amounts of data through the network.

For most purposes, if you are dealing with a really big dataset, you can sample it down to a fraction of its original size to get reasonable clusters. If you are not satisfied with this method, then sample the data, find the centers of the clusters, and then use these to initialize the centers for the overall data. This will probably reduce the number of iterations through the entire data to less than 10 (one pass for the sample, a handful for the final clustering).

When running the algorithm on very large amounts of data, numeric overflow is a very real issue. This is another reason why clustering locally, taking averages, and then taking the weighted average globally is beneficial -- and why doing sample is a good way to begin.

Also, before clustering, it is a good idea to standardize numeric variables (subtract the average and divide by the standard deviation).

--gordon

Check out my latest book Data Analysis Using SQL and Excel.

Nice explanation, thanks.

ReplyDeleteHowever I would correct the mention to canopy clustering as a "competitor" of kmeans, as this algorithm is supposed to be an initialization step for selecting kmeans initial centroids using a cheap metric (http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.34.4329&rep=rep1&type=pdf).

Can you please explain how the iteration works in MapReduce? I mean how to

ReplyDeleteinput map->output->reduce->input map->output->reduce and so on. It is soo complicated :(

Repeating Job creating....That's my way.

Deletein the Driver class, check the returned object of the submission expression:

DeletejobClient.runJob(job), it is a RunningJob. define a variable to hold this object and ask it for the values of the jobs counters. then you have to check if one of the counters values satisfies your condition and rerun the job with the same configuration or another one

Very nice explanation. I have a question though. Your implementation of "Using Just Map and Reduce" uses a Combiner too? This solution can be implemented only if you define your own RecordReader, so that each map function can have access to several records and keep them in memory for later emission. Thanks!

ReplyDelete