Shuffle机制
Map过程之后、Reduce方程之前,这段数据处理过程被称为 Shuffle
相关博客:
要求:将统计结果按照条件输出到不同文件中(分区)。
比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
在Driver类中,通过添加如下配置手动改变一下分区,不改的话,数据少,看不出Shuffle过程
job.setNumReduceTasks(2);运行一下可以发现,最后的结果出现了两个
下面是Shuffle的解析
来看一下之前写过的 Mapper 代码,其他的都是业务代码,所以这里和 Shuffle 有关的代码只有一行代码 context.write(outKey, outValue);
来分析一下这句话。首先从map方法的入参 Mapper<>.Context 中可以看出,context 对象的类是父类 Mapper 中的 Context 内部抽象类
该抽象类中没有对应的 write 方法,而该抽象类实现了 MapContext 接口,点进去接着看,发现这个接口也没有对应的方法,而且 MapContext 接口实现了 TaskInputOutputContext 接口
点进去发现,write 方法就在这个接口里,通过debug可以发现接口的实现类是 TaskInputOutputContextImpl。
在该类中,write 方法对应的实现也只有一行代码,点进方法体中的 write 方法,跳转到了 RecordWriter 抽象类中,通过debug可以发现,这里调用的实现类是 org.apache.hadoop.mapred 包下的 MapTask.NewOutputCollector 内部类,实际调用的 write 方法也是该子类中的方法
注意,如下是重点!
点进 write 方法体中的 collect 方法,跳到了 MapOutputCollector 接口中。通过Debug可以发现,该接口的实现类是 MapTask.MapOutputBuffer,也就是 MapTask 类的内部类 MapOutputBuffer,实际调用的 collect 方法,也是这个类中的
提示
之前在工作流程分析中,知道了在环形缓冲区中,内存达到80%就开始 反向溢写
具体是怎么做的,可以去看 MapTask.MapOutputBuffer 类中的 init 方法
collect 方法的方法体不用多看,直接来看前面给该方法传过来的参数
前两个参数不用多说了,看一下第三个参数:partitioner.getPartition(key, value, partitions)
这个名为 partitioner 的 Partitioner 接口类型变量的实际对象可以去看 MapTask.NewOutputCollector 类的构造方法
可以看出,导致接口实现类不同的地方在于 partitions 变量,而这个变量又是 ReduceTask 的个数,这个的个数,我们之前设置过了
因为 partitions 变量大于1,所以这里他用了一个反射工具类,获取了一个实现类。通过debug可以看出,这里获取的实现类是 org.apache.hadoop.mapreduce.lib.partition 包下的 HashPartitioner 类
该类中的 getPartition 方法非常简单,key的hashcode 二进制逻辑与 Integer的最大值,再对 ReduceTask 的个数取模
在这里,用户无法控制哪个key存到哪个分区。