欢迎来到Introzo百科
Introzo百科
当前位置:Introzo百科 > 技术

从源码看Azkaban作业流程分配流程

日期:2023-10-11 08:23

-->

上一篇文章零散地列出了看源码时记录的几类信息。本文完整介绍了Azkaban中一个作业流的执行过程。希望能够对刚刚接手的Azkaban相关工作的开发和测试有所帮助。

1。Azkaban简介  

Azkaban作为一个开源的调度系统,在大数据方面有着广泛的应用。主要由三部分组成:Webserver、Executor、DB。

             图1 Azkaban的架构

图1为Azkaban的基本架构:Webserver主要负责权限验证、项目管理、作业流程分发等; Executor主要负责作业流/作业的具体执行以及执行日志的收集; MySQL用于存储作业/作业流的执行状态信息。图中所示的是单执行器场景,但实际应用中大多数项目都采用多执行器场景。下面主要介绍多执行器场景下的azkaban调度流程。

2。作业流程执行流程

图2 作业流程执行流程

图2展示了Azkaban工作流程的执行过程:

1。首先,Webserver判断缓存在内存中的每个Executor的资源状态(Webserver有一个线程遍历每个活跃的executor发送http请求获取其资源状态信息并缓存在内存中),根据选择策略(包括执行器资源)状态、最近执行流程的数量等)选择执行器下发作业流程;

2。然后执行器决定是否设置作业粒度分配。如果不设置作业粒度分配,则所有作业都会在当前执行器中执行;

3。如果设置了作业粒度分配,则当前节点将成为作业分配的决策者,即分配节点;

4。分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor来分配作业;

5。分配给作业的执行器成为执行节点,执行作业,然后更新数据库。

3。从源码看作业流程执行流程

首先是网络服务器结束:

1。 ExecutorServlet类根据请求的ajax参数进行判断。如果ajax=executeFlow,则调用ajaxAttemptExecuteFlow(req, resp, ret, session.getUser())方法

2。在ajaxAttemptExecuteFlow方法中,首先调用getProjectAjaxByPermission方法判断用户是否有执行权限。如果验证权限通过,并且Project和Flow都存在,则调用ajaxExecuteFlow方法

3。 ajaxExecuteFlow方法的主要功能是构造ExecutableFlow对象,设置执行参数(通知机制、并发度、失败策略),然后调用executorManager.submitExecutableFlow方法

4。 executorManager.submitExecutableFlow方法:确定执行策略(管道、忽略、并发);如果是多执行节点模式,则将作业流提交到执行队列队列中;如果是单执行节点模式,则选择唯一的执行节点发送作业流程。

5。 ExecutorManager.submitExecutableFlow()方法是Webserver下发作业流的主要实现逻辑。下面重点介绍其内容:

5.1 从exflow实例中获取作业流程的flowId(即作业流程名称)和日志(“某某开始提交流程XXX”)。

5.2 判断queuedFlows是否已满。如果已满,则记录(“提交失败,Azkaban过饱和”)并返回;如果未满则继续执行代码

?参数生效)
5.6 判断运行是否为空。如果为空,则没有并发实例运行
5.7 如果 running 不为空,获取并发设置 getConcurrentOption()
5.7.1 Pipeline : 将 pipelineExcutionId 设置为运行时最后提交的实例 id
5.7.2 忽略(跳过):抛出异常,“流程已经在执行,忽略本次执行”
5.7.3 并发(忽略):仅修改 Log
5.8 根据白名单设置内存检查
5.9 executorLoader.uploadExecutableFlow(exflow)写入数据库表execution_flows,状态为prepare

5.10 构造具体的执行实例ExecutionReference
5.11 判断是否为多执行节点模式。如果没有,则将执行流的状态标记为active,即写入数据库表active_executing_flows,并将该流派发到唯一的执行节点执行。
5.12 如果是多执行节点模式,则将执行流的状态标记为active,然后将该流放入执行队列queuedFlows中。

6。如果是多执行节点模式,ExecutorManager类会在构造函数中调用setupMultiExecutorMode()方法,该方法会创建一个线程,通过processQueuedFlows方法不断消费队列中的第一个作业流。 processQueuedFlows方法的主要内容是按照一定的规则refreshExecutors来刷新执行节点的资源信息,selectExecutorAndDispatchFlow根据策略从activeExecutors中选择一个executor来下发作业流。 freshExecutors()方法实际上会遍历每个活跃的执行器并发送请求来获取状态信息,而不是通过zookeeper。

至此,Webserver端的工作已经完成。

然后Executor端:

1。执行流程到达Executor端。此时数据库中的状态为准备中

2。 ExecutorServlet 类根据请求的操作参数进行确定。如果action=execute,则调用handleAjaxExecute(req, respMap, execid)方法

3。在handleAjaxExecute方法中执行flowRunnerManager.submitFlow(execId),调用FlowRunnerManager的submitFlow(execId)方法提​​交执行流。

4。 FlowRunnerManager的两个重要数据结构:

4.1 Map,Integer> SubmittedFlows = new ConcurrentHashMap,integer>();

4.2 Map runningFlows = new ConcurrentHashMap();

SubscribedFlows 用于跟踪当前执行器的准备状态下所有流程的执行情况; runningFlows 用于存储当前执行器正在执行的所有流的信息。当需要执行canceling()或killing()时可以找到这些流程。 。

5。 FlowRunnerManager.submitFlow(execId)方法是Executor端执行作业流的主要实现逻辑。其内容详细如下:

5.1 首先判断runningFlows中是否包含execId对应的实例。如果已经包含,则抛出异常

5.2 从executorLoader flow中获取execId对应的执行实例(ExecutableFlow)

5.3 执行setupFlow(流程),配置流程:创建项目和执行目录等。
5.4 获取执行设置ExecutionOptions
5.5 判断pipelineExecId是否为空。如果不为null,则判断pipelineExecId对应的flowRunner是否在runningFlows中。如果在runningFlows中,启动一个LocalFlowWatcher来监控flow中每个job的执行状态;

5.6 如果不在runningFlows中,启动一个RemoteFlowWatcher进行监控,即通过一定时间(默认是60秒)读取数据库的记录来监控flow中每个作业的状态

5.7 判断执行参数中是否包含flow.num.job.threads。如果存在且小于默认值10,请修改该值。该值表示该流可以同时执行的作业线程数。 ​5.11 将 runner 添加到 runningFlows 的映射
5.12 提交到 TrackingThreadPool(工作线程池)
5.13 加入 SubmittedFlows 的映射

6。从那时起,我们就有了一个 FlowRunner 实例。让我们看看 FlowRunner 做了什么。

FlowRunner实际上是一个线程,其run()方法内容如下:

6.1 Executors.newFixedThreadPool(numJobThreads) 创建流程内部作业线程池流程
6.2 setupFlowExecution()
6.3 updateFlowReference()
6.4 updateFlow() update 将流程的状态信息写入数据库表execution_flows
6.5 loadAllProperties()加载作业参数和共享参数
6.6 判断输入参数是否包含job.dispatch(作业粒度分布)。如果是并且为 true,则启动一个新线程 jobEventUpdaterThread 来跟踪作业流程。每个作业的执行状态。
6.7 执行runFlow()
6.8 runFlow()方法:按照DAG图的算法顺序执行作业。从流程的起始节点开始,递归调用runReadyjob()执行作业,然后updateFlow();如果流程尚未结束,则根据重试设置决定是否重新运行失败的作业。
6.9 在runReadyjob()中,会调用runExecutableNode(node)方法。然后runExecutableNode方法会判断job.dispatch参数。如果为false,则通过LocalJobRunner在本地执行;如果为 true,则作业将通过 JobRunnerManager 提交。
6.10 JobRunnerManager通过submitExecutableNode方法构造RemoteJobRunner。 RemoteJobRunner会根据各个执行节点(包括本节点)的资源状况来选择执行作业的节点。

最后,整个过程可以总结成一张图,如下图:

图3 从源码看作业流执行流程

-->

关灯