This article is reposted from http://www.cnblogs.com/taven/archive/2012/11/03/2752076.html
What is MapReduce
First of all, MapReduce is a compound of two English words: Map ("to map") and Reduce ("to fold together"). It is a programming model for parallel computation over large data sets (larger than 1 TB), and its core ideas come from functional programming.
In Hadoop, a MapReduce job goes through three steps: Map (which splits the work into parallel tasks), Combine (an optional local pre-aggregation step whose main purpose is to make Reduce more efficient), and Reduce (which merges the processed results back together).
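Since the model borrows its ideas from functional programming, it may help to see the same word-count idea written first as an ordinary single-machine Java program. The sketch below is only my own analogy of the map-then-reduce idea, not Hadoop code; the sample text is the one analysed later in this post.

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LocalWordCount {
    public static void main(String[] args) {
        String text = "lixy csy lixy zmde nitamade hehe\n\nrealy amoeba woyou weibo hehe";
        // "map": split the text into words; "reduce": group identical words and count them
        Map<String, Long> counts = Arrays.stream(text.split("\\s+"))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        System.out.println(counts);   // e.g. {amoeba=1, csy=1, hehe=2, lixy=2, ...}
    }
}

Hadoop does the same two conceptual steps, but spreads the "map" work and the "reduce" work across many machines and shuffles the intermediate (word, 1) pairs between them.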
Implementing MapReduce
Next, here is the driver code that launches a Hadoop job:
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
Before running a Hadoop job, you first have to tell it which Mapper, Combiner and Reducer classes to use, which is what the setMapperClass, setCombinerClass and setReducerClass calls above do; a fuller picture of where these lines live is sketched below.
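The snippet above omits the surrounding main() method. A minimal sketch of the usual surrounding driver is shown here, assuming the driver class is called WordCount, that TokenizerMapper and IntSumReducer are in the same package, and that the input and output paths are passed on the command line; the Configuration / GenericOptionsParser lines are standard boilerplate added here for completeness, not part of the original post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options, leaving <input path> <output path>.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);   // the same class is reused as the combiner
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}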
Suppose the text we hand to Hadoop to analyse is:
lixy csy lixy zmde nitamade hehe
realy amoeba woyou weibo hehe
That's it: the file is just three lines of text. Line 1 contains several words, line 2 is empty, and line 3 also contains several words, with the words separated by spaces (the empty second line is hard to see above, but it is what produces the empty Map value in the output later). Now let's look at how the Mapper class is implemented and how it runs. Here is the code of TokenizerMapper:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // map() is called once per input record; with TextInputFormat that means once per line of the file.
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("TokenizerMapper.map...");
        System.out.println("Map key:" + key.toString() + " Map value:" + value.toString());
        // Split the line on whitespace and emit (word, 1) for every token.
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String tmp = itr.nextToken();
            word.set(tmp);
            context.write(word, one);
            System.out.println("tmp:" + tmp + " one:" + one);
        }
        System.out.println("context:" + context.toString());
    }
}
A word about the line IntWritable one = new IntWritable(1): we don't care how many times a word has already been seen, every occurrence counts as exactly 1, so context.write(word, one) always writes the value 1 for each word it emits. For example, hehe appears twice in the file, so the map phase emits (hehe, 1) twice, and the reduce phase later sums those 1s to get 2.
Depending on the contents of your file, the map(Object key, Text value, Context context) method above may be called many times. Running the job on the sample file produces the console output below (I have added some line breaks for readability). The first part shows the three map calls; the reduce-style output that follows appears to come from the combiner, since IntSumReducer is also registered as the combiner and pre-aggregates each map task's output before it is sent to the reducer:
TokenizerMapper.map...
Map key:0 Map value:lixy csy lixy zmde nitamade hehe
tmp:lixy one:1
tmp:csy one:1
tmp:lixy one:1
tmp:zmde one:1
tmp:nitamade one:1
tmp:hehe one:1
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3
TokenizerMapper.map...
Map key:34 Map value:
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3
TokenizerMapper.map...
Map key:36 Map value:realy amoeba woyou weibo hehe
tmp:realy one:1
tmp:amoeba one:1
tmp:woyou one:1
tmp:weibo one:1
tmp:hehe one:1
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3
IntSumReducer.reduce...
val.get():1
Reduce key:amoeba Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:csy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
val.get():1
Reduce key:hehe Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:2
IntSumReducer.reduce...
val.get():1
val.get():1
Reduce key:lixy Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:2
IntSumReducer.reduce...
val.get():1
Reduce key:nitamade Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:realy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:weibo Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:woyou Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:zmde Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
From the output of TokenizerMapper's map(Object key, Text value, Context context) calls we can see that the method was invoked three times, once per line of the file (the job uses TextInputFormat, which processes input line by line).
Each line's content arrives in the value parameter, and each line also gets a key: the byte offset within the file at which that line starts (so the key of line 1 is 0, the key of line 2 is 34, and the key of line 3 is 36). The key is therefore just a number, not anything derived from the line's content.
Inside this map method we can add our own processing logic, for example splitting each line on spaces to get the individual words, and then we write the processed results into the context parameter so that Hadoop can carry on with the rest of the pipeline. (Note again that the value written, the IntWritable one variable, is always the number 1.)
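Those particular key values can be checked with a little arithmetic. Line 1, "lixy csy lixy zmde nitamade hehe", is 32 bytes long, and 34 = 32 + 2 only works out if each line ends with a two-byte \r\n sequence, so the test file was presumably saved with Windows-style line endings; the empty line 2 then contributes only its own \r\n, putting line 3 at offset 36. A throwaway check (my own illustration, not from the original post):

// Reconstructing the byte offsets 0, 34 and 36 seen in the map log,
// assuming the file was saved with \r\n (CRLF) line endings.
public class OffsetCheck {
    public static void main(String[] args) {
        String line1 = "lixy csy lixy zmde nitamade hehe";   // 32 bytes of ASCII
        int offsetLine1 = 0;
        int offsetLine2 = offsetLine1 + line1.getBytes().length + 2;   // 32 + \r\n = 34
        int offsetLine3 = offsetLine2 + 0 + 2;                         // empty line + \r\n = 36
        System.out.println(offsetLine1 + " " + offsetLine2 + " " + offsetLine3);   // prints: 0 34 36
    }
}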
Next let's look at the reduce side. Here is the code of IntSumReducer:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    // reduce() is called once per distinct key, with all of that key's values grouped into one Iterable.
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("IntSumReducer.reduce...");
        int sum = 0;
        // Add up all the 1s (or pre-combined counts) emitted for this word.
        for (IntWritable val : values) {
            sum += val.get();
            System.out.println("val.get():" + val.get());
        }
        result.set(sum);
        context.write(key, result);
        System.out.println("Reduce key:" + key.toString() + " Reduce result:" + result.get());
        System.out.println("Reduce Context:" + context + " Result:" + result);
    }
}
When the reduce phase runs, the console shows the following. Notice that hehe and lixy now each arrive as a single value of 2 (val.get():2) instead of two separate 1s: the combiner (the same IntSumReducer, registered via job.setCombinerClass) has already summed them on the map side, so the reducer only has the pre-combined totals left to add up:
IntSumReducer.reduce...
val.get():1
Reduce key:amoeba Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:csy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():2
Reduce key:hehe Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:2
IntSumReducer.reduce...
val.get():2
Reduce key:lixy Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:2
IntSumReducer.reduce...
val.get():1
Reduce key:nitamade Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:realy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:weibo Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:woyou Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key:zmde Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
From the output of the reduce(Text key, Iterable<IntWritable> values, Context context) calls we can see that reduce is invoked once per distinct word, with all of that word's values grouped together into the values parameter.