加入收藏 | 设为首页 | 会员中心 | 我要投稿 常州站长网 (https://www.0519zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

为啥Spark 的Broadcast要用单例模式

发布时间:2019-06-12 13:06:08 所属栏目:教程 来源:浪院长
导读:副标题#e# 很多用Spark Streaming 的朋友应该使用过broadcast,大多数情况下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮大家分析一下,有以下几个原因: 广播变量大多数情况下是不会变更的,使用单例模式可以减少spark streaming每次jo

首先,一个基本概念就是Spark应用程序从开始提交到task执行分了很多层。

  1. 应用调度器。主要是资源管理器,比如standalone,yarn等负责Spark整个应用的调度和集群资源的管理。
  2. job调度器。spark 的算子分为主要两大类,transform和action,其中每一个action都会产生一个job。这个job需要在executor提供的资源池里调度执行,当然并不少直接调度执行job。
  3. stage划分及调度。job具体会划分为若干stage,这个就有一个基本的概念就是宽依赖和窄依赖,宽依赖就会划分stage。stage也需要调度执行,从后往前划分,从前往后调度执行。
  4. task切割及调度。stage往下继续细化就是会根据不太的并行度划分出task集合,这个就是在executor上调度执行的基本单元,目前的调度默认是一个task一个cpu。
  5. Spark Streaming 的job生成是周期性的。当前job的执行时间超过生成周期就会产生job 累加。累加一定数目的job后有可能会导致应用程序失败。这个主要原因是由于FIFO的调度模式和Spark Streaming的默认单线程的job执行机制

3.Spark Streaming job生成

这个源码主要入口是StreamingContext#JobScheduler#JobGenerator对象,内部有个RecurringTimer,主要负责按照批处理时间周期产生GenrateJobs事件,当然在存在windows的情况下,该周期有可能不会生成job,要取决于滑动间隔,有兴趣自己去揭秘,浪尖星球里分享的视频教程里讲到了。具体代码块如下

  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 
  2.    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") 

我们直接看其实现代码块:

  1. eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 
  2.       override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 
  3.  
  4.       override protected def onError(e: Throwable): Unit = { 
  5.         jobScheduler.reportError("Error in job generator", e) 
  6.       } 
  7.     } 
  8.     eventLoop.start() 

event处理函数是processEvent方法

  1. /** Processes all events */ 
  2.   private def processEvent(event: JobGeneratorEvent) { 
  3.     logDebug("Got event " + event) 
  4.     event match { 
  5.       case GenerateJobs(time) => generateJobs(time) 
  6.       case ClearMetadata(time) => clearMetadata(time) 
  7.       case DoCheckpoint(time, clearCheckpointDataLater) => 
  8.         doCheckpoint(time, clearCheckpointDataLater) 
  9.       case ClearCheckpointData(time) => clearCheckpointData(time) 
  10.     } 
  11.   } 

在接受到GenerateJob事件的时候,会执行generateJobs代码,就是在该代码内部产生和调度job的。

  1. /** Generate jobs and perform checkpointing for the given `time`.  */ 
  2.   private def generateJobs(time: Time) { 
  3.     // Checkpoint all RDDs marked for checkpointing to ensure their lineages are 
  4.     // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 
  5.     ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") 
  6.     Try { 
  7.       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
  8.       graph.generateJobs(time) // generate jobs using allocated block 
  9.     } match { 
  10.       case Success(jobs) => 
  11.         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 
  12.         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  13.       case Failure(e) => 
  14.         jobScheduler.reportError("Error generating jobs for time " + time, e) 
  15.         PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) 
  16.     } 
  17.     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
  18.   } 

(编辑:常州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读