MapReduce分布式运算程序
尚硅谷教案:尚硅谷大数据技术之Hadoop(MapReduce)V3.3.docx
MapReduce 是一个分布式运算程序的编程框架,是用户开发 “基于 Hadoop 的数据分析应用” 的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
MapReduce 的核心思想是将大数据分而治之,即将数据通过一定的数据划分方法,分成多个较小的具有同样计算过程的数据块,数据块之间不存在依赖关系,将每一个数据块分给不同的节点去处理,最后将处理的结果汇总。
举个例子:现在有一堆固定面额的钱,现在需要数出来共有多少张,则可以分给n个人去数钱,最后汇总他们数出来的数量
相关博客:
- https://blog.csdn.net/weixin_52986315/article/details/136692749
- https://cloud.tencent.com/developer/article/1999631
- https://developer.aliyun.com/article/1364630
- https://cloud.tencent.com/developer/article/1886450
wordcount 源码解析
这里使用 jd-gui
查看一下 /opt/module/hadoop-3.3.6/share/hadoop/mapreduce
下的 hadoop-mapreduce-examples-3.3.6.jar
文件
找到里面的 WordCount.class
文件,查看相关代码
可以看见这段代码里存在一个 main方法
和两个静态内部类:继承了 Mapper类
的 TokenizerMapper
类,和继承了 Reducer类
的 IntSumReducer
类
这个方法和这两个类,之后在咱们进行开发的时候,需要拆解成三个类:main方法
对应的Driver
驱动类、继承了Mapper
类的类、继承了Reducer
类的类
先来查看 TokenizerMapper
类 和 IntSumReducer
类 的泛型,可以发现有四个泛型参数。
四个泛型参数每两个为一对,一对中的两个分别对应着Key的类型和Value的类型。两对分别对应着输入的KV键值对的数据类型,和输出的键值对数据类型
Hadoop有自己的数据类型,Text对应着Java中的String,IntWritable对应着Java中的int或者说是Integer
Hadoop数据类型
类型名 | hadoop数据类型 | java数据类型 |
---|---|---|
布尔型 | BooleanWritable | boolean |
整型 | ByteWritable ShortWritable IntWritable LongWritable | byte short int long |
浮点型 | FloatWritable DoubleWritable | float double |
字符串(文本) | Text | String |
数组 | ArrayWritable | Array |
map集合 | MapWritable | map |
空引用 | NullWirtable | null |
编程规范
Mapper阶段
- 用户自定义的Mapper要继承自己的父类(继承Mapper类)
- Mapper的输入数据是
KV对
的形式(KV的类型可自定义)
K是偏移量,V是数据。例如第一行保存的是Hello,第二行保存的是mahe666。那么第一行的K是0,V是Hello,第二行的K是7,V是mahe666。
偏移量开始是0,0到Hello中的H字符,偏移量是1。如此计算,Hello的偏移量计算是从0到5,再算上最后有个换行符,所以是0到6。新的一行,从上一行最后的偏移量加1 - Mapper中的业务逻辑写在
map()
方法中 - Mapper的输出数据是
KV对
的形式(KV的类型可自定义) map()
方法(MapTask进程)对每一对<K,V>
调用一次
每一行数据形成一对KV键值对,每一对为单位,执行一次map()
方法
Reducer阶段
- 用户自定义的Reducer要继承自己的父类(继承Reducer类)
- Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
原始数据在经过map()处理后会形成KV,例如每出现一次Hello单词,就会有一个对应的K为Hello,V为1的键值对 - Reducer的业务逻辑写在
reduce()
方法中 - ReduceTask进程对传参进来的相同K的键值对组调用一次
reduce()
方法
例如有五组K为Hello的键值对,那么就是这五个为一组,调用一次方法。
Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
WordCount案例
需求:输入一个文本文档,统计其中每个英文单词出现的次数
输入文件:wordcount.txt
处理该案例的流程
- 准备输入数据
- 分析预期输出什么样子的数据
- Mapper
- 将MapTask传给我们的文本内容转换成String
以行为单位进行分割 - 根据空格将这一行切分成单词
以空格为单位进行分割 - 将单词输出为键值对
<单词, 1>
每有一个重复的单词,就会重复输出一次键值对,键值对中的值为1
- 将MapTask传给我们的文本内容转换成String
- Reducer
- 汇总各个Key的个数
接收每个相同单词的所有键值对,对于每个键值对中的值,做怎样的运算需要看实际编码中的业务逻辑 - 输出该key的总次数
- 汇总各个Key的个数
- Driver
- 获取配置信息,获取job对象实例
- 指定本程序的jar包所在的本地路径
- 关联Mapper/Reducer业务类
- 指定Mapper输出数据的kv类型
- 指定最终输出的数据的kv类型
- 指定job的输入原始文件所在目录
- 指定job的输出结果所在目录
- 提交作业
环境准备
创建一个不使用模板的Maven项目,依赖如下
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.6</version>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<!-- 用来打印日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.36</version>
</dependency>
log4j配置文件如下
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
继承了Mapper类的 WordCountMapper
先来解析一下父类Mapper
其中方法体最多的代码是 run()
方法,方法里涵盖了该类中的其他三个方法,来简单解读一下该方法
setup()
方法没啥说的,注释写了:在map阶段正式开始之前,调用一次。Mapper类被继承后可以重写
cleanup()
方法也没啥说的,注释写了:在map阶段结束后,调用一次。Mapper类被继承后可以重写
map()
方法,注释说每有一对键值对调用一次,大部分的时候都是在子类中重写该方法以实现自己的业务逻辑
WordCountMapper
代码如下
/*
下面的包是用于1.x版本Hadoop的
import org.apache.hadoop.mapred.Mapper;
而下面的包才是咱们使用的是2.x和3.x版本的hadoop包
import org.apache.hadoop.mapreduce.Mapper;
*/
import org.apache.hadoop.mapreduce.Mapper;
// 要的是 org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
/**
* 泛型有四个参数
* KEYIN map阶段输入Key的类型,之前说过,这个参数是这行包括最后的换行符的偏移量,LongWritable
* VALUEIN map阶段输入Value的类型,这个参数是这一行的具体内容,Text
* KEYOUT map阶段输出Key的类型。这里的参数是检测到的单词(不重复,单词的重复在Value中体现),Text
* VALUEOUT map阶段输出Value的类型。这里的参数是出现单词的次数,IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// 输出的对象
private Text outKey = new Text();
// 输出的值
private IntWritable outValue = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取一行的值
String line = value.toString();
// 按空格分隔该行中的内容
String[] words = line.split(" ");
// 循环写出
for (String word : words) {
/*
生产环境中的数据量会很大,而通过new关键字会创建很多对象
例如这行有十万个单词,下面就要创建十万个对象
想要优化性能,需要创建类变量
不把创建对象的过程提到for循环外的原因是:每有一行数据调用一次map方法,还是会创建很多对象
context.write(new Text(word), new IntWritable(1));
*/
outKey.set(word);
// 写出
context.write(outKey, outValue);
}
}
}
继承了Reducer类的 WordCountReducer
来解析一下父类Reducer
其中方法体最多的代码还是 run()
方法。setup()
和 cleanup()
就不用说了
run()
方法中的核心逻辑,还是在于 reduce()
方法
reduce()
方法说每有一对键值对会调用一次,大部分应用会通过重写该方法去定义自己的reduce类
// 同样,下面这个包才是2.x和3.x版本的Hadoop的包
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 泛型有四个参数
* KEYIN reduce阶段输入Key的类型,这里是map阶段结束时输出的Key类型,Text
* VALUEIN reduce阶段输入Value的类型,这里是map阶段结束时输出的Value类型,IntWritable
* KEYOUT reduce阶段输出Key的类型。这里的参数是互相不重复的单词,Text
* VALUEOUT reduce阶段输出Value的类型。这里的参数是一种单词出现的次数,IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// 提升作用域
private IntWritable outValue = new IntWritable();
/**
* map阶段输出的value是(单词, 1)
* 而reduce方法参数会把 map阶段输出的value 也就是 reduce阶段输入的value 转换为 (单词, (1,1...1))
* 括号内,1 出现的次数取决于map阶段输出多少重复的该单词
*
* @param key 输入的key,就是单词
* @param values 输入的所有value会被整合成一个可迭代对象,里面全是map阶段传过来的1。
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
// 累加
sum += val.get();
}
outValue.set(sum);
// 写出
context.write(key, outValue);
}
}
自实现的Driver WordCountDriver
// 导包不要导错,这个才是
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3.关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4.设置map输出的KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("C:\\Users\\MaHe666\\Desktop\\wordcount.txt"));
// 这里的路径是个文件夹
FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
// 7.提交job
// 源码当中用的就是 waitForCompletion(),该方法中已经有submit()了
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Debug看代码
先在自己写的 Reducer 和 Mapper 类中的 run()
方法体代码打断点
再进入到父级的 Reducer 和 Mapper 类中的 run()
方法体代码打断点
然后重新Debug执行代码
集群运行
刚才的代码其实不是在集群中运行的,不信可以关掉集群试一下
刚才的运行模式是之前提到过的 本地模式
它是由如下Maven依赖支持的
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.6</version>
</dependency>
在生产环境中,很少在Windows环境上运行代码
而在Linux上运行代码,最简单的办法,就是将代码打成jar包,放在Linux上运行
添加打包配置至 pom.xml
文件,与dependencies标签同级
这样就可以把Maven依赖也打进包里了
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
修改完后,别忘了,自己编写的那个Driver类是写死的路径地址,需要改成灵活的形式
完全复制wordcount包,粘贴为wordcount2。之后的命令会用到
如下
// 导包不要导错,这个才是
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3.关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4.设置map输出的KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 这里的路径是个文件夹
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7.提交job
// 源码当中用的就是 waitForCompletion(),该方法中已经有submit()了
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
在Linux上运行如下命令
hadoop jar testMapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.mahe666.wordcount2.WordCountDriver /wcinput/wordcount.txt /wcoutput
注意
注意,这里的输入路径和输出路径指的是在HDFS系统中的路径
上传 wordcount.txt
文件至HDFS系统中,路径看上面的命令
HDFS系统中不能已存在输出路径
运行之后,查看一下结果。可以发现已经成功的输出出想要的数据了
查看输出的数据可以发现一个有趣的事情,输出数据是按照a-z的顺序进行排序的。
那么也就是输出的Key,使用了A-Z进行排序。这里后面会说到