KeyValueTextInputFormat
Note
This class is rarely used nowadays.
The class's Javadoc reads: An InputFormat for plain text files. Files are broken into lines; either line feed or carriage return signals the end of a line. Each line is divided into key and value parts by a separator byte. If no such byte exists, the key will be the entire line and the value will be empty. The separator byte can be specified in the configuration under the property name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default is the tab character ('\t').
The characters before the first separator become the Key; all characters after the separator become the Value.
For example, with the tab character \t as the separator:
line1 Hello World
line2 Mahe 666
line3 What a handsome boy!
The KV pairs read in are as follows:
| Key | Value |
|---|---|
| line1 | Hello World |
| line2 | Mahe 666 |
| line3 | What a handsome boy! |
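Conceptually, the record reader splits each line as in this minimal sketch (an illustration of the Javadoc semantics above, not the actual Hadoop source):
public class SeparatorSketch {
    // Everything before the first separator becomes the key, the rest the value;
    // if no separator is present, the whole line is the key and the value is empty.
    static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        String[] kv = split("line1\tHello World", '\t');
        // Prints: Key=line1, Value=Hello World
        System.out.println("Key=" + kv[0] + ", Value=" + kv[1]);
    }
}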
To change the separator, set it as follows, where conf is an instance of the Configuration class from the org.apache.hadoop.conf package. Note that KeyValueLineRecordReader only uses the first character of the string you pass in.
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, "\t");
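The constant KeyValueLineRecordReader.KEY_VALUE_SEPARATOR is simply the property name quoted in the Javadoc above, so setting the raw string directly is equivalent:
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");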
Example
Requirement: the separator in the data is known to be four spaces; count how many times each identical Key occurs.
This works just like the earlier WordCount example, except that only the Key is read.
Data file used: KVWordCount.txt
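The file's contents are not reproduced here; a hypothetical sample with four spaces between Key and Value might look like this:
hello    nice to meet you
hadoop    mapreduce is fun
hello    good morning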
Mapper class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable> {
    // Reused output value; avoids allocating a new IntWritable for every record
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Mapper<Text, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Emit the Key unchanged with a count of 1, i.e. (Key, 1)
        context.write(key, outValue);
    }
}
Reducer class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value object
    private IntWritable outValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            // Accumulate the counts for this Key
            sum += val.get();
        }
        outValue.set(sum);
        // Emit (Key, total count)
        context.write(key, outValue);
    }
}
Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class KVTextDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the Job instance
        Configuration conf = new Configuration();
        // Set the separator. The data uses four spaces, but KeyValueLineRecordReader
        // only reads the first character of this string, so a single space is enough;
        // the extra spaces just end up at the front of the value, which this job ignores.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, " ");
        Job job = Job.getInstance(conf);
        // Specify the InputFormat implementation
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // 2. Set the jar path
        job.setJarByClass(KVTextDriver.class);
        // 3. Wire up the mapper and reducer
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);
        // 4. Set the map output KV types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\KVWordCount.txt"));
        // This path is a directory and must not already exist when the job runs
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
        // 7. Submit the job
        // waitForCompletion() calls submit() internally, so no separate submit() is needed
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
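With the hypothetical input above, the reduce output (by default a file such as D:\output\part-r-00000, with key and count separated by a tab) would contain:
hadoop	1
hello	2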