架构¶
Flink集群环境的主要服务包括:JobManger 和 TaskManager 是非常典型的主从结构。类比到Spark中,JobManger和TaskManager相当于Application的driver和Executor。
Flink的调度模型和Spark的最大不同在于:Flink使用多线程模型进行任务调度,并且不同App的Job可以混合在同一个TaskManager进程中。换句话说,即Flink任务能够多个App共用一组JobManager和TaskManager!!

在Flink中,App会被JobManger分割成多个task,这些task运行在TaskManager中。由于TaskManager是一个独立的JVM进程,为了进行内存隔离Flink引入了slot的概念,将TaskManager的内存分割成多份,每份是一个slot。每个Task运行时需要获得TaskManager中的一个slot。
App被分成几个task和Flink程序的并行度,以及使用的算子数目息息相关。
安装部署¶
Flink支持一下方式部署:
-
Mesos
-
其他云原生部署
Standalone 方式部署¶
Standalone方式部署没啥好说的,非常典型的M/S架构,部署过程和Hadoop非常相似。
Standalone 方式实际上是在部署一个JobManager/TaskManager集群,后续所有业务共用这个集群。
通过环境变量FLINK_CONF_DIR可以指定Flink的配置文件位置,其中需要关心的只有flink-conf.yaml这一个配置文件。
Yarn 方式部署¶
在Yarn上运行Flink任务时有两种方式:
- 所有任务共用一个Yarn Session(类似于Standalone集群);
- 每个任务专用一个Yarn Session
Yarn Session¶
Flink在Yarn上运行时,需要将JobManager和TaskManager托管到Yarn上运行,使用下面的命令在Yarn上创建Flink集群。
yarn-session.sh --detached -jm 1024m -tm 4096m
[[PS]]: 上面的命令会读取conf/flink-conf.yaml中的配置文件。用户也可以使用-D的方式传递额外配置,这样可以覆盖配置文件中的参数。
flink run -yid { appid } {jar-path} {args}
[[PS]]: 使用上面的命令可以将任务提交到一个已经存在的Flink集群
用户也可以使用flink命令为每个任务创建一个独立Flink的集群:
flink run -m yarn-cluster {jar-path} {args}
编译安装包¶
使用Yarn方式部署时,需要下载指定版本的Hadoop依赖。
目前,Flink 1.8 提供 Hadoop 2.8.3 的预编译jar,如果CDH那么需要从源码编译。
官方提供了Build文档。
参考以下命令可以编译 Hadoop 3.0.0 - CDH 6.1.0 的依赖:
mvn clean package -DskipTests -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=3.0.0-cdh6.1.0
# PS: 编译过程中会出现一个Test目录的“.java”文件编译失败,需要手工修改该文件才能编译成功。
# PS:编译完成后,安装包在flink-dist/target/flink-1.8.0-bin中
Kerberos配置¶
Flink支持Kafka、HDFS、HBase、ZK服务(连接器)的Kerberos配置,并且连接这些服务时共享Kerberos配置,所有任务共用一个Kerberos票据。
Flink在服务的生命周期中,会自动刷新TGT票据。
Flink的安全认证基于 org.apache.flink.runtime.security.modules.SecurityModule 模块,该模块下包含以下子模块:
- Hadoop Security Module:使用UGI进行认证,连接HDFS、HBase、YARN服务时使用该模块
- JAAS Security Module:生成动态jaas配置,用于连接zk和Kafka。用户可以使用静态jaas配置文件服务配置。
- ZooKeeper Security Module:管理Flink连接ZK的安全配置。
相关Kerberos配置:
| Key | Default | Description |
|---|---|---|
| security.kerberos.login.contexts | (none) | 指定jaas使用的上下文,如:Client,KafkaClient |
| security.kerberos.login.keytab | (none) | ---- |
| security.kerberos.login.principal | (none) | ---- |
| security.kerberos.login.use-ticket-cache | true | ---- |
JobManager HA¶
Standalone模式中,Flink集群中可以配置多个JobManager,通过Zookeeper实现故障转移。
Yarn模式中,Flink通过重启JobManager容器的方式恢复。
通过yarn.resourcemanager.am.max-attempts可以指定JobManager的恢复次数,这个参数可以在yarn-site.xml或者flink-conf.yaml中指定。
JobManager的metadata数据保存在zk和文件系统中(HDFS、filesystem、rockdb中)。这些数据用于恢复JobManager。
Histroy Server¶
Flink 的 Histroy Server 机制和Spark类似,主要两个配置为historyserver.archive.fs.dir和jobmanager.archive.fs.dir,前者为 History Server 扫描日志的目录,后者为JobManager任务写入日志的目录,两者要保持一致。
Histroy Server支持Rest API获取日志信息,可以参考官方链接.
Flink with Cloudera¶
Cloudera官方没有提供 Flink Parcels。但是,目前Flink-3090已经提出了相应ISSUES,只是过了好几年这个ISSUE也没有被关闭~~~
以下是一些Cloudera集成Flink的非官方方案:
| 地址 | 说明 |
|---|---|
| mbalassi/flink-parcel | 作者只提供了Flink 1.0.3版本的安装包。不过这个作者是Flink的commiter之一,而且还是PMC |
| jkirsch/cmflink | Not complete yet. But 作者突然不搞了!! |