Tuesday, August 6, 2013

K-Means with Hadoop-like interface



We explain the streaming functionality of MRStreamer by looking at the K-Means algorithm. The details of the algorithm have already been discussed in the following article:

Joos-Hendrik Böse, Artur Andrzejak, Mikael Högqvist: Beyond Online Aggregation: Parallel and Incremental Data Mining with Online MapReduce,
ACM Workshop on Massive Data Analytics over the Cloud (MDAC 2010) at WWW2010, Raleigh, North Carolina, USA, April 26, 2010.

Essence of the algorithm

We state the essence of the iterative K-Means algorithm to help readers understand the MapReduce source code shown below.

The algorithm works iteratively in several steps, which we describe in the following (a minimal stand-alone sketch of the underlying assign-and-recompute loop is shown after the list):
  • In the first step, the mappers read their share of the input data and compress the original data set into a smaller data set, the so-called auxiliary clusters. These auxiliary clusters represent the original data when the size of main memory is limited.
  • Each mapper creates k initial clusters from these auxiliary clusters, which are then sent to the reducer.
  • The (single) reducer merges the clusters from each mapper and recomputes the centroids of all k clusters.
  • These centroids are then streamed back to the original mappers via a broadcast operation.
  • Each mapper now uses the new centroids to reassign its auxiliary clusters to these centroids. The mappers send their local clusters back to the reducer.
  • The reducer merges the clusters again and recomputes the centroids.
  • This procedure is repeated until the reducer decides to stop resending data to the mappers, which usually happens when the algorithm converges.
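To make the assign-and-recompute cycle concrete, here is a minimal stand-alone sketch of a single K-Means iteration over plain double[] points. It is not part of MRStreamer, and the class and method names (KMeansStep, assignAndRecompute) are made up for illustration; in the actual implementation below, the assignment work is done by the mappers on auxiliary clusters and the centroid recomputation by the reducer.

import java.util.Arrays;

/** Illustrative sketch of one K-Means iteration; not part of MRStreamer. */
public class KMeansStep {

    /** Assigns each point to its nearest centroid and returns the recomputed centroids. */
    static double[][] assignAndRecompute(double[][] points, double[][] centroids) {
        int k = centroids.length;
        int dim = centroids[0].length;
        double[][] sums = new double[k][dim];
        int[] counts = new int[k];

        for (double[] p : points) {
            // find the closest centroid for this point (squared Euclidean distance)
            int best = 0;
            double bestDist = Double.MAX_VALUE;
            for (int c = 0; c < k; c++) {
                double dist = 0;
                for (int d = 0; d < dim; d++) {
                    double diff = p[d] - centroids[c][d];
                    dist += diff * diff;
                }
                if (dist < bestDist) {
                    bestDist = dist;
                    best = c;
                }
            }
            // accumulate the point into its cluster
            counts[best]++;
            for (int d = 0; d < dim; d++) {
                sums[best][d] += p[d];
            }
        }

        // recompute each centroid as the mean of its assigned points;
        // clusters that received no points keep their old centroid
        for (int c = 0; c < k; c++) {
            if (counts[c] == 0) {
                sums[c] = Arrays.copyOf(centroids[c], dim);
            } else {
                for (int d = 0; d < dim; d++) {
                    sums[c][d] /= counts[c];
                }
            }
        }
        return sums;
    }
}

The MapReduce version below repeats exactly this cycle, except that the mappers work on compressed auxiliary clusters instead of raw points and the merge/recompute step runs in the single reducer.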

Illustration

The figure below illustrates the online and iterative K-Means algorithm implemented with MRStreamer.





Source code of K-Means

This source code and auxiliary classes can be found in the distribution file in the "examples" directory.

package examples;

import algorithms.kmeans.Cluster;
import algorithms.kmeans.Clusters;
import algorithms.kmeans.SamplesCache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.DenseVectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
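
// Note: MRMapper, MRReducer, MRReduceContext and MRSJob are not part of the stock Hadoop API;
// they are provided by the MRStreamer distribution together with the auxiliary k-means classes.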

public class KMeansHadoop {

    private final static Logger LOG = LoggerFactory.getLogger(KMeansHadoop.class);

    public static class KMeansMapper extends
            MRMapper<LongWritable, Text, IntWritable, Clusters, Clusters> {

        private SamplesCache cache = new SamplesCache(500);
        private int cacheSize = 10000;
        private Clusters clusters = null;
        private int k = 0;
        private int nextCentroidToInit = 0;

        /**
         * Configures the mapper by reading two configuration options:
         * - "numCluster": the k in k-Means
         * - "numAuxCluster": the number of in-memory auxiliary clusters representing the input data
         *
         * @param context the mapper context, used to access the configuration
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            this.k = conf.getInt("numCluster", 5);
            this.clusters = new Clusters(k);
            this.cacheSize = conf.getInt("numAuxCluster", 500);
            this.cache = new SamplesCache(cacheSize);
        }

        /**
         * Maps the input lines to initial centroids and, as a side effect, stores auxiliary clusters representing
         * the input data in memory.
         *
         * @param key the key provided by the input format, not used here
         * @param value one line of the input; input format: one data point per line, vector components delimited by spaces
         * @param context the mapper context used to send initial centroids to the reducer
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input format: one data point per line, components delimited by spaces
            final List<Double> doubleValues = new ArrayList<Double>();
            final StringTokenizer tk = new StringTokenizer(value.toString());
            while (tk.hasMoreElements()) {
                final String token = tk.nextToken();
                doubleValues.add(Double.parseDouble(token));
            }

            double[] dv = new double[doubleValues.size()];
            for (int i = 0; i < doubleValues.size(); i++) {
                dv[i] = doubleValues.get(i);
            }
            DenseVector dvec = new DenseVector(dv);
            DenseVectorWritable sample = new DenseVectorWritable(dvec);

            // add sample to local auxiliary clusters
            this.cache.addSample(sample);

            // first k points are chosen as initial centroids
            if (nextCentroidToInit < k) {
                this.clusters.set(nextCentroidToInit, new Cluster(sample, sample));
                this.nextCentroidToInit += 1;
            } else if (nextCentroidToInit == k) {
                // send initial centroids to reducer
                context.write(new IntWritable(0), this.clusters);
                this.nextCentroidToInit += 1;
            }
        }

        /**
         * Remaps the input data when a new set of preliminary clusters is received from the reducer: it recalculates
         * the assignment of the local input data, as represented by the auxiliary clusters, to the preliminary
         * clusters and sends the updated centroids to the reducer.
         *
         * @param cs the preliminary clusters computed by the reducer
         * @param context the mapper context used to send the locally recomputed centroids to the reducer
         * @throws IOException
         * @throws InterruptedException
         */
        public void remap(List<Clusters> cs, Context context) throws IOException, InterruptedException {
            LOG.info("Remapping preliminary clusters");
            // set the preliminary clusters as new clusters
            this.clusters = cs.get(0).clone();
            this.clusters.reset();
            // reassign the local input data, represented by the auxiliary clusters, to the clusters,
            // thereby readjusting the cluster centroids
            this.cache.reAssignAll(clusters);
            // send the locally updated clusters to the reducer
            context.write(new IntWritable(0), this.clusters);
        }
    }

    public static class KMeansReducer extends
            MRReducer<IntWritable, Clusters, IntWritable, Clusters, Clusters> {

        private double lastError = Double.MAX_VALUE;
        private float epsilon = Float.MAX_VALUE;

        /**
         * Configures the reducer by reading the configuration option "epsilon": the minimum change of the MSE
         * needed to trigger a new iteration.
         *
         * @param context the reducer context, used to access the configuration
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            epsilon = conf.getFloat("epsilon", 100f);
        }

        /**
         * Reduces the list of clusters locally computed by the mappers into a preliminary global set of clusters,
         * which is then restreamed to the mappers. If the MSE of the global set of clusters has not changed by more
         * than epsilon since the last reduce invocation, the iteration ends and the final set of clusters is emitted.
         *
         * @param key the key set by the mapper, not used here
         * @param values the list of locally computed clusters computed by the mappers
         * @param context the reducer context, used to restream preliminary clusters to the mappers and emit the
         *                final clusters
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(IntWritable key, Iterable<Clusters> values,
                MRReduceContext<IntWritable, Clusters, IntWritable, Clusters, Clusters> context)
                throws IOException, InterruptedException {

            // Merge the list of clusters into one set of clusters
            Clusters results = null;
            for (Clusters clusters : values) {
                if (results == null) {
                    results = clusters;
                } else {
                    results.merge(clusters);
                }
            }

            Double error = results.getMSE();

            LOG.info("Last error " + lastError + ", current error " + error);

            if (lastError < Double.MAX_VALUE &&
                    error <= lastError + epsilon &&
                    error >= lastError - epsilon) {
                // MSE has changed by less than epsilon: emit the final result
                context.write(new IntWritable(0), results);
                LOG.info("Final result written.");
            } else {
                // MSE has changed by more than epsilon: send the recomputed preliminary clusters
                // to the mappers to start a new iteration
                this.lastError = error;
                results.computeNewCentroids();
                context.restream(results);
                LOG.info("Preliminary result restreamed.");
            }
        }

    }

    /**
     * Executes the streaming Hadoop MapReduce program.
     *
     * @param args first arg is the input path, second arg is the output path
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        conf.setBoolean("mrstreamer.hadoop.streaming", true);

        // has to be 1 to ensure that the algorithm produces valid results
        conf.setInt(JobContext.NUM_REDUCES, 1);

        conf.setInt(JobContext.NUM_MAPS, 4);

        conf.set("numCluster", "5");
        conf.set("numAuxCluster", "500");

        Job job = new MRSJob(conf, "kmeanshadoop");

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Clusters.class);

        job.setMapperClass(KMeansMapper.class);
        job.setReducerClass(KMeansReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

}

Monday, August 5, 2013

Hadoop Example: WordCount v1.0 Part-3

Walk-through

The WordCount application is quite straightforward.
The Mapper implementation (lines 14-26), via the map method (lines 18-25), processes one line at a time, as provided by the specified TextInputFormat (line 49). It then splits the line into tokens separated by whitespace, via the StringTokenizer, and emits a key-value pair of <word, 1>.
For the given sample input the first map emits: < Hello, 1> < World, 1> < Bye, 1> < World, 1>
The second map emits: < Hello, 1> < Hadoop, 1> < Goodbye, 1> < Hadoop, 1>
We’ll learn more about the number of maps spawned for a given job, and how to control them in a fine-grained manner, a bit later in the tutorial.
WordCount also specifies a combiner (line 46). Hence, the output of each map is passed through the local combiner (which, as per the job configuration, is the same class as the Reducer) for local aggregation, after being sorted on the keys.
The output of the first map: < Bye, 1> < Hello, 1> < World, 2>
The output of the second map: < Goodbye, 1> < Hadoop, 2> < Hello, 1>
The Reducer implementation (lines 28-36), via the reduce method (lines 29-35), just sums up the values, which are the occurrence counts for each key (that is, words in this example).
Thus the output of the job is: < Bye, 1> < Goodbye, 1> < Hadoop, 2> < Hello, 2> < World, 2>
The main method specifies various facets of the job, such as the input/output paths (passed via the command line), key-value types, input/output formats etc., in the JobConf. It then calls JobClient.runJob (line 55) to submit the job and monitor its progress.
We’ll learn more about JobConf, JobClient, Tool, and other interfaces and classes a bit later in the tutorial.
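As a small preview of the Tool interface mentioned above, here is a sketch of how the same job could be driven through ToolRunner so that generic options such as -files and -libjars are parsed before your own arguments. This variant is not part of the WordCount v1.0 code in Part-1; the class name WordCountDriver is made up, and the Mapper and Reducer are reused from that listing.

package org.myorg;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {

  // run() receives the arguments left over after ToolRunner has consumed
  // generic options such as -files and -libjars
  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(WordCount.Map.class);
    conf.setCombinerClass(WordCount.Reduce.class);
    conf.setReducerClass(WordCount.Reduce.class);

    // TextInputFormat / TextOutputFormat are the defaults, so they are not set explicitly here
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new WordCountDriver(), args);
    System.exit(exitCode);
  }
}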

Hadoop Example: WordCount v1.0 Part-2

Usage

Compile WordCount.java:
$ mkdir wordcount_classes
$ javac -cp classpath -d wordcount_classes WordCount.java
where classpath is:
  • CDH4 - /usr/lib/hadoop/*:/usr/lib/hadoop/client-0.20/*
  • CDH3 - /usr/lib/hadoop-0.20/hadoop-0.20.2-cdh3u4-core.jar
Create a JAR:
$ jar -cvf wordcount.jar -C wordcount_classes/ .
Assuming that:
  • /user/cloudera/wordcount/input - input directory in HDFS
  • /user/cloudera/wordcount/output - output directory in HDFS
Create sample text files as input and move to HDFS:
$ echo "Hello World Bye World" > file0
$ echo "Hello Hadoop Goodbye Hadoop" > file1
$ hadoop fs -mkdir /user/cloudera /user/cloudera/wordcount /user/cloudera/wordcount/input
$ hadoop fs -put file* /user/cloudera/wordcount/input
Run the application:
$ hadoop jar wordcount.jar org.myorg.WordCount /user/cloudera/wordcount/input /user/cloudera/wordcount/output
Output:
$ hadoop fs -cat /user/cloudera/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
Applications can specify a comma-separated list of paths that will be made available in the current working directory of the task using the -files option. The -libjars option allows applications to add JARs to the classpaths of the maps and reduces. The -archives option allows them to pass archives as arguments; these are unzipped/unjarred, and a link with the name of the zip/JAR is created in the current working directory of the tasks. More details about the command line options are available in the Hadoop Command Guide.
Running the wordcount example with -libjars and -files:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output

Hadoop Example: WordCount v1.0 Part-1

Source Code

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

Sunday, August 4, 2013

Hadoop and Solid State Drives

Is there a story for the Hadoop Storage Stack (HDFS+HBase) on Solid State Drives (SSDs)? This is a question that quite a few people have asked me in the last two days, mostly at the Open Compute Summit. This piece discusses the possible use cases for using SSDs with Hadoop or HBase.

Use Case
Currently, there are two primary use cases for HDFS: data warehousing using map-reduce and a key-value store via HBase. In the data warehouse case, data is mostly accessed sequentially from HDFS, so there isn't much benefit from using an SSD to store data. In a data warehouse, a large portion of queries access only recent data, so one could argue that keeping the last few days of data on SSDs could make queries run faster. But most of our map-reduce jobs are CPU bound (decompression, deserialization, etc.) and bottlenecked on map-output-fetch; reducing the data access time from HDFS does not impact the latency of a map-reduce job. Another use case would be to put map outputs on SSDs; this could potentially reduce map-output-fetch times and is one option that needs some benchmarking.

For the second use case, HDFS+HBase could theoretically use the full potential of the SSDs to make online-transaction-processing workloads run faster. This is the use case that the rest of this blog post tries to address.

Background
The read/write latency of data on an SSD is orders of magnitude smaller than the read/write latency of spinning disk storage; this is especially true for random reads and writes. For example, a random read from an SSD takes about 30 microseconds while a random read from a spinning disk takes 5 to 10 milliseconds. Also, an SSD device can support 100K to 200K operations/sec while a spinning disk controller can possibly issue only 200 to 300 ops/sec. This means that random reads/writes are not a bottleneck on SSDs. On the other hand, most of our existing database technology is designed to store data on spinning disks, so the natural question is: can these databases harness the full potential of the SSDs? To answer this question, we ran two separate artificial random-read workloads, one on HDFS and one on HBase. The goal was to stretch these products to the limit and establish their maximum sustainable throughput on SSDs.

HDFS random-read on cached data
In the first experiment, we created an HDFS cluster with a single NameNode and a single DataNode. We created a 2 GB HDFS file with an HDFS block size of 256 MB and a replication factor of 1. We configured the DataNode to run on 16 hyper-threaded cores, and it stored block data on xfs. Our benchmark program was co-located on the DataNode machine and had hdfs-read-shortcircuit switched on, i.e. the DFSClient bypassed the DataNode and issued read calls directly to the local xfs filesystem. The entire 2 GB of data was cached in the OS buffer cache and this benchmark did not trigger any IO from disk. The fact that all the data was in the OS cache essentially simulated the behavior of an ultra-fast SSD. We varied the number of client threads, and each client thread did a pre-configured number of 16K read calls from HDFS. Since there were only 8 blocks in the file, the DFSClient cached the locations of all 8 blocks and there were no repeated calls to the NameNode. The first few iterations of this test showed that HDFS can sustain a max random-read throughput of around 50K ops/sec, but surprisingly the CPU was not maxed out. We found that the read-shortcircuit code path spent considerable time in DNS lookup calls and updating metric counters. We fixed these two pieces of code and observed that HDFS could sustain a peak random-read throughput of around 92K ops/sec; the CPUs were now close to 95% usage. HdfsPreadImage is a plot that captures this scenario. The takeaway is that a database layered above HDFS would not be able to utilize all the iops offered by a single SSD.
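For illustration, a random-read driver of this kind can be put together with the standard HDFS client API. The sketch below is not the benchmark program used in the experiment; the class name, argument handling and counts are made up, and short-circuit reads are assumed to be enabled in the cluster configuration. It merely shows many threads issuing positioned 16K reads against a single cached HDFS file and reporting the aggregate ops/sec.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RandomReadBenchmark {
  public static void main(String[] args) throws Exception {
    final Path file = new Path(args[0]);                 // e.g. the 2 GB HDFS file
    final int threads = Integer.parseInt(args[1]);        // number of client threads
    final int readsPerThread = Integer.parseInt(args[2]); // reads issued by each thread
    final int readSize = 16 * 1024;                       // 16K reads, as in the experiment

    final Configuration conf = new Configuration();
    final FileSystem fs = FileSystem.get(conf);
    final long fileLen = fs.getFileStatus(file).getLen();

    final AtomicLong totalReads = new AtomicLong();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    long start = System.currentTimeMillis();

    for (int t = 0; t < threads; t++) {
      pool.submit(new Callable<Void>() {
        public Void call() throws Exception {
          byte[] buf = new byte[readSize];
          FSDataInputStream in = fs.open(file);           // each thread uses its own stream
          for (int i = 0; i < readsPerThread; i++) {
            long pos = ThreadLocalRandom.current().nextLong(fileLen - readSize);
            in.readFully(pos, buf);                       // positioned (random) read
            totalReads.incrementAndGet();
          }
          in.close();
          return null;
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);

    double secs = (System.currentTimeMillis() - start) / 1000.0;
    System.out.printf("%d reads in %.1f s = %.0f ops/sec%n",
        totalReads.get(), secs, totalReads.get() / secs);
  }
}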

A profiled run of the HDFS code shows that the DFSClient's code paths are quite long and cause an appreciable impact on throughput for cached random reads. If data-fetch times are in the millisecond range (from spinning disks), the long code paths in the DFSClient do not add appreciable overhead, but when the data is cached in the OS cache (or on SSDs), these code paths need some major rework. Another option would be to write an HDFS read-only client in C or C++, thereby avoiding some of the overhead of the current Java-based DFSClient.

HBase random-get on cached data
In the second experiment, we did a similar test on HBase. We created a single table with a single region, and all data was cached in the OS cache of a single HBase regionserver. The OS cache again simulates a super-fast SSD device. We used a set of 4 client machines to drive random-get calls to the regionserver. The regionserver was configured to use a maximum of 2000 threads. The HBase table has lzo compression and delta-encoding-fast-diff enabled. Since the data set is cached in OS buffers, this benchmark does not cause any disk I/O from spinning disks. We saw that the HBase throughput maxes out at around 35K ops/sec and we were not able to drive the CPU usage on that machine to more than 45%. Heavy lock contention and heavy context switching cause the regionserver to be unable to use all the available CPU on the machine. The detailed chart is at Cache4G.
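Again for illustration only, a client driving random gets at the regionserver looks roughly like the sketch below, using the old HTable-based client API. The table name, row-key format and request counts are placeholders, and the real benchmark ran many threads across 4 client machines; this single-threaded loop is only meant to show the shape of the workload.

import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class RandomGetClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");   // placeholder table name
    int numKeys = 4000000;                          // assumed key space
    int numGets = 1000000;                          // gets issued by this client
    Random rnd = new Random();

    long start = System.currentTimeMillis();
    for (int i = 0; i < numGets; i++) {
      // rows are assumed to be keyed by a zero-padded integer
      byte[] row = Bytes.toBytes(String.format("%010d", rnd.nextInt(numKeys)));
      Get get = new Get(row);
      Result r = table.get(get);                    // random point read
      if (r.isEmpty()) {
        System.err.println("missing row " + Bytes.toString(row));
      }
    }
    double secs = (System.currentTimeMillis() - start) / 1000.0;
    System.out.printf("%.0f gets/sec from one client thread%n", numGets / secs);
    table.close();
  }
}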

What does this mean
The two experiments show that HBase+HDFS, as it stands today, will not be able to harness the full potential that is offered by SSDs. It is possible that some code restructuring could improve the random-read-throughput of these solutions but my guess is that it will need significant engineering time to make HBase+HDFS sustain a throughput of 200K ops/sec.

These results are not unique to HBase+HDFS. Experiments on other non-Hadoop databases show that they also need to be re-engineered to achieve SSD-capable throughputs. My conclusion is that database and storage technologies would need to be developed from scratch if we want to utilize the full potential of Solid State Devices. The search is on for these new technologies!