全排序
我们肯定遇到过这样的需求:查询 北方前10个GDP最高的城市、北京前10个最赚钱的职业 等
那么这个东西怎么做,首先可以确定的是需要分区。分区之后,就是排序
需求:有一个较为规整的数据,现在要求把电话号码流量的总流量**从大到小(降序)**排序
需求之中没提到分区相关的事,所以这里可以不用管分区。而相较于之前 序列化 那章的代码,改动如下
序列化Bean类 FlowBean
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 命名为FlowBean的意思是:Flow是流量,流量的Bean类
* 1. 定义类实现WritableComparable接口
* 2. 重写序列化和反序列化方法
* 3. 重写无参构造方法
* 4. 重写compareTo()方法
* 5. 重写toString()方法
*/
public class FlowBean implements WritableComparable<FlowBean> {
/**
* 上行流量
*/
private long upFlow;
/**
* 下行流量
*/
private long downFlow;
/**
* 总流量
*/
private long sumFlow;
// get和set方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
/**
* 重载setSumFlow()方法
*/
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
/**
* 重写无参构造方法
*/
public FlowBean() {
}
/**
* 序列化方法,方法体中先序列化哪个,反序列化时就需要先接哪个
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/**
* 反序列化方法,接收值时,需要与序列化时的顺序完全一致
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
/**
* 方便写入数据
*
* @return
*/
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean o) {
// 总流量的倒序,进行排序
if (this.sumFlow > o.sumFlow) {
// 倒叙,返回-1
return -1;
} else if (this.sumFlow < o.sumFlow) {
return 1;
} else {
return 0;
}
}
}
Mapper类 FlowMapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 因为Hadoop是对Key排序,所以这里要把Key和Value的输入互换
* 等排序之后,再转换回来
* 前两个泛型不变
* 第三个泛型是第一次互换后的key,也就是之前的value,流量状况
* 第四个泛型是第一次互换后的value,也就是之前的key,手机号
*/
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outKey = new FlowBean();
private Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
// 获取一行,并切割
String[] split = value.toString().split("\t");
// 封装,第一次反转是在这里做的!!!
// 获取电话号,放到value里
outValue.set(split[0]);
outKey.setUpFlow(Long.parseLong(split[1]));
outKey.setDownFlow(Long.parseLong(split[2]));
outKey.setSumFlow();
// 写出
context.write(outKey, outValue);
}
}
Reducer类 FlowReducer
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 前面的map阶段互换了key和value,这里要进行第二次互换,也就是让他变成正常的样子
*
* 第一个泛型是map阶段输出的是bean对象,现在是key,一会需要输出为value
* 第二个泛型是map阶段输出的text,不需要处理,一会直接输出为key
* 第三个泛型是当前reduce阶段将要输出的Key,也是手机号。
* 第四个泛型是当前reduce阶段将要输出的Value
*/
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
Driver类 FlowDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取job
Configuration conf = new Configuration();
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, "\t");
Job job = Job.getInstance(conf);
// 2.设置jar包路径
job.setJarByClass(FlowDriver.class);
// 3.关联mapper和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4.设置map输出的KV类型,这里需要更改!
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5.设置最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\phone_data1.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
// 7.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}