Flink1.7.2 并行计算源码分析

  • 时间:
  • 浏览:3

将内部管理化数据从BLOB存储加载回对象

创建原来后端情況,stateBackend,此时为MemoryStateBackend

当前流任务对应的操作链条,此处不同的流任务对应的操作链条不一样,像source流中,用户自定义的函数链不一样,有本身operatorChain可是我我一样,这里以WordCount为例说明

end

= {Execution@54009} "Attempt #0 (Sink: Print to Std. Out (1/1)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@67b9a9d7 - [SCHEDULED]"

构建任务情況管理器TaskStateManager

加载用户系统进程jar文件,给当前Task使用

triggerTarget.onProcessingTime(timer);// 调用 WindowOperator.onProcessingTime(timer)出理

在所有的operators是opened以前所有的触发器调度只有被执行,可是我我前要先把operator.open

这里是Task的执行情況,前面是Executition的执行情況,前要区分开来,更新任务情況,由CREATED(已创建)到DEPLOYING(部署中)

调用Execution.deploy()进行部署

将会那么 调置时间服务,就创建SystemProcessingTimeService,它将当前出理 时间指定为调用的结果(时间)

INFO日志输出:部署哪原来Execution到哪一台机器上

更新Execution情況,将当前Execution的情況由SCHEDULED更新为DEPLOYING,即由已调度情況更新为部署中

将任务增加到任务槽位中

调用具体任务的run()函数去出理 ,这里分不同的类型

指定Source中的拆分器,可是我我将不断产生数据的Source拆分给不同的Window做并行任务(RpcInputSplitProvider是其中的有本身分配辦法 )

executionsToDeploy包括所有的(Source,Window,Sink),在这里设置的setParallelism()并行度为几次,都在几次个Window,本案例设置的为3,什么都有 executionsToDeploy对象的数据如下

headOperator = operatorChain.getHeadOperator()为StreamSource

构建任务Task

从序列化的对象中反序列化(通过类加载),JobInformation,TaskInformation,用来构建TaskInformation,Task

StreamTask.invoke()

slot得到TaskManager

调用任务的启动系统进程,该辦法 会触发调用Task.run()函数,

创建文件系统流为有本身任务

任务初使化

给当前任务构建运行环境

加载并实例化任务的可调用代码(用户代码)

TaskManager.submitTask 提交任务,参数为TaskDeploymentDescriptor

任务我不知道它们与其他任务的关系,将会它们是第一次执行任务还是重复尝试。 所有几次只有JobManager知道。 所有任务都知道它买车人的可运行代码,任务的配置以及要使用和联 成的里边结果的ID(将会有语录)。

每个任务由原来专用系统进程运行。

分区实现KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);

注册网络追踪给这当前任务

里边发送给Sink的数据,可是我我遍历有本身processingTimeTimersQueue中的数据,当然,每次发送第原来元素,发送后,会把最后原来元素插进第原来元素

把当前元素增加到state中保存,add函数中会对相同key进行聚合操作(reduce),对同原来window中相同key进行求和可是我我在有本身辦法 中进行的

WindowOperator.processElement,给每原来WordWithCount(1,1) 原来的元素分配window,也可是我我确认每原来元素属于哪原来窗口,将会前要对同原来窗口的相同key进行聚合操作

Window 调的是OneInputStreamTask.run()函数

构建TaskDeploymentDescriptor对象,该对象引用Task实例Execution的id,slot(槽位),就都可不都可以能确定Execution在哪个slot上运行

更新当前任务情況,从DEPLOYING(部署中)更新为RUNNING(运行中)

源码