分区 自定义Partitioner
继续来看 org.apache.hadoop.mapreduce.lib.partition
包下的 HashPartitioner
类。想要自定义 Partitioner
,来模仿它去写就好
自定义 Partitioner
类,步骤如下
- 自定义类继承
Partitioner
,重写getPartition
方法 - 在Job驱动中,设置自定义
Partitioner
类 - 自定义
Partition
后,要根据自定义Partitioner
的逻辑设置相应数量的ReduceTask
案例
使用之前 电话流量 的案例(在 序列化 那篇文章中)
数据文件:phone_data.txt
将132、155开头的电话号码按照电话号码省份放在一个文件中,其他的单独放到一个文件中
这里只需要改 Driver
类,再添加一个 Partition
类就可以了
Partitioner 类
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* Partitioner 是 shuffle 过程中分区的处理过程
* 因为 shuffle在map阶段之后,所以这里的 key和value 的泛型应该是 map 阶段输出的类型
*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
// 获取电话号前三位
String prePhoneNumber = text.toString().substring(0, 3);
int partition;
// 开始分区
if ("132".equals(prePhoneNumber)) {
partition = 0;
} else if ("155".equals(prePhoneNumber)) {
partition = 1;
} else {
partition = 2;
}
return partition;
}
}
Driver 类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取job
Configuration conf = new Configuration();
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, "\t");
Job job = Job.getInstance(conf);
// 2.设置jar包路径
job.setJarByClass(FlowDriver.class);
// 3.关联mapper和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4.设置map输出的KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5.设置最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\phone_data.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
// 分区相关设置
job.setPartitionerClass(ProvincePartitioner.class);
// Partitioner类里分了3个区,所以这里也要分3个
job.setNumReduceTasks(3);
// 7.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
分区总结
(1)如果 ReduceTask的数量
> getPartition的结果数
,则会多产生几个空的输出文件 part-r-000xx
;
(2)如果 1 < ReduceTask的数量
< getPartition的结果数
,则有一部分分区数据无处安放,会 Exception
;
(3)如果 ReduceTask的数量
= 1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个 ReduceTask
,最终也就只会产生一个结果文件 part-r-00000
;
(4)分区号必须从零开始,逐一累加。