OutputFormat
OutputFormat是MapReduce输出的基类,所有MapReduce输出的实现类都实现了OutputFormat接口。
常见的OutputFormat实现类如下所示,在IDEA中按下 Ctrl + H 即可查看。
自定义OutputFormat
输入文件:web_logs.txt
需求:将其中包含 mahe 字符串的网站,输出到一个文件中,其他不包含的去重后输出数据到另一个文件中
Mapper类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Forwards each input line unchanged as the output key, paired with the
     * NullWritable singleton. All filtering and de-duplication happens in
     * later stages of the job.
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // Pass the raw line straight through; no transformation is needed here.
        context.write(value, NullWritable.get());
    }
}
Reducer类
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Emits every occurrence of keys containing "mahe"; for all other keys
     * only the first occurrence is emitted. Because MapReduce groups values
     * by key, writing a key once is exactly de-duplication.
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // Decide once per key whether duplicates should be kept.
        boolean keepDuplicates = key.toString().contains("mahe");
        for (NullWritable val : values) {
            context.write(key, val);
            if (!keepDuplicates) {
                // Non-"mahe" keys: stop after the first write to de-duplicate.
                break;
            }
        }
    }
}
OutputFormat类
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 这里的泛型是Reducer的输出类型
*/
/**
 * Custom OutputFormat; the generic parameters match the Reducer's output
 * key/value types (Text, NullWritable).
 */
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // LogRecordWriter takes the task context so it can reach the job configuration.
        LogRecordWriter writer = new LogRecordWriter(job);
        return writer;
    }
}
RecordWriter类
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 这里也是Reducer输出的类型
*/
/**
 * RecordWriter matching the Reducer's output types (Text, NullWritable).
 *
 * Routes each record to one of two files: lines containing "mahe" go to
 * mahe.log, everything else goes to other.log.
 */
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream maheOut;
    private FSDataOutputStream otherOut;

    /**
     * Opens the two destination streams via the FileSystem configured on the job.
     *
     * @param job task attempt context supplying the Hadoop configuration
     * @throws RuntimeException wrapping any IOException raised while creating the streams
     */
    public LogRecordWriter(TaskAttemptContext job) {
        try {
            // Create the two output streams.
            FileSystem fs = FileSystem.get(job.getConfiguration());
            maheOut = fs.create(new Path("D:\\output\\mahe.log"));
            otherOut = fs.create(new Path("D:\\output\\other.log"));
        } catch (IOException e) {
            // Wrap with the cause preserved; a constructor cannot declare IOException
            // without changing the signature callers rely on.
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes one record to whichever stream matches its content.
     *
     * FIX: encode with an explicit charset (UTF-8) instead of the platform
     * default, so the output bytes are identical across machines and locales.
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString() + "\n";
        if (log.contains("mahe")) {
            maheOut.write(log.getBytes(StandardCharsets.UTF_8));
        } else {
            otherOut.write(log.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Closes both streams; IOUtils.closeStream tolerates nulls and swallows
     * close-time errors.
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(maheOut);
        IOUtils.closeStream(otherOut);
    }
}
Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {

    /**
     * Configures and submits the MapReduce job that splits web_logs.txt into
     * a "mahe" file and a de-duplicated "other" file via LogOutputFormat.
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the job from a fresh configuration.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar by the driver class.
        job.setJarByClass(LogDriver.class);

        // 3. Wire up the Mapper and Reducer implementations.
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        // 4. Map-phase output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5. Final output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Install the custom OutputFormat.
        job.setOutputFormatClass(LogOutputFormat.class);

        // 6. Input file and output location (the output path is a directory).
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\web_logs.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        // 7. Submit and wait; waitForCompletion() calls submit() internally.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}