首先,一个基本概念就是Spark应用程序从开始提交到task执行分了很多层。
- 应用调度器。主要是资源管理器,比如standalone,yarn等负责Spark整个应用的调度和集群资源的管理。
- job调度器。spark 的算子分为主要两大类,transform和action,其中每一个action都会产生一个job。这个job需要在executor提供的资源池里调度执行,当然并不少直接调度执行job。
- stage划分及调度。job具体会划分为若干stage,这个就有一个基本的概念就是宽依赖和窄依赖,宽依赖就会划分stage。stage也需要调度执行,从后往前划分,从前往后调度执行。
- task切割及调度。stage往下继续细化就是会根据不太的并行度划分出task集合,这个就是在executor上调度执行的基本单元,目前的调度默认是一个task一个cpu。
- Spark Streaming 的job生成是周期性的。当前job的执行时间超过生成周期就会产生job 累加。累加一定数目的job后有可能会导致应用程序失败。这个主要原因是由于FIFO的调度模式和Spark Streaming的默认单线程的job执行机制
3.Spark Streaming job生成
这个源码主要入口是StreamingContext#JobScheduler#JobGenerator对象,内部有个RecurringTimer,主要负责按照批处理时间周期产生GenrateJobs事件,当然在存在windows的情况下,该周期有可能不会生成job,要取决于滑动间隔,有兴趣自己去揭秘,浪尖星球里分享的视频教程里讲到了。具体代码块如下
- private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
我们直接看其实现代码块:
- eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
- override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
-
- override protected def onError(e: Throwable): Unit = {
- jobScheduler.reportError("Error in job generator", e)
- }
- }
- eventLoop.start()
event处理函数是processEvent方法
- /** Processes all events */
- private def processEvent(event: JobGeneratorEvent) {
- logDebug("Got event " + event)
- event match {
- case GenerateJobs(time) => generateJobs(time)
- case ClearMetadata(time) => clearMetadata(time)
- case DoCheckpoint(time, clearCheckpointDataLater) =>
- doCheckpoint(time, clearCheckpointDataLater)
- case ClearCheckpointData(time) => clearCheckpointData(time)
- }
- }
在接受到GenerateJob事件的时候,会执行generateJobs代码,就是在该代码内部产生和调度job的。
- /** Generate jobs and perform checkpointing for the given `time`. */
- private def generateJobs(time: Time) {
- // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
- // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
- ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
- Try {
- jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
- graph.generateJobs(time) // generate jobs using allocated block
- } match {
- case Success(jobs) =>
- val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
- jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
- case Failure(e) =>
- jobScheduler.reportError("Error generating jobs for time " + time, e)
- PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
- }
- eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
- }
(编辑:常州站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|