Shuffle机制
Map过程之后、Reduce方程之前,这段数据处理过程被称为 Shuffle
相关博客:
要求:将统计结果按照条件输出到不同文件中(分区)。
比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
在Driver类中,通过添加如下配置手动改变一下分区,不改的话,数据少,看不出Shuffle过程
job.setNumReduceTasks(2);
运行一下可以发现,最后的结果出现了两个
下面是Shuffle的解析
来看一下之前写过的 Mapper
代码,其他的都是业务代码,所以这里和 Shuffle
有关的代码只有一行代码 context.write(outKey, outValue);
来分析一下这句话。首先从map方法的入参 Mapper<>.Context
中可以看出,context 对象的类是父类 Mapper
中的 Context
内部抽象类
该抽象类中没有对应的 write
方法,而该抽象类实现了 MapContext
接口,点进去接着看,发现这个接口也没有对应的方法,而且 MapContext
接口实现了 TaskInputOutputContext
接口
点进去发现,write
方法就在这个接口里,通过debug可以发现接口的实现类是 TaskInputOutputContextImpl
。
在该类中,write
方法对应的实现也只有一行代码,点进方法体中的 write
方法,跳转到了 RecordWriter
抽象类中,通过debug可以发现,这里调用的实现类是 org.apache.hadoop.mapred
包下的 MapTask.NewOutputCollector
内部类,实际调用的 write
方法也是该子类中的方法
注意,如下是重点!
点进 write
方法体中的 collect
方法,跳到了 MapOutputCollector
接口中。通过Debug可以发现,该接口的实现类是 MapTask.MapOutputBuffer
,也就是 MapTask
类的内部类 MapOutputBuffer
,实际调用的 collect
方法,也是这个类中的
提示
之前在工作流程分析中,知道了在环形缓冲区中,内存达到80%就开始 反向溢写
具体是怎么做的,可以去看 MapTask.MapOutputBuffer
类中的 init
方法
collect
方法的方法体不用多看,直接来看前面给该方法传过来的参数
前两个参数不用多说了,看一下第三个参数:partitioner.getPartition(key, value, partitions)
这个名为 partitioner 的 Partitioner
接口类型变量的实际对象可以去看 MapTask.NewOutputCollector
类的构造方法
可以看出,导致接口实现类不同的地方在于 partitions 变量,而这个变量又是 ReduceTask
的个数,这个的个数,我们之前设置过了
因为 partitions 变量大于1,所以这里他用了一个反射工具类,获取了一个实现类。通过debug可以看出,这里获取的实现类是 org.apache.hadoop.mapreduce.lib.partition
包下的 HashPartitioner
类
该类中的 getPartition
方法非常简单,key的hashcode 二进制逻辑与 Integer的最大值,再对 ReduceTask 的个数取模
在这里,用户无法控制哪个key存到哪个分区。