2万字硬核spark源码精讲手册

这篇具有很好参考价值的文章主要介绍了2万字硬核spark源码精讲手册。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大家好,我是老兵。

本期为大家带来spark源码精讲系列,我将结合自身的理解深入浅出的剖析spark内核。全文内容很肝,希望能够给大家提供帮助。

1 引子(环境准备)

本文整体基于Spark2.4.1代码讲解,首先需要准备编译环境。

1)编译环境

1)scala2.11+ jdk1.8+ maven3.5+ Git2.0 + Spark2.4.1
2)windows环境(idea)

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

2)编译

准备好上述环境(自行百度安装教程),开始执行编译。

切换到下载解压后的spark目录,执行maven命令:

mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package

2万字硬核spark源码精讲手册

最终编译成功后的结果如下:

2万字硬核spark源码精讲手册

注:因篇幅问题,源码编译问题可自行百度网上教程

3)注意事项

整体讲解内容分为:任务提交->Driver注册启动->SparkContext初始化->Executor启动->Task启动

主要围绕下面三个流程图展开,所以大家在忘记时请回到这里!!

2万字硬核spark源码精讲手册

standalone模式

2万字硬核spark源码精讲手册

YarnClient模式

2万字硬核spark源码精讲手册

YarnCluster模式

2 源码剖析—Spark任务提交

假如现在我们已经有了一个简单的spark demo,例如word-count计算,并且设置好cores、executors以及部署模式,正待提交集群。

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyWordCount").setMaster("yarn");
    val sc = new SparkContext(conf)
    val result = sc.textFile("hdfs://hadoop002:9000/wordcount.txt")
                .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.foreach(println)
    sc.stop()
}

2.1 脚本提交

1) 先来看看大致的spark-submit任务提交脚本

2万字硬核spark源码精讲手册

2) 跟踪$SPARK_HOME/bin下的Spark-Submit脚本,实际执行的是bin/spark-class脚本,发现在其内部调用org.apache.spark.deploy.SparkSubmit类。

2万字硬核spark源码精讲手册

整体看下这个脚本主要做了这些事情:

  • 首先校验SPARK_HOME/conf、spark相关依赖目录SPARK_HOME/jars、hadoop相关依赖目录$HADOOP_HOEM/lib

  • 将校验所得所有目录地址拼接为LAUNCH_CLASSPATH变量

  • 将$JAVA_HOME/bin/java 定义为RUNNER变量

  • 调用build_command()方法,创建执行命令

  • 把build_command()方法创建的命令,循环加到数组CMD中,最后执行exec执行CMD命令

2万字硬核spark源码精讲手册

这里只要知道,CMD命令最终调用org.apache.spark.deploy.SparkSubmit类,即Spark程序的入口

2.2 提交主函数执行

接下来我们把目光移入SparkSubmit类。

SparkSubmit职责:准备Driver注册启动和SparkContext启动

定位到脚本中的执行主类org.apache.spark.deploy.SparkSubmit。解析脚本提交参数并根据参数action进行模式匹配,此时为SUBMIT操作,执行submit方法

2万字硬核spark源码精讲手册

1) Submit()方法。执行doRunMain(),判断是否使用proxyUser模式,并执行runMain()

2万字硬核spark源码精讲手册

2) 执行RunMain()方法,这个是核心方法。

2万字硬核spark源码精讲手册

2.2.1 调用prepareSubmitEnvironment()准备Submit环境

跟踪RunMain方法,首先调用的是prepareSubmitEnvironment()函数。进行后续解析参数,获取ArgsClasspathSparkConfMainClass工作

2万字硬核spark源码精讲手册

1) 根据参数中master/deploy-mode配置,设置对应的clusterManager部署模式(YARN/STANDALONE/MESOS/K8S/LOCAL CLIENT/CLUSTER)

2万字硬核spark源码精讲手册

2) 根据args中的参数,设置相关的childArgs/classPath/ChildMainClass等返回结果

2万字硬核spark源码精讲手册

- MAIN_CLASS:CLIENT模式<Yarn client/Standalone client>
使用程序自定义主类

- MAIN_CLASS:STANDALONE-CLUSTER模式
使用REST: org.apache.spark.deploy. RestSubmissionClientApp|| org.apache.spark.deploy. ClientApp

- MAIN_CLASS:YARN-CLUSTER模式
org.apache.spark.deploy.yarn.YarnClusterApplication

- MAIN_CLASS: MESOS-CLUSTER模式
org.apache.spark.deploy. RestSubmissionClientApp

- MAIN_CLASS: KUBERNETES-CLUSTER模式
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication

至此,spark内部已经完成了不同deployMode下的MainClass的定义,并且已经获取到classpath、conf等信息。

2.2.2 调用MainClass类执行方法, 准备注册Driver和启动SparkContext

现在准备加载MainClass主类(注意:此时这个MainClass不一定是我们开发程序里的Main函数),大概有如下步骤:

2万字硬核spark源码精讲手册

1) 获取当前执行线程类加载器ClassLoader并加载Jar包 注意: 需要判断spark加载用户提交的jar和spark自身Jar的优先级(DRIVER_USER_CLASS_PATH_FIRST)

2万字硬核spark源码精讲手册

2) mainClass实例化并执行对应的start()方法

注意: 除了Client模式下的用户自定义类之外,其余的MainClass全部继承SparkApplication类,即client模式走else条件:JavaMainApplication(这个类会调用我们自己写的Main函数)。

2万字硬核spark源码精讲手册

下面我们分模式讨论不同MainClass的start()方法,执行流程不熟悉的同学请移步到那三张图。

2.2.2.1 Standalone cluster模式下start()入口

org.apache.spark.deploy.ClientApp.start()

这里是spark的内置默认模式:standalone模式的MainClass执行逻辑,主要交互角色为Driver->Master->WorkerSparkContext在某worker上初始化。

  • 1)注册RpcEnv,调用onStart()方法向Master提交Driver注册

  • 2)Master接受请求后选择合适的worker启动Driver

  • 3)后续Driver启动并初始化SparkContext

  • 4)执行task任务分发和Executor启动。

2万字硬核spark源码精讲手册

2.2.2.2 Yarn Client/Standalone Client模式下start()入口

org.apache.spark.deploy.JavaApplication.start()

这里是YarnClient模式的MainClass执行逻辑,底层会直接调用我们程序的Main方法,在client端初始化sparkcontext

  • 1)根据反射执行用户自定义程序Main(),并继续后续的SparkContext初始化操作

  • 2)启动Executor并反向注册到Driver

  • 3)程序执行到action算子时候调用DAGSchduler的start()方法执行DAG划分

  • 4)TaskScheduler的taskset调度

  • 5)Executor任务执行

2万字硬核spark源码精讲手册

注: 两者后续的SparkContext初始化过程区别
1)Yarn-Client模式的SparkContext初始化过程中会调用YarnClientSchedulerBackend的start(), 再调用submitApplication()提交启动AppMaster请求到Yarn RM;并在后续的ApplicationMaster中的run()中启动ExecutorRunner
2)Standalone-client模式下的SparkContext初始化过程会调用StandaloneSchedulerBackend的start(), 向Master申请运行Application的资源, 随后在worker上启动对应的Executor运行任务。

2.2.2.3 YarnCluster模式下的start()入口

org.apache.spark.deploy.yarn.YarnClusterApplication.start():

这里是YarnCluster模式下的MainClass执行逻辑,Driver在Yarn的ApplicationMaster上启动,进行SparkContext初始化。

  • 1)获取Application的环境和信息

  • 2)调用SubmitApplication()提交启动AppMaster请求到YarnRM

  • 3)后续等待RM启动AppMaster

  • 4)启动Driver和Executor

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

2.3 Yarn模式后续: AppMaster接收submitApplication请求

这里可以接着上面所述,在Yarn部署环境下,MainClass的start()方法执行成功,同时完成向Yarn提交Application请求。

注意:
1)这里Yarn Client已经完成了SparkContext的初始化操作(并且在YarnClientSchedulerBackend中提交了Application);
2)而Yarn Cluster在main方法中刚提交Application,未开始初始化SparkContext。

Yarn提交任务后,ApplicationMaster启动后执行main函数,并调用自身的run()函数,根据条件判断启动Driver或者ExecutorLauncher接受资源调度。

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

下面我们分别剖析YarnClient和YarnCluster模式的AppMaster启动。

2.3.1 Yarn-Cluster模式的AppMaster启动

原理:Spark内部把Driver作为一个ApplicationMaster在Yarn中启动。

  • 1)执行User自定义Main方法并初始化SparkContext(启动Driver)

  • 2)创建DriverRpcEndpoint连接YarnScheduler

  • 3)后续CreateAllocator()申请container容器并启动Driver

2万字硬核spark源码精讲手册

启动Driver

2万字硬核spark源码精讲手册

调用Main方法

2.3.2 Yarn-Client模式的AppMaster启动

Client模式已经有Driver和SparkContext,此时只需要创建ExecutorLauncher线程(仅负责和SparkContext通信)。

  • 1)不运行SparkContext,只与SparkContext进行联系进行资源的分派)

  • 2)DriverRpcEndpoint连接YarnScheduler

  • 3)后续CreateAllocator()申请container容器并启动

2万字硬核spark源码精讲手册

到目前为止,不同模式下的MainClass的start()方法已经开始执行,但是Driver启动过程我们暂时还不清楚。

3 源码剖析—Driver注册启动

现在我们来看看不同模式下的Driver启动过程。

3.1 Standalone模式

org.apache.spark.deploy.ClientApp.start()开始执行...

3.1.1 ClientApp提交请求

  • 1)ClientApp的start()注册ClientEndpoint

  • 2)封装DriverDescript :MainClass/Jars/memory/cores。

  • 3)向Master发送注册请求

2万字硬核spark源码精讲手册

3.1.2 Master接受Driver注册请求

Master在ACTIVE状况下的大致功能为:
1)启动或释放可用的workerdriversapps缓存(HashMap)
2)存储或释放apps和drivers至等待watings队列(ArrayBuffer)中并持久化
3)等待apps或者drivers变化时调用schedule()调度

2万字硬核spark源码精讲手册

Master接受Driver注册流程

  • 1)根据DriverDescript信息创建Driver对象

  • 2)Driver对象持久化、加入Driver等待队列<待调度>和内存set中

  • 3)Schedule()调度

  • 4)返回注册结果

2万字硬核spark源码精讲手册

schedule()中driver注册逻辑

  • 1)shuffle打散可用的AliveWorkers(根据剩余cores排序)

  • 2)遍历waitingDrivers数组,每个Driver内部循环遍历AliveWorkers。判断AliveWorkers中是否存在当前的可用memory和cores满足当前Driver运行的所需memory和cores,如果满足则waitingDrivers-1并调用launchDriver()

  • 3)调用launchDriver在worker上启动Driver并将状态通知Master

  • 4)调用startExecutorsOnWorkers启动Executor(后续)

3.1.3 Worker上启动Driver

  • 1)WorkerInfo添加Driver信息、DriverInfo添加worker信息

  • 2)向worker发送启动Driver的命令;等待worker启动DriverRunner线程启动Driver(调用launchDriver方法)

  • 3)Worker上的Driver状态置为RUNNING

  • 4)Driver启动完成后,DriverRunner线程通知worker清除内存中的当前driver信息并移动到已完成队列中

  • 5)同时更新当前worker的内存和cpu数量,并通知Master相关Driver状态变更

2万字硬核spark源码精讲手册

launchDriver()方法:org.apache.spark.deploy.Worker

  • 1)创建DriverRunner线程并启动(调用driver方法)

  • 2)创建Driver目录、下载用户Jar包;封装启动命令;启动Driver初始化SparkContext

  • 3)线程阻塞并向Master通知Driver的状态

2万字硬核spark源码精讲手册

prepareAndRunDriver()方法:下载用户 jar包到工作目录、准备启动命令、启动Driver

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

runDriver()方法:封装Driver启动命令,Build命令格式化处理并执行获取返回码状态

2万字硬核spark源码精讲手册

最终执行的Driver命令cmd:执行用户提交的真实ManiClass,并执行后续的SparkContext过程

2万字硬核spark源码精讲手册

3.1.4 Driver启动后反向注册Master

  • 1)Driver启动返回码exitCode适配, DriverRunner线程通知worker

  • 2)清理worker内存中的当前driver信息、添加当前driver至finishedDrivers;Worker中内存和cores更新(减去当前driver的core/memory)

  • 3)发送Driver更新消息至Master

  • 4)Master接受消息并开始调用schedule()调度

2万字硬核spark源码精讲手册

Worker更新内存中driver信息:移除driver信息、减去内存/core、加入已完成driver队列中

2万字硬核spark源码精讲手册

Master接收Driver状态信息:移除driver信息、减去内存/core、加入已完成driver队列中

  • 1)移除Master内存、持久化引擎中的drivers信息

  • 2)当前driver加入到Master的已完成drivers队列; 并设置当前driver的状态

  • 3)减去该driver关联的 worker信息中的Driver内存和cores资源

  • 4)重新调度schedule()方法

2万字硬核spark源码精讲手册

3.1.5 后续调用startExecutorsOnWorkers()启动Executor

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

基于Master的schedule()调度方法调度当前可用资源,每次新的app加入或者资源变化时都会被调用:

  • 1)遍历wattingApps需要分配的apps

  • 2)查找可用的worker列表内存和core是否满足并根据剩余cores排序

  • 3)根据分配机制(spreadOut/non spreadOut方法)分配资源

  • 4)根据分配结果,在worker中启动executor

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

[补充:schedule()的分配机制]:

  • spreadOut算法(默认): 20core 10executor

根据配置平均将core分配到worker上。遍历所有worker,按照配置将core平均分布到每个 worker上,每个worker只分配一个executor,跳出循环,进入下个worker。 直到所有的 executor分配完成。 结果: 10个worker,每个启动一个2core/executor

  • 非spreadOut算法:

尽可能少的启动worker, 优先在一个worker上分配,完全利用worker上的core。 遍历所有worker, 在每个worker上根据worker中剩余的core数量完全分配给executor, 直到当前 worker上的core分配完成,进入下个worker。直到executor分配完成。 结果: 2个worker, 每个启动1个executor, 10个core

3.2 YarnCluster模式

org.apache.spark.deploy.yarn.YarnClusterApplication.start()开始执行...

3.2.1 Client提交Application

  • 1)YarnClusterApplication类中创建spark.deploy.yarn.Client对象,并执行run()

  • 2)如果设置spark.yarn.submit.waitAppCompletion,run()方法一直运行直到application推出,否则在application提交后client进程就退出

  • 3)执行submitApplication()提交Application, 由RM指定一个NM来执行封装的命令,启动AM

  • 4)获取submitApplication()执行状态,如果failed/killed则抛出错误

2万字硬核spark源码精讲手册

3.2.2 SubmitApplication提交创建AppMaster请求

  • 1)初始化YarnClient对象,执行YarnClient方法,提交Application

  • 2)创建AM容器启动的上下文环境、启动命令、上传程序包到HDFS

  • 3)调用yarnClient自带方法,提交创建AM Contarnier请求

2万字硬核spark源码精讲手册

  • 4)执行APP启动Command bin/java ApplicationMaster –class --jar:根据deployMode不同,启动ApplicationMaster(YarnCluster)和ExecutorLauncher(YarnClient)

2万字硬核spark源码精讲手册

3.2.3 ApplicationMaster启动准备

  • 1)设置系统配置参数

  • 2)根据match配置在NM上启动Driver或ExecutorLauncher

2万字硬核spark源码精讲手册

3.2.4 启动ApplicationMaster并创建Driver容器

  • 1)创建UserThread线程并运行,调用User自定义类Main,启动Driver线程并初始化SparkContext

  • 2)向ResourceManager注册ApplicationMaster(Yarn底层),成功后申请启动Executor的Container资源

  • 3)启动ExecutorRunner

  • 4)在NodeManager上启动ConreasinedExecutorBackend

2万字硬核spark源码精讲手册

3.2.4.1 创建UserThread线程运行用户Main()

加载用户自定义MainClass, 执行Main方法(后续SparkContext初始化)

2万字硬核spark源码精讲手册

3.2.4.2 注册AM并启动ExecutorRunner

  • 1)向RM注册AM (spark.driver.host/port),向Yarn底层的RMClient提交注册AppMaster的申请并启动AppMaster

  • 2)创建DriverEndpoint通信对象,保持和YarnSchedulerBackend通信

  • 3)AM向 RM申请Container容器资源,分配资源并启动ExecutorRunner对象(后续NM启动Executor)

2万字硬核spark源码精讲手册

  • 3.1)获取所有的container资源状态并挑选存在剩余资源的容器,并启动Executor

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

  • 3.2)向NM申请启动Container启动Executor

准备启动CorseGrainedExecutorBackend类命令,在NM上启动Container

2万字硬核spark源码精讲手册

YarnClient模式可自行查阅源码,执行流程和YarnCluster类似,仅最终启动的组件不同(ExecutorLauncher)

4 源码剖析—SparkContext初始化

Driver启动成功后开始调用我们自定义的MainClass方法,即WordCount中的Main(),即来到了第一步:SparkContext初始化

2万字硬核spark源码精讲手册

SparkContext初始化过程透明化,Spark底层做了很多事情,包括Spark环境初始化创建TaskSchdulerDAGSchedulerSparkContext激活等。

4.1 初始化Spark环境及相关配置

1)定义私有变量

2万字硬核spark源码精讲手册

2)初始化相关配置

2万字硬核spark源码精讲手册

4.2 SparkEnv环境创建

1) 从SparkConf中获取Driver信息,调用Create()方法

2万字硬核spark源码精讲手册

2) 创建Driver RPC Endpoint对象

这个是Driver的RPC通信对象,可以和外部组件通信。

2万字硬核spark源码精讲手册

3) 创建SerializerManager(默认为JavaSerializer)、brocastManager、创建MapOutpuTrackerMaster及其RPC Endpoint对象(这几个都是序列化、内部存储、Shuffle相关的组件)

2万字硬核spark源码精讲手册

4) 创建ShuffleManager(默认为sort shuffle)、MemoryManager(默认为UnifiedMemoryManager:1.6之后)

2万字硬核spark源码精讲手册

5) 创建创建BlockManagerMaster、BlockManager等

2万字硬核spark源码精讲手册

4.3 创建TaskSchduler和DAGScheduler

Spark核心的两大组件,贯穿整个Spark任务的DAG划分、task任务分配和提交。

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

4.3.4.1 创建TaskScheduler

1)初始化TaskScheduler/SchedulerBackend

根据不同的master url,创建对应的TaskScheduler和SchedulerBackend(TaskScheduler的RPC对象)

  • 1)Local(本地单CPU模式): TaskSchedulerImpl:max_local_task_failures:1(本地最大任务重试) LocalSchedulerBackend:totalCores:1(本地启动cpu核数数量1)

    2万字硬核spark源码精讲手册

  • 2)Local_N_REGEX(Local[*]模式): TaskSchedulerImpl :max_local_task_failures:1((本地最大任务重试) LocalSchedulerBackend:threadCount:1(本地启动指定数目CPU/所以可执行cpu)

    2万字硬核spark源码精讲手册

  • 3)LOCAL_N_FAILURES_REGEX (Local[n,m]本地失败重试模式): TaskSchedulerImpl :maxFailures:m(本地最大任务失败重试) LocalSchedulerBackend:threadCount:1(本地启动指定数目CPU/所以可执行cpu)

    2万字硬核spark源码精讲手册

  • 4)SPARK_REGEX(StandAlone模式): TaskSchedulerImpl/StandaloneSchedulerBackend

    2万字硬核spark源码精讲手册

  • 5)Yarn模式下的TaskScheduler和SchedulerBackend创建 TaskSchedulerImpl:根据master-url(cluster/client)初始化 YarnClient/YarnClusterSchedulerBackend: 根据master url(cluster/client)初始化相应的Backend

    2万字硬核spark源码精讲手册

2)初始化taskScheduler资源调度池pool

2万字硬核spark源码精讲手册

创建FIFO/FAIR的taskset资源调度池,后续调度taskset任务。

2万字硬核spark源码精讲手册

  • 1)FIFO(队列机制,先进先出)

    2万字硬核spark源码精讲手册

  • 2)FAIR(读取资源调度文件配置)

    2万字硬核spark源码精讲手册

4.3.4.2 创建/启动DAGScheduler并注册心跳

DAGScheduler创建(TaskScheduler引用),等待后续Job的任务DAG调度:

  • 1)初始化事件处理线程,主要作用于后续处理DAG切分的核心逻辑

  • 2)发送TaskScheduler成功创建心跳到HeartbeatReceiver

2万字硬核spark源码精讲手册

4.3.4.3 TaskScheduler启动

1)Schedulebackend启动

创建TaskScheduler RPCEndpoint对象(和Driver进行通信的实例)

2万字硬核spark源码精讲手册

2)推测任务执行

对一个Stage里面运行慢的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上运行的实例,从而加快运行速度。

2万字硬核spark源码精讲手册

  • 1)检测是否有需要推测式执行的Task, 满足非local模式下开启spark.speculation,开启推测执行,存在则backend调用reviveOffers获取资源运行推测任务。

    2万字硬核spark源码精讲手册

  • 2)当成功的Task数超过总Task数的75% (spark.speculation.quantile: 0.75),再统计任务运行时间中位数乘以1.5(spark.speculation.multiplier)的运行时间阈值,如果超出该阈值则启动推测

    2万字硬核spark源码精讲手册

  • 3)在TasksetManager为下个task分配executor时候dequeueTask()中启用调度检测,先过滤掉已经成功执行的task,另外,推测执行task不在和正在执行的task同一Host执行,不在黑名单executor里执行。

    2万字硬核spark源码精讲手册

4.4 SparkContext初始化

4.4.1 初始化applicationId、ui、blockManager、ContextCleaner、MetricSystem

  • 1)创建ContextCleanner并启动: 清理那些超出应用范围的RDD、shuffleDependency和Broadcast

  • 2)创建MetricSystem并启动: 统计信息管理器

  • 3)创建ExecutorAllocationManager(是否开启动态资源配置),根据工作负载来衡量是否应该增加或减少executor

  • 4)BlockManager初始化

2万字硬核spark源码精讲手册

2万字硬核spark源码精讲手册

4.4.2 启动事件消息监听器;发送环境更新和应用启动消息

2万字硬核spark源码精讲手册

4.5 激活SparkContext

将当前SparkContext的状态从contextBeingConstructed(正在构建中)改为activeContext(已激活)

2万字硬核spark源码精讲手册

至此,SparkContext已经初始化完成,TaskSchedulerDagScheduler已经创建,程序进入任务划分等待阶段。

5 源码剖析—Executor启动

在任务进入任务划分等待阶段时,ExecutorBackend线程已经开始准备启动Executor的工作(这两个步骤是同步进行的)。

至于启动多少executor和如何启动,ExecutorBackend会遵循你的Spark-submit脚本。

这里仅剖析Yarn模式下的Executor情况(Standalone模式情况类似,只不过是对Master反向注册)

5.1 CoarseGrainExecutorBackend向Driver注册Executor

  • 1)初始化CoarseGrainedExecutorBackend环境,创建RPC对象

  • 2)启动onStart()方法,向Driver发送ask注册请求(自身的RPC ref对象)

  • 3)Driver的CoarseGrainedScheduleBackend接收请求并注册Executor

  • 4)CoarseGrainedExecutorBackend接受请求并创建Executor

2万字硬核spark源码精讲手册

5.1.1 创建CoarseGrainedExecutorBackend环境(RPC对象)

2万字硬核spark源码精讲手册

5.1.2 CoarseGrainedSchedulerBackend接收请求,注册Executor

  • 1)内存executorDataMap中添加Executor信息(Executor address记录、数量+1)

    2万字硬核spark源码精讲手册

  • 2)向CoarseGrainedExecutorBackend发送registeredExecutor完成信息

    2万字硬核spark源码精讲手册

  • 3)调用makeOffers(), 等待后续分配taskset给Executor

    2万字硬核spark源码精讲手册

5.1.3 CoarseGrainedExecutor接收executor的注册消息,启动executor

2万字硬核spark源码精讲手册

5.2 CoarseGrainExecutorBackend启动Executor

  • 1)创建ThreadPool线程池

  • 2)创建Executor并序列化

  • 3)等待后续分配task

2万字硬核spark源码精讲手册

Executor此时创建完成,开始进入等待任务分配阶段。

6 源码剖析—Task启动

现在开始进行Task启动过程,首先进行的是任务切分和分配工作。

6.1 Task任务切分

6.1.1 DAGScheduler初始化

DAGScheduler的功能:
1)计算并追踪DAG和划分stage: 最后finalStage,倒推遇到宽依赖就划分stage,优先提交父stage
2)根据stage中的taskset最优算法<存在cache或者checkpoint操作的>设置好优先位置,否则等待taskscheduler进行最优位置划分;最后提交taskset到Taskscheduler
3)处理因shuffle过程丢失的RDD,重新计算和提交; 一个stage内部的原因,则是task自己解决

在完成SparkContext初始化和Executor启动后,这里还是回到我们提交的Main方法中。

2万字硬核spark源码精讲手册

我们定位到Spark程序中的Action算子(foreach/collect算子),其内部调用SparkContext的runJob方法。

2万字硬核spark源码精讲手册

  • 1)SparkContext嵌套的runJob方法(所有的action算子均有这个runJob函数)

    2万字硬核spark源码精讲手册

  • 2)调用DAGScheduler的runJob方法

    2万字硬核spark源码精讲手册

  • 2.1)初始化DAGSchedulerEventProcessLoop,DAGSchedulerEventProcessLoop是来对DAGScheduler主要事件进行管理(包括接收JobSubmit提交等消息处理)

  • 2.2)submitJob方法调用eventProcessLoop的post方法,调用eventProcessLoop post将JobSubmitted事件添加到DAGScheduler事件队列,给自己发送一个提交任务的作业

    2万字硬核spark源码精讲手册

  • 3)eventProcessLoop的receive接收,处理jobtask(调用DAGScheduler的handleJobSubmitted方法)

    2万字硬核spark源码精讲手册

6.1.2 DAGScheduler切分Stage

我们定位到DAGScheduler的handleJobSubmited()方法:

2万字硬核spark源码精讲手册

  • 1)触发job的最后一个rdd,创建finalStage并同时创建shuffleMapStage

  • 2)用finalStage创建一个Job,这个job的最后一个stage,就是finalStage

  • 3)将Job相关信息,加入内存缓冲中

  • 4)第四步,使用submitStage方法提交finalStage

    2万字硬核spark源码精讲手册

6.1.2.1 创建finalStage(ResultStage)

  • 1)检查当前final RDD是否处于屏障阶段

  • 2)获取当前final RDD的父stage(ShuffleMapStage

  • 3)创建当前final RDD的stage(ResultStage)并更新保存内存中的stage信息,即当前stage(ResultStage)和父stage(ShuffleMapStage

    2万字硬核spark源码精讲手册

  • 3.1)调用getOrCreateParentStage获取当前final rdd的父parentShuffleMapStage

    2万字硬核spark源码精讲手册

  • 3.2)获取当前finalRDD的shuffleDependencies依赖;遍历final RDD的shuffleDependiences,然后创建ShuffleMapStage

  • 4)创建ResultStage

  • 4.1)封装Result Stage的id、rdd、partitions和所有shuffle的Dependies信息;

  • 4.2)内存中更新并保存当前Result Stage的信息(shuffleId对应的Map对象)

  • 4.3)返回所有的stagelist(Result Stage/ShuffleMap Stage)

6.1.2.2 创建shuffleMapStage

  • 1)获取当前final RDD到其dependencies中最近的shuffle RDD之间的shuffle dependencies。 例如: A(shuffle) –> B(shuffle) -> C(final RDD) 则只返回B->C

    2万字硬核spark源码精讲手册

  • 2)遍历父shuffleDependencies,创建shuffleMapStage

    2万字硬核spark源码精讲手册

  • 2.1)根据shuffleId获取当前父shuffle RDD的shuffleStage,如果当前shuffleStage已经存在,则直接返回

  • 2.2)否则创建父shuffle RDD的ShuffleMapStage。此过程循环调用直到所有的depedencies全部遍历完成,完成RDD的所有shuffleMapStage的创建

注意:
1)如果shuffleIdToMapStage内存中查找不到当前RDD的shuffle stage信息,首先调用getMissingAncestorShuffleDependencies获取没有注册到shuffleToMapStage中当前RDD的父shuffle dependies信息,并判断获取的dep在shuffleToMapStage中是否存在,不存在则调用createShuffleMapStage创建其父shuffleRDD的shuffle Stage
2)最终调用createShuffleMapStage创建自身的shuffle Stage。内部循环调用getOrCreateParentStages和createShuffleMapStage,遍历所有的父shuffle rdd并创建对应的stage。

2万字硬核spark源码精讲手册

  • 3)创建ShuffleMapStage

    2万字硬核spark源码精讲手册

  • 3.1)封装shuffle stage的shuffleId、rdd、parents、shuffleDependendies、MapOutTracker信息;

  • 3.2)内存中更新并保存shuffleStage的信息(shuffleId对应的Map对象)

  • 3.3)判断mapOutputTracker是否包含该shuffle stage信息,没有则将该shuffle信息(shuffleId/partition数量)封装为shuffleStatus注册到mapOutTracker中, 后续Driver上的mapOutTrackerMaster根据这个shuffleId查找该shuffle信息

6.1.2.3 为当前finalStage生成Job

  • 1)根据当前的action算子及生成的stage创建Active Job对象

  • 2)将当前的active job加入到内存Map中,并和Stage的active Job字段关联

  • 3)获取当前job的所有stageIds和stageInfo信息,发送Job启动的消息

  • 4)提交Stage任务

2万字硬核spark源码精讲手册

补充:stage划分算法
1)DAGScheduler中根据action RDD算子创建finalStage 2)finalStage中创建active Job并将job信息加入内存缓存中
3)使用submitStage提交finalStage
4)获取final RDDshuffleDependies,遍历调用查找finalStage的父stage
5)调用getMissingParentStage查找finalStage的父stage(根据rdd的dependies判断, 如果是shuffleDependency宽依赖则生成stage, Narrow窄依赖则继续压入栈中继续向上遍历),最后返回stage列表 6)如果存在父stage, 则递归调用submitStage(如果一直存在则递归直到stage0), 将当前stage加入waitingStage待提交;
7)如果不存在stage, 则直接提交stage中未提交的tasks(submitMissingTasks)
8)后续submitMissingTask, 为stage创建一批tasks,数量等同于partitions(final RDD的); 计算每个task对应的Partition最佳位置 9)对于stage的task,创建taskset对象,调用TaskSchduler的submitTasks方法

6.1.2.4 查找父stages

  • 1)对stage的active job id进行验证,如果存在,进行第2步,如果不存在,则abortStage终止

  • 2)判断当前stage是否为waiting、running、failed状态,如果是则终止

  • 3)调用getMissingParentStages,遍历当前stage父RDD依赖

    2万字硬核spark源码精讲手册

  • 3.1)判断rdd的dependencies类型,如果是宽依赖,则将其生成一个新的stage

  • 3.2)如果是窄依赖则继续将rdd放入栈中

  • 3.3)返回stage list(New Shuffle Stage | Null)

  • 4)获取getMissingParentStages返回结果

    2万字硬核spark源码精讲手册

  • 4.1)如果当前stage不存在未提交的父stage,则调用submitMissingTasks方法,提交当前stage所有未提交的task

  • 4.2)如果当前stage存在未提交的父Stage,递归调用submitStage()直到最开始的stage0。并将当前stage加入waitingStages等待执行队列中,后续执行

  • 4.3)递归调用submitStage提交所有未提交的父stage,直到最开始的stage0, 陆续调用submitMissingTasks

总结:递归调用查找父stage, 最终执行最开始的stage0,其他的stage加入等待队列中,待后续执行

  • 5)执行当前stage的submitMissingTasks提交task

    2万字硬核spark源码精讲手册

6.1.2.5 提交submitMissingTasks

2万字硬核spark源码精讲手册

  • 1)获取当前stage没有计算的partitions和properities

  • 1.1)如果是shuffleMapStage,调用MapOutputTrackerMaster的findMissingpartitions方法查找MapOutputTracker中需要参与计算的该stage的partitionIds

    2万字硬核spark源码精讲手册

  • 1.2)如果是ResultStage, 获取当前job中未计算的partitionId

    2万字硬核spark源码精讲手册

  • 2)将stage添加到runningStage

  • 3)匹配stage类型,获取task对应partition的最优资源位置来运行job(查看缓存cache中内存->查找BlockManager存储优先级别)

    2万字硬核spark源码精讲手册

  • 4)根据stage类型不同封装task, 传递给TaskScheduler调用task

6.1.2.6 Task提交

DAGScheduler生产DAG切分完成taskset,将taskset提交给TaskScheduler,由TaskScheduler完成task任务分配(具体的executor上)。

  • 1)TaskSchdulersubmitTask方法。把tasks封装到TaskSetManager,并且放入到调度器

  • 2)执行backend.reviveOffers(), 调用CoarseGrainedSchedulerBackend的reviveOffers进行任务分配(executor最优位置分配)

  • 3)执行launchTasks(scheduler.resourceOffers(workOffers)), 执行task排序

2万字硬核spark源码精讲手册

6.2 Executor启动Task

CoarseGrainedSchedulerBackend调用makeOffers, 启动task任务

  • 1)调用scheduler.resourceOffers(workOffers)对task排序和task任务分配(executor和task位置)

    2万字硬核spark源码精讲手册

  • 2)执行launchTasks, 发送消息给CoarseGrainedExecutorBackend创建task任务,对应的Executor准备启动Task任务

    2万字硬核spark源码精讲手册

  • 3)CoarseGrainedExecutorBackend接收请求,调用Executor执行launchTask任务调度

    2万字硬核spark源码精讲手册

  • 4)Executor创建TaskRunner线程对象,并在线程池中取出线程执行(后续task执行)

    2万字硬核spark源码精讲手册

至此,task任务的划分和分配已经完成,下面我们来看下task任务在executor上的启动执行过程。

6.3 Task任务执行

6.3.1 反序列化task代码,创建TaskContext

前面说到Executor创建TaskRunner线程并执行。TaskRunner线程首先反序列化程序代码和数据,然后进行后续操作

2万字硬核spark源码精讲手册

  • 1)初始化Task线程环境和TaskMemoryManager等组件

  • 2)对序列化的task数据进行反序列化

    2万字硬核spark源码精讲手册

  • 3)远程网络通信拉取文件(文件、资源、jar等)

    2万字硬核spark源码精讲手册

  • 4)调用Task的runTask()方法,进行数据计算

    2万字硬核spark源码精讲手册

Task可进一步分成ShuffleMapTaskResultTask(根据shuffle宽依赖),执行不同的runTask()逻辑。

6.3.2 调用Task的runTask()方法

  • 1)由ShuffleMapTask创建的ShuffleWriter执行代码定义的算子,并将结果写入到对应分区的bucket文件

    2万字硬核spark源码精讲手册

  • 2)ShuffleMapTask返回MapStatus到DAGScheduler MapOutputTracker中

  • 3)ResultTask则直接反序列化代码,并执行func自定义方法,将结果传到driver或者输出都调用RDD.iteralator()方法

    2万字硬核spark源码精讲手册

最终task在不同的executor上分布式执行,反序列化数据和执行逻辑并进行状态上报,直至任务完成。

6.4 补充说明

由于篇幅有限且个人水平有限,spark源码暂时剖析到这里。未详尽之处后续有时间还会继续推出文章进行补充,不喜勿喷,谢谢大家~文章来源地址https://www.toymoban.com/news/detail-423388.html

到了这里,关于2万字硬核spark源码精讲手册的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 2023年Java核心技术面试第四篇(篇篇万字精讲)

    目录 八. 对比Vector,ArrayList, LinkedList有何区别?  8.1  典型回答 8.1.1 Vector: 8.1.2 ArrayList : 8.1.3 LinkedList 8.2 考察点分析: 8.2.1 不同容器类型适合的场景 8.2.1.1 Vector和ArrayList 8.2.1.2 LinkedList 8.3 小结: 8.3.1 底层实现方式 8.3.2 读写机制 8.3.3 读写效率 8.3.4 线程安全性 九.  对比H

    2024年02月12日
    浏览(30)
  • 2023年Java核心技术面试第九篇(篇篇万字精讲)

    目录 十七 . 并发相关基础概念 17.1 线程安全 17.2 保证线程安全的两个方法 17.2.1 封装 17.2.2 不可变 17.2.2.1 final 和 immutable解释  17.3 线程安全的基本特性 17.3.1 原子性(Atomicity) 17.3.2 可见性(Visibility) 17.3.2.1  volatile  17.3.2.2 synchronized  17.3.2.3  Lock 接口 17.3.

    2024年02月11日
    浏览(25)
  • 万字精讲——数据结构栈与队列必会OJ练习

    万字精讲——数据结构栈与队列必会OJ练习

    W...Y的主页 💕 代码库分享 😊 在之前的博客中,我们学习了栈与队列的基本内容,并且实现了栈与队列。今天我们进行刷题训练,走进栈与队列的世界中去感受一番!!! 目录 括号匹配问题  使用队列实现栈 用栈实现队列 设计循环队列 给定一个只包括  \\\'(\\\' , \\\')\\\' , \\\'{

    2024年02月10日
    浏览(10)
  • 2023年Java核心技术面试第三篇(篇篇万字精讲)

    目录 六.  Java反射机制以及动态代理是基于什么原理  6.1 反射机制: 6.2 反射例子: 6.3 动态代理: 6.4 例子: 6.5 总结: 6.5.1 代理模式 6.5.1.1动态代理: *6.5.1.2 JDK动态代理: *6.5.1.3 cglib动态代理: 6.5.1.4 反射与动态代理原理 6.5.2 应用场景: 6.5.2.3 组成要素: 6.5.2.4 实现方式

    2024年02月12日
    浏览(11)
  • 2023年Java核心技术第十二篇(篇篇万字精讲)

    目录 22. AtomicInteger 底层实现原理是什么?如何在自己的项目代码中应用CAS操作? 22.1 典型回答 22.1.1 CAS详细解释: 22.1.1.1 预期值的选取: 22.1.2 CAS的弊端 22.1.2.1 ABA问题: 22.1.2.2 自旋次数限制: 22.1.2.3 只能保证一个共享变量的原子操作: 22.1.3 CAS操作失败的原因以及解决方案

    2024年02月10日
    浏览(9)
  • 2023年Java核心技术面试第八篇(篇篇万字精讲)

    目录 十五 . 面向对象的基本要素:封装,继承,多态  15.1 封装: 15.1.1 例子: 15.2 继承 15.2.1 例子 15.3 多态 15.3.1 例子 15.3.2 小结:谈谈多态的继承的联系 十六 . synchronized 和 ReentrantLock 的区别? 16.1 典型回答 16.2 深入理解底层锁的概念 16.2.1 synchronized 16.2.2 ReentrantLock 16.2.2.1

    2024年02月11日
    浏览(7)
  • Kafka 万字精讲|工作五年这些你都知道吗?

    Kafka 万字精讲|工作五年这些你都知道吗?

    本文以 Kafka 官方文档 的内容为基石,结合参考文献处文章和笔者自身实践凝练而成,涵盖内容全面,详略得当。 这也是《一文搞懂》系列的第一篇技术长文,期待您的关注。 一个十分钟的视频带你了解 Kafka Apache Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性

    2024年02月02日
    浏览(7)
  • 【Spark精讲】Spark Shuffle详解

    【Spark精讲】Spark Shuffle详解

    目录 Shuffle概述 Shuffle执行流程 总体流程 中间文件 ShuffledRDD生成 Stage划分 Task划分 Map端写入(Shuffle Write) Reduce端读取(Shuffle Read) Spark Shuffle演变 SortShuffleManager运行机制 普通运行机制 bypass 运行机制 Tungsten Sort Shuffle 运行机制 基于Sort的Shuffle机制的优缺点 Shuffle调优 广播变量 shu

    2024年02月02日
    浏览(10)
  • 【Spark精讲】Spark任务运行流程

    【Spark精讲】Spark任务运行流程

    目录 Spark任务执行流程 Client模式 Cluster模式 Yarn任务运行流程 YARN-CLIENT模式 YARN-CLUSTER模式 ​编辑 故障排查 YARN-CLIENT 模式导致的网卡流量激增问 题 YARN-CLUSTER 模式的 JVM 栈内存溢出无法执行问题         部署模式是根据Drvier和Executor的运行位置的不同划分的。client模式提交

    2024年04月10日
    浏览(11)
  • 【Spark精讲】一文讲透SparkSQL执行过程

    【Spark精讲】一文讲透SparkSQL执行过程

    逻辑计划阶段会将用户所写的 SQL语句转换成树型数据结构( 逻辑算子树 ), SQL语句中蕴含的逻辑映射到逻辑算子树的不同节点。 顾名思义,逻辑计划阶段生成的逻辑算子树并不会直接提交执行,仅作为中间阶段 。 最终逻辑算子树的生成过程经历 3 个子阶段,分别对应 未解析

    2024年02月03日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包