NLineInputFormat
注意
现在已经不常用
该类的注释:NLineInputFormat,它将 N 行输入拆分为一个拆分。在许多“令人愉快”的并行应用程序中,每个进程/映射器处理相同的输入文件,但计算由不同的参数控制。(称为“参数扫描”)。实现此目的的一种方法是将一组参数(每行一组)指定为控制文件中的输入(这是 map-reduce 应用程序的输入路径,其中输入数据集是通过 JobConf 中的配置变量指定的)。NLineInputFormat 可用于此类应用程序,它拆分输入文件,以便默认情况下将一行作为值提供给一个映射任务,并且 key 是偏移量。即 (k, v) 是 (LongWritable, Text)。位置提示将跨越整个映射聚类。
如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlinelnputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整除,切片数=商+1。
例如:输入文件共有六行数据,现在N为2。则会开启两个MapTask,第一个读前三行数据,一个读后三行数据
这里的键和值与 TextInputFormat
是一样的
案例
测试:测试一下11行数据时,指定N为3,会不会分成4个切片
简单修改一下 KeyValueTextInputFormat
的文件:KVWordCount.txt
Mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class NLineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// 输出的对象
private Text outKey = new Text();
// 输出的值
private IntWritable outValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 获取一行的值
String line = value.toString();
// 按空格分隔该行中的内容
String[] words = line.split(" ");
// 循环写出
for (String word : words) {
outKey.set(word);
// 写出
context.write(outKey, outValue);
}
}
}
Reducer类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class NLineReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
// 累加
sum += val.get();
}
outValue.set(sum);
// 写出
context.write(key, outValue);
}
}
Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class NLineDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置InputFormat的实现类是哪个
job.setInputFormatClass(NLineInputFormat.class);
// 设置每个切片划分3条记录
NLineInputFormat.setNumLinesPerSplit(job, 3);
// 2.设置jar包路径
job.setJarByClass(NLineDriver.class);
// 3.关联mapper和reducer
job.setMapperClass(NLineMapper.class);
job.setReducerClass(NLineReducer.class);
// 4.设置map输出的KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\NlineWordCount.txt"));
// 这里的路径是个文件夹
FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
// 7.提交job
// 源码当中用的就是 waitForCompletion(),该方法中已经有submit()了
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
校验结果
不需要查看生成的文件,直接查看控制台中的日志即可,如果出现如下日志,则代表成功(从前面往后翻,大概第7行左右)