合并 Combiner
Combiner
是MR程序中Mapper
和Reducer
之外的一种组件。Combiner
组件的父类就是Reducer
。Combiner
和Reducer
的区别在于运行的位置Combiner
是在每一个MapTask
所在的节点运行Reducer
是接收全局所有Mapper
的输出结果
Combiner
的意义就是对每一个MapTask
的输出进行局部汇总,以减小网络传输量Combiner
能够应用的前提是不能影响最终的业务逻辑。而且,Combiner
的输出kv应该跟Reducer
的输入kv类型要对应起来。
Combiner
可以理解为是 Reducer
之前的预处理。之前的求和都是在Reducer
做的,如果能把求和先在Mapper
做一下,就会提高Reducer
的效率
案例
这里还是用wordcount案例来做
输入文件:wordcount.txt
将合并的部分放到Combiner中来做,Reducer直接输出
警告
注意,不能没有Reducer,没有的话就没有一整个Shuffle的过程了
Combiner类
package com.mahe666.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 这里泛型的四个参数
* 前两个是Mapper的输出类型
* 后两个是Reducer的输入类型
*/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
// 累加
sum += val.get();
}
outValue.set(sum);
// 写出
context.write(key, outValue);
}
}
Reducer类
package com.mahe666.combiner;
// 同样,下面这个包才是2.x和3.x版本的Hadoop的包
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 泛型有四个参数
* 前两个是Combiner输出的类型
* 后两个是Reducer输出的类型
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
for (IntWritable val : values) {
// 数据量小,只有这一个,循环只执行一次,所以直接写出就好
outValue.set(val.get());
}
// 写出
context.write(key, outValue);
}
}
Driver类
package com.mahe666.combiner;
// 导包不要导错,这个才是
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3.关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4.设置map输出的KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置Combiner
job.setCombinerClass(WordCountCombiner.class);
// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\wordcount.txt"));
// 这里的路径是个文件夹
FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
// 7.提交job
// 源码当中用的就是 waitForCompletion(),该方法中已经有submit()了
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
总结
其实Combiner的逻辑和Reducer的逻辑差不多
实际情况中如果需要使用Combiner可以根据业务逻辑来判断是否需要单独写一个Combiner,不需要的话可以直接拿Reducer类去指定Combiner