博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
66、Spark Streaming:数据处理原理剖析与源码分析(block与batch关系透彻解析)
阅读量:5219 次
发布时间:2019-06-14

本文共 1866 字,大约阅读时间需要 6 分钟。

一、数据处理原理剖析

每隔我们设置的batch interval 的time,就去找ReceiverTracker,将其中的,从上次划分batch的时间,到目前为止的这个batch interval time间隔内的block封装为一个batch;其次,会将这个batch中的数据,去创建为一个初始的RDD,一个batch内,在这段时间封装了几个block,就代表这个batch对应的RDD内会有几个partition;这个batch对应的RDD的partition决定了数据处理阶段的并行度,这个跟调优关系很大,如果想增加数据处理阶段的性能,就考虑增加并行度,那么就考虑缩短block interval;只有output操作中,使用了ForEachStream,其中定义了generatorJob()方法,在数据处理阶段,才触发针对接收到的一个一个batch的数据,触发小的job,去处理该batch的数据;最后一步,去找JobScheduler去调度job,job的输入RDD,就是batch对应的RDD;

二、源码分析

入口,JobGenerator的generateJobs()方法

###org.apache.spark.streaming.scheduler/JobGenerator.scala /**    * 定时,调度generateJobs()方法,传入一个time,其实就是一个batch interval内的时间段    */  private def generateJobs(time: Time) {    // Set the SparkEnv in this thread, so that job generation code can access the environment    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.    SparkEnv.set(ssc.env)    Try {      // 找到ReceiverTracker,调用其allocateBlocksToBatch方法,将当前时间段内的block分配给一个batch,并为其      // 创建一个RDD      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch      // 调用DSteamGraph的generateJobs()来根据程序定义的DSteam之间的依赖关系和算子,生成job      graph.generateJobs(time) // generate jobs using allocated block    } match {        // 如果成功创建了job      case Success(jobs) =>        // 从ReceiverTracker中,获取当前batch interval对应的block数据        val receivedBlockInfos =          jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }        // 用jobScheduler提交job,其对应的原始数据,是那批block        jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))      case Failure(e) =>        jobScheduler.reportError("Error generating jobs for time " + time, e)    }    eventActor ! DoCheckpoint(time)  }

转载于:https://www.cnblogs.com/weiyiming007/p/11387814.html

你可能感兴趣的文章
UNIX基础知识之输入和输出
查看>>
【洛谷 P1666】 前缀单词 (Trie)
查看>>
对称加密和非对称加密
查看>>
数据库锁机制及乐观锁,悲观锁的并发控制
查看>>
图像处理中双线性插值
查看>>
RobHess的SIFT代码解析之RANSAC
查看>>
03 线程池
查看>>
201771010125王瑜《面向对象程序设计(Java)》第十三周学习总结
查看>>
手机验证码执行流程
查看>>
python 基础 ----- 变量
查看>>
设计模式课程 设计模式精讲 2-2 UML类图讲解
查看>>
Silverlight 的菜单控件。(不是 Toolkit的)
查看>>
:hover 鼠标同时触发两个元素变化
查看>>
go语言学习十三 - 相等性
查看>>
Idea 提交代码到码云(提交到github也大同小异)
查看>>
c#连接excel2007未安装ISAM解决
查看>>
Mono 异步加载数据更新主线程
查看>>
初识lua
查看>>
我是插件狂人,jDuang,jValidator,jModal,jGallery
查看>>
张季跃 201771010139《面向对象程序设计(java)》第四周学习总结
查看>>