1. 介绍

Spark运行JOB主要有两种模式:

  1. cluster mode: Spark driver在 application的master process中运行。如果和YARN集成,则application master process由YARN管理,在YARN中运行。
  2. client mode:Spark driver在clinet process中运行。如果集成YARN,application master只负责从YARN请求资源。

由此可见,两种模式分别代表了瘦客户端(cluster mode)或者是瘦服务器(client mode)两种模式。

本文主要介绍下cluster mode和client mode在使用YARN和不使用YARN的情况下的运行过程。

2. 名词回顾

为了方便理解我们这里先回顾一些名词。

2.1 Standalone模式下存在的角色。

  1. Client:客户端进程,负责提交作业到Master。
  2. Master:Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。
  3. Worker:Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor。
  4. Driver: 一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。
  5. Executor:即真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Driver的命令Launch Task,一个Executor可以执行一到多个Task。

2.2 作业相关的名词解释

  1. Stage:一个Spark作业一般包含一到多个Stage。
  2. Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。
  3. DAGScheduler: 实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。
  4. TaskScheduler:实现Task分配到Executor上执行。

4. cluster mode

4.1 cluster mode without yarn


1.客户端提交作业给Master

  1. Master让一个Worker启动Driver(上图中间的worker),即SchedulerBackend。 Worker创建一个DriverRunner线程,DriverRunner启动SchedulerBackend进程。
  2. Master让其余Worker启动Exeuctor,即ExecutorBackend。Worker创建一个ExecutorRunner线程,ExecutorRunner会启动ExecutorBackend进程。 ExecutorBackend启动后会向Driver的SchedulerBackend注册。
  3. 在Driver上的SchedulerBackend进程中包含DAGScheduler,它会根据用户程序,生成执行计划,并调度执行。对于每个stage的task,都会被存放到TaskScheduler中,ExecutorBackend向SchedulerBackend汇报的时候把TaskScheduler中的task调度到ExecutorBackend执行。 所有stage都完成后作业结束。

4.2 cluster mode on yarn

该模式下的特点:

  1. spark driver运行在由YARN管理的application master process(管理所有应用生命周期)当中
  2. Driver程序在YARN中运行,所以事先不用启动Spark Master/Client
  3. 应用的运行结果不能在客户端显示(可以在history server中查看),客户端的终端显示的是作为YARN的job的简单运行状况。

其基本过程如下图所示:


PS: 这里可以发现,使用YARN,在启动APP MASTER的时候,也是用“在一个NODE上启动,其他节点来注册”的模式。

一个启动的例子:

14/09/28 11:24:52 INFO RMProxy: Connecting to ResourceManager at hdp01/172.19.1.231:8032
14/09/28 11:24:52 INFO Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 4
14/09/28 11:24:52 INFO Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0,
queueApplicationCount = 0, queueChildQueueCount = 0
14/09/28 11:24:52 INFO Client: Max mem capabililty of a single resource in this cluster 8192
14/09/28 11:24:53 INFO Client: Uploading file:/usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar to hdfs://hdp01:8020/user/spark/.sparkStaging/application_1411874193696_0003/spark-examples_2.10-1.0.0-cdh5.1.0.jar
14/09/28 11:24:54 INFO Client: Uploading file:/usr/lib/spark/assembly/lib/spark-assembly-1.0.0-cdh5.1.0-hadoop2.3.0-cdh5.1.0.jar to hdfs://hdp01:8020/user/spark/.sparkStaging/application_1411874193696_0003/spark-assembly-1.0.0-cdh5.1.0-hadoop2.3.0-cdh5.1.0.jar
14/09/28 11:24:55 INFO Client: Setting up the launch environment
14/09/28 11:24:55 INFO Client: Setting up container launch context
14/09/28 11:24:55 INFO Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.master="spark://hdp01:7077", -Dspark.app.name="org.apache.spark.examples.SparkPi", -Dspark.eventLog.enabled="true", -Dspark.eventLog.dir="/user/spark/applicationHistory", -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, org.apache.spark.examples.SparkPi, --jar , file:/usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar, , --executor-memory, 1024, --executor-cores, 1, --num-executors , 2, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
14/09/28 11:24:55 INFO Client: Submitting application to ASM
14/09/28 11:24:55 INFO YarnClientImpl: Submitted application application_1411874193696_0003
14/09/28 11:24:56 INFO Client: Application report from ASM:
application identifier: application_1411874193696_0003
appId: 3
clientToAMToken: null
appDiagnostics:
appMasterHost: N/A
appQueue: root.spark
appMasterRpcPort: -1
appStartTime: 1411874695327
yarnAppState: ACCEPTED
distributedFinalState: UNDEFINED
appTrackingUrl: http://hdp01:8088/proxy/application_1411874193696_0003/
appUser: spark

总结下过程为:

  1. 由client向ResourceManager提交请求,并上传jar到HDFS上

这期间包括四个步骤:
a). 连接到RM
b). 从RM ASM(ApplicationsManager )中获得metric、queue和resource等信息。
c). upload app jar and spark-assembly jar
d). 设置运行环境和container上下文(launch-container.sh等脚本)

  1. ResouceManager向NodeManager申请资源,创建Spark ApplicationMaster(每个SparkContext都有一个ApplicationMaster)
  2. NodeManager启动Spark App Master,并向ResourceManager AsM注册
  3. Spark ApplicationMaster从HDFS中找到jar文件,启动DAGscheduler和YARN Cluster Scheduler
  4. ResourceManager向ResourceManager AsM注册申请container资源(INFO YarnClientImpl: Submitted application)
  5. ResourceManager通知NodeManager分配Container,这时可以收到来自ASM关于container的报告。(每个container的对应一个executor)
  6. Spark ApplicationMaster直接和container(executor)进行交互,完成这个分布式任务。

需要注意的是:
a). Spark中的localdir会被yarn.nodemanager.local-dirs替换
b). 允许失败的节点数(spark.yarn.max.worker.failures)为executor数量的两倍数量,最小为3.
c). SPARK_YARN_USER_ENV传递给spark进程的环境变量
d). 传递给app的参数应该通过–args指定。

4.3 使用yarn和不使用yarn的区别

用YARN之后的主要区别就是:

  1. 在结构当中添加了node manager和resource manger分别替代了worker和master的工作
  2. spark driver运行在由YARN管理的application master process当中
  3. client提交任务给RM,而不是master
  4. node manager来替代worker的工作,例如启动YARN容器,然后启动executor

5. client mode

5.1 client mode without yarn

JOB运行的过程入下图:

作业执行流程描述:

  1. 客户端启动后直接运行用户程序,启动Driver相关的工作:DAGScheduler和BlockManagerMaster等。
  2. 客户端的Driver向Master注册。 Master还会让Worker启动Exeuctor。
  3. Worker创建一个ExecutorRunner线程,ExecutorRunner会启动ExecutorBackend进程。
  4. ExecutorBackend启动后会向Driver的SchedulerBackend注册。Driver的DAGScheduler解析作业并生成相应的Stage,每个Stage包含的Task通过TaskScheduler分配给Executor执行。 所有stage都完成后作业结束。

5.2 client mode with yarn

client mode on yarn的主要特点可以概括如下:

  1. spark driver运行在client上
  2. 本地Driver负责与所有的executor container进行交互,并将最后的结果汇总。
  3. 结束掉终端,相当于kill掉这个spark应用。

参考资料:

  1. Spark on YARN两种运行模式介绍
  2. Spark架构与作业执行流程简介