Job提交流程源码
相关视频
https://www.bilibili.com/video/BV1Qp4y1n7EN?p=88&vd_source=de9cfd633cf5a71794dc91a35f34cd48
Job提交流程
之前的Driver类中,主要写就是 org.apache.hadoop.mapreduce
包下的 Job类
相关的代码
而写出来的这些代码中,前面的代码都是在设置Job的相关内容,例如Mapper的类、Reduce的类、设置输出类型和文件读写路径等。
前面的代码没什么值得详细说的,最后这个Job对象被 提交,并且返回了一个运行结果,这里才是重点。
点进 waitForCompletion
方法,在该方法的方法体的最上面,有一个if判断,查看源代码可得知,该类的实例的 state
属性的默认值就是 JobState.DEFINE
,所以这里为true,submit
方法被调用
点进 submit
方法,发现一上来就调用了三个方法
第一个方法 ensureState
没啥说的,看名字并联系上下文就知道该方法是用来确保state属性是正确的。点进去看方法体,代码的意思很简单:如果检测到 state
不对就抛异常
第二个方法 setUseNewAPI
比较特别,简单概括一下,该方法解决了新旧两套API的兼容问题
第三个方法 connect
处理的是客户端与服务器集群的连接。可以思考一下,正常情况来说,客户端连接服务器集群需要YARN。但是目前没有启动服务器集群的话,或者说我现在在代码里面根本没有配置服务器集群的地址,它连接的是谁?之前咱们给出过答案,它连接的是本地环境。那这个Job提交的流程是什么样的?
来看一下 connect
方法的代码,上来就校验 cluster 对象是否为空,这个单词的翻译是 簇,这里可以理解为是集群。如果该对象是空,则给该对象赋值
赋值操作调用了 ugi
对象的 doAs
方法,该方法的参数是一个 PrivilegedExceptionAction
接口的实例对象。因为是接口,所以必须要实现其中的抽象方法,咱们需要的是 Cluster
对象,所以通过泛型等操作,该方法创建了一个Cluster
类实例,并传入了 getConfiguration
方法的返回值作为参数。
PrivilegedExceptionAction
这个接口先不管,直接来看 Cluster
类的构造方法,可以发现该构造方法调用了另一个构造方法,找到根源,也就是那个需要两个参数的构造方法
该构造方法中,前两个是对象属性的赋值,没啥好说的,接着来看 initialize
方法
initialize
方法的第一行代码,调用了 initProviderList
方法,这个方法点进去简单看一下,该方法的作用是初始化了一个名为 providerList
的对象。点一下 providerList
集合,可以发现它是一个泛型为 ClientProtocolProvider
类的List集合
接着往下看 initialize
方法,可以看到有一个增强for循环,被循环的对象就是刚才提到的 providerList
集合。
学员可以通过debug查看 providerList
集合中都有什么类的对象。这里直接说,点进刚才提到的 ClientProtocolProvider
抽象类,可以通过IDE,查看到它有两个实现类:YarnClientProtocolProvider
和 LocalClientProtocolProvider
。在 providerList
集合中,这两个对象是都存在的
只不过在这里,经过 initialize
方法中for循环里的一堆if,保留下来了 LocalClientProtocolProvider
对象
所以,这里连接的是本地环境。(具体的判定逻辑不用管)
然后往下走,一层一层的就又退出到 submit
方法中。至此,connect
方法执行完毕
connect
方法执行完毕之后,往下看,创建了一个 JobSubmitter
类的实例对象。通过该对象的 submitJobInternal
方法,向集群提交了相关的Job信息。这里也是个核心代码
点进该方法,进来之后,第一个方法就是 checkSpecs
,该方法实际校验的就是输出的相关信息。
点进 checkSpecs
方法,该方法中的if判断有点繁琐,实际判断的是三元运算符中返回的两个值。
查看接下来 org.apache.hadoop.mapreduce
包下的 OutputFormat
类的实际对象所调用的 checkOutputSpecs
方法
默认调用的是 OutputFormat
抽象类的 FileOutputFormat
实现类。查看实现类中的 checkOutputSpecs
方法
往下翻,可以看见校验了路径,按照对应的校验方式,如果路径有问题,就会弹出相应的报错。
checkOutputSpecs
方法执行完后,外层的 checkSpecs
方法也就执行完了
往下看,addMRFrameworkToDistributedCache
方法不用多说,看名字就知道作用是将 MapReduce框架添加到已经分发的缓存
中
再往下,就获取了一个名为 jobStagingArea
的 Path
对象。注意,这个类是 org.apache.hadoop.fs
包下的
可以通过Debug查看该路径的具体值是什么,抽象来说是 /tmp/hadoop.mapred/staging/文件夹/.staging
(Windows系统的根路径要看代码在哪个盘下,然后去对应的盘下的根路径去找就可以)。每一次执行,他这个文件夹都不一样,建议使用Debug一点一点执行,也方便观察最后的 .staging
文件夹里有什么变化
往下看代码,一直到if语句执行完,获取了本地的一些信息,然后拿着这些信息进行了一些配置
再往下看,可以看到一个对象调用了 getNewJobID
方法,并返回了一个 JobID
对象
然后往下看,创建了一个提交Job的 Path
路径对象。这个路径就是在上面的 .staging
文件夹下的(注意,这里还没创建文件夹呢)
然后一直往下翻,中间的代码全都是处理缓存,不用管,一直翻到 copyAndConfigureFiles
方法,点击进去看一下里面长什么样子
直接来看方法的核心部分,通过构造方法创建的 JobResourceUploader
类对象,调用了名为 uploadResources
方法
点进该方法,可以看到一共调用了三个方法,第一个是初始化了共享的缓存,最后一个是结束了共享的缓存。来看一下中间的 uploadResourcesInternal
方法
这个方法前面还是处理配置、打印日志、抛出异常什么的,然后可以看见下面又创建了一个 Path
对象,可以发现和上面用来提交Job的 Path
对象的值是一样的
再往下看可以发现有一个 mkdir
方法。这个方法执行完之后,用来提交Job的路径才被创建完成。
再往下看,中间的代码不用管,直接找到 uploadFiles
、uploadLibJars
、uploadArchives
、uploadJobJar
,这四个方法是向集群提交一些文件
重点来说一下最后一个 uploadJobJar
,如果我们当前是在集群模式下,自己编写的代码的Jar包,就可以通过客户端的方式,上传到集群
所以,整个 copyAndConfigureFiles
方法,如果是本地模式下,就不用提交文件;如果是集群模式下,就需要提交文件。
代码读到这里,看一下刚才说的job文件夹,里面就有了对应的4个文件,可以看一下 job.split
文件,内容有乱码,但是大致是可以看出来都有什么内容的
退出了 copyAndConfigureFiles
方法后,接着往下看。下面调用了一个重要的方法,名叫 writeSplits
,并把切片的返回值,也就是切片数量赋值给了 maps
变量
提示
writeSplits
方法中涉及的东西较多,之后单开一部分详细说
紧贴着这行代码的下一行代码的作用是,设置 MapTask
的数量,具体是通过什么设置的,可以点进 MRJobConfig.NUM_MAPS
看一下
代码接着往下走,一直到 writeConf
方法。Debug可以看出来,在执行完该方法后,job文件夹内又多出两个文件。
点开这两个里面的 xml文件,可以看见,里面存放的全是运行这个job的参数,也就是说这个job是通过这个参数执行任务
代码接着往下走,可以看到有个 status
对象,简单提一下,这个对象里保存的是客户端提交的job信息
再接着往下一直走,代码就开始不断地返回到上层了。一直到Job
类的 submitJobInternal
方法执行完毕,也就是回到了一开始提到的 submit
方法里
下一行代码很简单,将job状态置为RUNNING。这个Job就开始运行了。
再往下执行,再跳出,然后就回到了一开始提到的 waitForCompletion
方法中,再往下就是监控程序的代码了
但是到这里,一直都没有看到什么时候删除的 job文件夹下的6个文件,只需要让他将监控程序的代码执行完毕,他自己就删掉了
可以通过Debug查看 monitorAndPrintJob
方法执行完毕后,6个文件是否还存在
以上就是所有的Job提交流程了