Secondary Sort
Now the data contains three records with the same total traffic. New requirement: sort records with the same total traffic by upstream traffic in ascending order (smallest to largest).
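For instance, suppose the input contains these three hypothetical records (values made up for illustration; columns are phone number, upstream traffic, downstream traffic, and total traffic):

13500000001	120	180	300
13500000002	100	200	300
13500000003	80	220	300

Since all three totals tie at 300, the expected output order is 13500000003, 13500000002, 13500000001, that is, ascending by upstream traffic within the same total.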
The Bean class
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * The name FlowBean means: Flow is traffic, so this is a bean class for traffic data.
 * 1. Define the class and implement the WritableComparable interface
 * 2. Override the serialization and deserialization methods
 * 3. Provide a no-argument constructor
 * 4. Override the compareTo() method
 * 5. Override the toString() method
 */
public class FlowBean implements WritableComparable<FlowBean> {

    /**
     * Upstream traffic
     */
    private long upFlow;

    /**
     * Downstream traffic
     */
    private long downFlow;

    /**
     * Total traffic
     */
    private long sumFlow;

    // Getters and setters
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Overload of setSumFlow(): derives the total from the other two fields
     */
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    /**
     * No-argument constructor, required for deserialization
     */
    public FlowBean() {
    }

    /**
     * Serialization method. Whatever field is written first here must be
     * read first during deserialization.
     *
     * @param out <code>DataOutput</code> to serialize this object into.
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method. Values must be read in exactly the same
     * order they were serialized.
     *
     * @param in <code>DataInput</code> to deserialize this object from.
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    /**
     * Makes it convenient to write the data out
     *
     * @return tab-separated upFlow, downFlow, and sumFlow
     */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // Primary sort: total traffic, descending
        if (this.sumFlow > o.sumFlow) {
            // Descending order, so the larger total comes first
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            // Secondary sort: equal totals fall back to upstream traffic, ascending
            if (this.upFlow > o.upFlow) {
                return 1;
            } else if (this.upFlow < o.upFlow) {
                return -1;
            }
            return 0;
        }
    }
}
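The FlowMapper and FlowReducer that the driver below wires in are not shown in this section. A minimal sketch of both follows, assuming the input file holds the tab-separated output of the earlier traffic-summing job (phone number, upstream, downstream, total) and is read with the default TextInputFormat; the field layout is an assumption, so adjust the indices to match the actual file:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private final FlowBean outK = new FlowBean();
    private final Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed line layout: phone \t upFlow \t downFlow \t sumFlow
        String[] fields = value.toString().split("\t");
        outK.setUpFlow(Long.parseLong(fields[1]));
        outK.setDownFlow(Long.parseLong(fields[2]));
        outK.setSumFlow();
        outV.set(fields[0]);
        // The bean is the key, so the shuffle sorts records with compareTo()
        context.write(outK, outV);
    }
}

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Beans that compare equal arrive grouped into one call, so loop over
        // every phone number and flip the pair back to (phone, bean)
        for (Text phone : values) {
            context.write(phone, key);
        }
    }
}

Writing the bean as the map output key is what drives the ordering: MapReduce sorts only keys, never values, so the phone number rides along as the value and is restored to the key position in the reducer.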
The Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job
        Configuration conf = new Configuration();
        // Note: this separator setting only takes effect if the job also uses
        // KeyValueTextInputFormat via job.setInputFormatClass(...); with the
        // default TextInputFormat it is ignored
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, "\t");
        Job job = Job.getInstance(conf);
        // 2. Set the jar path
        job.setJarByClass(FlowDriver.class);
        // 3. Attach the mapper and reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4. Set the map output KV types -- this is what changes for sorting:
        //    the bean becomes the key so the shuffle orders it with compareTo()
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5. Set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\phone_data2.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
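Because WritableComparable extends java.lang.Comparable, the ordering logic can be sanity-checked locally with a plain Collections.sort before submitting the job. A quick sketch (the class name, helper method, and sample values are hypothetical):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowBeanSortCheck {

    // Hypothetical helper for building a bean from raw values
    private static FlowBean bean(long up, long down) {
        FlowBean b = new FlowBean();
        b.setUpFlow(up);
        b.setDownFlow(down);
        b.setSumFlow();
        return b;
    }

    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        beans.add(bean(100, 200)); // total 300
        beans.add(bean(50, 250));  // total 300, smaller upstream
        beans.add(bean(400, 100)); // total 500
        Collections.sort(beans);
        // Expected order: the 500 total first (descending totals), then the two
        // 300-total beans with upstream 50 before upstream 100 (ascending tiebreak)
        for (FlowBean b : beans) {
            System.out.println(b);
        }
    }
}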