Flink on YARN Deployment Explained (III)

Submitting and Running a Flink Job

Posted by Ink Bai on 2022-02-10

The previous post walked through the startup of the Flink JobManager, which completes the deployment of a Flink cluster. Next, let's look at how a Flink Job is submitted and run.

The overall flow of a Flink Job submission is as follows:

The diagram depicts the architecture of an early Flink version: the JobManager shown in it existed in the old runtime framework; in the current runtime you can simply read it as the JobMaster.

We will cover this in two steps:

  • The client submits the Flink Job to the JobManager
  • The JobManager runs the submitted Job

A Flink Job can be submitted in several ways, either via the command line or via a REST request. From inside the Flink distribution directory, a command-line submission looks like this:

bin/flink run -d ./examples/streaming/TopSpeedWindowing.jar
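
For completeness, the same jar can also be submitted through the REST API. A minimal sketch against the public REST endpoints (the host, port, and jar id are placeholders for your setup): first upload the jar, then trigger a run with the jar id returned by the upload:

curl -X POST -F "jarfile=@./examples/streaming/TopSpeedWindowing.jar" \
    http://<jobmanager-host>:8081/jars/upload
curl -X POST http://<jobmanager-host>:8081/jars/<returned-jar-id>/run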

Opening that shell script, you can see the entry class is org.apache.flink.client.cli.CliFrontend. Note that this class runs on the client side; at this point nothing has been submitted to the Flink cluster yet. Following the code, it mainly does three things:

  • Load the configuration. If the Flink Job is submitted on the JobManager machine, the YARN properties file is looked up in a local directory:

    public static File getYarnPropertiesLocation(@Nullable String yarnPropertiesFileLocation) {
        final String propertiesFileLocation;

        if (yarnPropertiesFileLocation != null) {
            propertiesFileLocation = yarnPropertiesFileLocation;
        } else {
            propertiesFileLocation = System.getProperty("java.io.tmpdir");
        }

        String currentUser = System.getProperty("user.name");

        return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
    }

    Once this properties file is found (by default it resolves to something like /tmp/.yarn-properties-<user>), the YARN ApplicationId can be read from it; alternatively, the ApplicationId can be specified on the command line via --applicationId.

  • Initialize the context environment in which the job will run. After this call, StreamExecutionEnvironment.getExecutionEnvironment() in the user code returns this context environment, so env.execute() ships the job to the cluster instead of starting a local environment:

    StreamContextEnvironment.setAsContext(
        executorServiceLoader,
        configuration,
        userCodeClassLoader,
        enforceSingleJobExecution,
        suppressSysout);
  • Run the entry class of the job jar, here ./examples/streaming/TopSpeedWindowing.jar (a sketch of what this call boils down to follows this list):

    /**
     * This method assumes that the context environment is prepared, or the execution
     * will be a local execution by default.
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
        callMainMethod(mainClass, args);
    }

Tips: you can set the following options in flink-conf.yaml to remotely debug the Client, JobManager, and TaskManager modules (suspend=y makes the client JVM wait for a debugger to attach before running, while suspend=n lets the JobManager and TaskManager start without blocking):
env.java.opts.client: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007"
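
With the client JVM listening on port 5005, any JDWP-capable debugger can attach; for example, with the JDK's command-line debugger (assuming you are debugging from the same machine):

jdb -attach localhost:5005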

Next comes the execution of the Flink application code itself. The structure of a Flink application is fairly fixed; a minimal runnable skeleton looks like this:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("hello", "flink")   // add a source
    .map(String::toUpperCase)        // add transformations
    .print();                        // add a sink
env.execute("Flink Example");

Summarized, this boils down to the following steps:

  • The operator graph is converted into a StreamGraph, which is then converted into a JobGraph (see the sketch after this list)
  • The StreamExecutionEnvironment creates a PipelineExecutor, and this executor submits the job

    The core logic of AbstractSessionClusterExecutor#execute is as follows:

    ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
    return clusterClient
            .submitJob(jobGraph)
            .thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(
                    clusterClientProvider,
                    jobID))
            .whenComplete((ignored1, ignored2) -> clusterClient.close());

    Here clusterClient is a RestClusterClient, which can issue REST requests.

  • Finally, the job is submitted to the JobManager over HTTP; the code is in RestClusterClient#sendRetriableRequest:

    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody>
            CompletableFuture<P> sendRetriableRequest(
                    M messageHeaders, U messageParameters, R request,
                    Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {
        return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
            try {
                return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(),
                        messageHeaders, messageParameters, request, filesToUpload);
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        }), retryPredicate);
    }

    The host and port are exactly those of the JobManager. The concrete messageHeaders instance is JobSubmitHeaders.getInstance(), which specifies the target URL of the request.
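
To make the first step above concrete, both graph conversions can be reproduced on a client-side environment. A minimal sketch (getStreamGraph and getJobGraph are the relevant hooks, though their exact signatures vary across Flink versions):

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class GraphConversionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").map(String::toUpperCase).print();

        // User operators -> StreamGraph (the logical streaming topology)
        StreamGraph streamGraph = env.getStreamGraph();
        // StreamGraph -> JobGraph (what the client submits; operators may be chained here)
        JobGraph jobGraph = streamGraph.getJobGraph();
        System.out.println("JobID assigned on the client: " + jobGraph.getJobID());
    }
}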

The JobManager Runs the Submitted Job

On the JobManager side, the class that receives HTTP requests is DispatcherRestEndpoint. The lowest-level HTTP handling is implemented on top of Netty, with various handlers added to its pipeline:

/**
 * Starts this REST server endpoint.
 *
 * @throws Exception if we cannot start the RestServerEndpoint
 */
public final void start() throws Exception {
    log.info("Starting rest endpoint.");

    final Router router = new Router();
    final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

    handlers = initializeHandlers(restAddressFuture);

    /* sort the handlers such that they are ordered the following:
     * /jobs
     * /jobs/overview
     * /jobs/:jobid
     * /jobs/:jobid/config
     * /:*
     */
    Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

    checkAllEndpointsAndHandlersAreUnique(handlers);
    handlers.forEach(handler -> registerHandler(router, handler, log));

    ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel ch) {
            RouterHandler handler = new RouterHandler(router, responseHeaders);

            // SSL should be the first handler in the pipeline
            if (isHttpsEnabled()) {
                ch.pipeline().addLast("ssl",
                        new RedirectingSslHandler(restAddress, restAddressFuture, sslHandlerFactory));
            }

            ch.pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(new FileUploadHandler(uploadDir))
                    .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                    .addLast(new ChunkedWriteHandler())
                    .addLast(handler.getName(), handler)
                    .addLast(new PipelineErrorHandler(log, responseHeaders));
        }
    };

    NioEventLoopGroup bossGroup =
            new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
    NioEventLoopGroup workerGroup =
            new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

    bootstrap = new ServerBootstrap();
    bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(initializer);

    // other logic
}
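
To make the pipeline ordering concrete, here is a self-contained sketch of the same layering using plain Netty: everything Flink-specific is replaced by stock Netty handlers, and the port and the trivial 200-OK "router" at the end are assumptions for illustration only:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedWriteHandler;

public class MiniRestServer {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    // bytes <-> HTTP message objects
                                    .addLast(new HttpServerCodec())
                                    // aggregate chunks into a single FullHttpRequest
                                    .addLast(new HttpObjectAggregator(64 * 1024))
                                    // allow writing large, chunked responses
                                    .addLast(new ChunkedWriteHandler())
                                    // stand-in for Flink's RouterHandler: answer everything with 200 OK
                                    .addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
                                            FullHttpResponse resp = new DefaultFullHttpResponse(
                                                    HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                                            ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            bootstrap.bind(8081).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}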

Once the HTTP request has been received and parsed, it is ultimately handled in Dispatcher#runJob:

private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

    return jobManagerRunnerFuture
            .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
            .thenApply(FunctionUtils.nullFn())
            .whenCompleteAsync(
                    (ignored, throwable) -> {
                        if (throwable != null) {
                            jobManagerRunnerFutures.remove(jobGraph.getJobID());
                        }
                    },
                    getMainThreadExecutor());
}

As you can see, a JobManagerRunner is first created and then started. Once running, it brings up the JobMaster, which translates the JobGraph into an ExecutionGraph; task scheduling then proceeds on the basis of the ExecutionGraph, and when scheduling finishes the tasks start executing.

Summary

Referring back to the flow diagram at the beginning, the submission and execution of a Flink Job can be summarized as follows:

  • The client submits the job via a shell command
  • The client executes the Flink Job code: operators are converted into a StreamGraph, and the StreamGraph into a JobGraph
  • The client submits the JobGraph to the JobManager over HTTP
  • The Dispatcher component on the JobManager receives and handles the submission request, then creates and runs a JobMaster
  • The JobMaster requests slots from the ResourceManager; if no previously allocated slots are free, the ResourceManager requests resources from YARN
  • YARN allocates a container and starts a TaskManager in it
  • Once the TaskManager is up, the ResourceManager forwards the JobMaster's slot request to it
  • The TaskManager offers slots to the JobMaster
  • With the slots acquired, the JobMaster finally deploys tasks onto them