• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Flink三种提交方式

武飞扬头像
爱学习的菜鸟罢了
帮助7

第一种方式:YARN session

操作步骤

说明

1

yarn-session.sh(开辟资源) flink run(提交任务)

 

这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和Task-managers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)

通过./bin/yarn-session.sh脚本启动YARN Session

脚本可以携带的参数:

 

-n(--container):TaskManager的数量。(1.10 已经废弃)

-s(--slots): 每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。

-jm:JobManager的内存(单位MB)。

-q显示可用的YARN资源(内存,内核);

-tm:每个TaskManager容器的内存(默认值:MB)

-nm:yarn 的appName(现在yarn的ui上的名字)。  

-d:后台执行。

 

注意:

如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d--detached

 

确定TaskManager数

Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为10,每个TaskManager有两个任务槽的作业,就会启动5个TaskManager。

2

去yarn页面:ip:8088可以查看当前提交的flink session

3

停止当前任务:

 

yarn application -kill  application_1527077715040_0007

第二种方式:YARN PerJob

  1. 使用flink直接提交任务

bin/flink run -m yarn-cluster ./AAA.jar

常用参数:

  • -p 程序默认并行度

下面的参数仅可用于 -m yarn-cluster 模式

  • -yjm JobManager可用内存,单位兆
  • -ynm YARN程序的名称
  • -yq 查询YARN可用的资源
  • -yqu 指定YARN队列是哪一个
  • -ys 每个TM会有多少个Slot
  • -ytm 每个TM所在的Container可申请多少内存,单位兆
  • -yD 动态指定Flink参数
  • -yd 分离模式(后台运行,不指定-yd, 终端会卡在提交的页面

8088页面看作业

 

停止yarn-cluster

yarn application -kill application的ID

注意:

在创建集群的时候,集群的配置参数就写好了,但是往往因为业务需要,要更改一些配置参数,这个时候可以不必因为一个实例的提交而修改conf/flink-conf.yaml;

可以通过:-yD <arg>                        Dynamic properties

来覆盖原有的配置信息:比如:

bin/flink run -m yarn-cluster -yD fs.overwrite-files=true ./AAA.jar

-yD fs.overwrite-files=true -yD taskmanager.network.numberOfBuffers=16368

第三种方式:YARN Application 

application 模式使用 bin/flink run-application 提交作业;通过 -t 指定部署环境,目前 application 模式支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);并支持通过 -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。

通过 bin/flink run-application -h 可以看到 -D/-t 的详细说明:(-e 已经被废弃,可以忽略)

bin/flink run-application -h 

参数:

Options for Generic CLI mode:

     -D <property=value>  Generic configuration options for execution/deployment and for the configured executor.The available options can be found at

                           https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the

                           given job, which is equivalent to the "execution.target" config option. The currently available executors are: "collection", "remote",

                           "local", "kubernetes-session", "yarn-per-job", "yarn-session".

     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. The currently available targets are:

                           "collection", "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session", "yarn-application" and "kubernetes-application".

  1. 下面列举几个使用 Application 模式提交作业到 yarn 上运行的命令:

第一种方式

带有 JM 和 TM 内存设置的命令提交:

./bin/flink run-application -t yarn-application \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlink" \

./examples/batch/AAA.jar --output hdfs://node01:8020/output_51

第二种方式

上面例子 的基础上自己设置 TaskManager slots 个数为3,以及指定并发数为3:

 

./bin/flink run-application -t yarn-application -p 3 \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlink" \

-Dtaskmanager.numberOfTaskSlots=3 \

./examples/batch/AAA.jar --output hdfs://node01:8020/output_52

当然,指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D 通用配置代替客户端命令参数(比如 -p)。所以这样写更符合规范:

./bin/flink run-application -t yarn-application \

-Dparallelism.default=3 \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlink" \

-Dtaskmanager.numberOfTaskSlots=3 \

./examples/batch/AAA.jar --output hdfs://node01:8020output_53

第三种方式

 yarn.provided.lib.dirs 参数一起使用,可以充分发挥 application 部署模式的优势:我们看 官方配置文档 对这个配置的解释:

 

yarn.provided.lib.dirs: A semicolon-separated list of provided lib directories. They should be pre-uploaded and world-readable. Flink will use them to exclude the local Flink jars(e.g. flink-dist, lib/, plugins/)uploading to accelerate the job submission process. Also YARN will cache them on the nodes so that they doesn't need to be downloaded every time for each application. An example could be hdfs://$namenode_address/path/of/flink/lib

意思是我们可以预先上传 flink 客户端依赖包 (flink-dist/lib/plugin) 到远端存储(一般是 hdfs,或者共享存储),然后通过 yarn.provided.lib.dirs 参数指定这个路径,flink 检测到这个配置时,就会从该地址拉取 flink 运行需要的依赖包,省去了依赖包上传的过程,yarn-cluster/per-job 模式也支持该配置。在之前的版本中,使用 yarn-cluster/per-job 模式,每个作业都会单独上传 flink 依赖包(一般会有 180MB左右)导致 hdfs 资源浪费,而且程序异常退出时,上传的 flink 依赖包往往得不到自动清理。通过指定 yarn.provided.lib.dirs,所有作业都会使用一份远端 flink 依赖包,并且每个 yarn nodemanager 都会缓存一份,提交速度也会大大提升,对于跨机房提交作业会有很大的优化。

使用示例如下:

my-application.jar 是用户 jar 包

上传 Flink 相关 plugins 到hdfs

cd /export/servers/flink-1.13.1/plugins

hdfs dfs -mkdir /flink/plugins

hdfs dfs -put \

external-resource-gpu/flink-external-resource-gpu-1.13.1.jar \

metrics-datadog/flink-metrics-datadog-1.13.1.jar \

metrics-graphite/flink-metrics-graphite-1.13.1.jar \

metrics-influx/flink-metrics-influxdb-1.13.1.jar \

metrics-jmx/flink-metrics-jmx-1.13.1.jar \

metrics-prometheus/flink-metrics-prometheus-1.13.1.jar \

metrics-slf4j/flink-metrics-slf4j-1.13.1.jar \

metrics-statsd/flink-metrics-statsd-1.13.1.jar \

/flink/plugins

 

根据自己业务需求上传相关的 jar

cd /export/servers/flink-1.13.1/libs

hdfs dfs -mkdir /flink/libs

hdfs dfs -put flink-csv-1.13.1.jar \

flink-dist_2.11-1.13.1.jar \

flink-json-1.13.1.jar \

flink-shaded-hadoop-2-uber-2.7.5-10.0.jar \

flink-shaded-zookeeper-3.4.14.jar \

flink-table_2.11-1.13.1.jar \

flink-table-blink_2.11-1.13.1.jar \

log4j-1.2-api-2.12.1.jar log4j-api-2.12.1.jar \

log4j-core-2.12.1.jar \

log4j-slf4j-impl-2.12.1.jar \

/flink/libs

 

上传用户 jar 到 hdfs

cd /export/servers/flink-1.13.1

hdfs dfs -mkdir /flink/user-libs

hdfs dfs -put ./AAA.jar /flink/user-libs

提交任务

bin/flink run-application -t yarn-application \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dtaskmanager.numberOfTaskSlots=2 \

-Dparallelism.default=2 \

-Dyarn.provided.lib.dirs="hdfs://node01:8020/flink/libs;hdfs://node01:8020/flink/plugins" \

-Dyarn.application.name="batch" \

hdfs://node01:8020/flink/user-libs/AAAt.jar --output hdfs://node01:8020/output_54

 

也可以将 yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml,这时提交作业就和普通作业没有区别了:

./bin/flink run-application -t yarn-application \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlink" \

-Dtaskmanager.numberOfTaskSlots=3 \

/local/path/to/my-application.jar

注意:如果自己指定 yarn.provided.lib.dirs,有以下注意事项:

  1. 需要将 lib 包和 plugins 包地址用;分开,从上面的例子中也可以看到,将 plugins 包放在 lib 目录下可能会有包冲突错误
  2. plugins 包路径地址必须以 plugins 结尾,例如上面例子中的 hdfs://node01:8020/flink/plugins
  3. hdfs 路径必须指定 nameservice(或 active namenode 地址),而不能使用简化方式(例如 hdfs://node01:8020/flink/libs)

该种模式的操作使得 flink 作业提交变得很轻量,因为所需的 Flink jar 包和应用程序 jar 将到指定的远程位置获取,而不是由客户端下载再发送到集群。这也是社区在 flink-1.11 版本引入新的部署模式的意义所在。

Application 模式在停止、取消或查询正在运行的应用程序的状态等方面和 flink-1.11 之前的版本一样,可以采用现有的方法。


yarn application -list | awk '$2 == "APPLICATION_NAME" { print $1 }' 


yarn application -list | awk '$2 == "APPLICATION_NAME" { print $1 }' | xargs yarn application -kill
 

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhbjei
系列文章
更多 icon
同类精品
更多 icon
继续加载