KeyValueTextInputFormat
注意
现在已经不常用
该类的注释:纯文本文件的 InputFormat。文件被分成几行。换行或回车用于发出线路结束的信号。每行通过分隔符字节分为键和值部分。如果不存在这样的字节,则键将为整行,值将为空。分隔符字节可以在配置文件中的属性名称 mapreduce.input.keyvaluelinerecordreader.key.value.separator
下指定。默认值为制表符 ('\t')。
第一个分隔符前的字符会被作为Key,分隔符后面的所有字符会被作为Value
例如下面的内容,以 \t(制表符)作为分隔符:
line1 Hello World
line2 Mahe 666
line3 What a handsome boy!
读取到的KV键值对如下
Key | Value |
---|---|
line1 | Hello World |
line2 | Mahe 666 |
line3 | What a handsome boy! |
如果想改变分隔符,可以通过如下设置来做,conf是 org.apache.hadoop.conf
包下 Configuration
类的实例对象
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, "\t");
案例
需求:已知分隔符是四个空格,统计相同的 Key 出现的次数
和之前的WordCount案例一样,只不过读取的是Key
用到的数据文件:KVWordCount.txt
Mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the KeyValueTextInputFormat word-count example.
 *
 * <p>The input key is the text before the configured separator and the input
 * value is the text after it. Each key is emitted with a count of 1; the
 * value part of the record is intentionally ignored.
 */
public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable> {
    // Constant count of 1, allocated once and reused for every record.
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit (key, 1); aggregation happens in the reducer.
        context.write(key, one);
    }
}
Reducer类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the KeyValueTextInputFormat word-count example.
 *
 * <p>Receives all counts emitted for a key and writes (key, total).
 */
public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output holder for the summed count.
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add up every partial count for this key.
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver that wires up and submits the KeyValueTextInputFormat word-count job.
 *
 * <p>Input and output paths are hard-coded for this tutorial example; the
 * output directory must not already exist when the job runs.
 */
public class KVTextDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // The separator must be set on the Configuration BEFORE Job.getInstance,
        // otherwise the job's copy of the configuration will not see it.
        // NOTE(review): KeyValueLineRecordReader appears to use only the first
        // byte of this string as the separator — confirm against the Hadoop
        // version in use if a multi-character separator is expected.
        Configuration configuration = new Configuration();
        configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, " ");
        Job job = Job.getInstance(configuration);

        // Use KeyValueTextInputFormat instead of the default TextInputFormat.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Locate the jar containing this driver class.
        job.setJarByClass(KVTextDriver.class);

        // Attach the mapper and reducer implementations.
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);

        // Declare the map-output and final-output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input file and output directory (the directory must not exist yet).
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\KVWordCount.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        // waitForCompletion() calls submit() internally and blocks until done.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}