Skip to content

架构

Flink集群环境的主要服务包括:JobManger 和 TaskManager 是非常典型的主从结构。类比到Spark中,JobManger和TaskManager相当于Application的driver和Executor。

Flink的调度模型和Spark的最大不同在于:Flink使用多线程模型进行任务调度,并且不同App的Job可以混合在同一个TaskManager进程中。换句话说,即Flink任务能够多个App共用一组JobManager和TaskManager!!

在Flink中,App会被JobManger分割成多个task,这些task运行在TaskManager中。由于TaskManager是一个独立的JVM进程,为了进行内存隔离Flink引入了slot的概念,将TaskManager的内存分割成多份,每份是一个slot。每个Task运行时需要获得TaskManager中的一个slot。

App被分成几个task和Flink程序的并行度,以及使用的算子数目息息相关。

安装部署

Flink支持一下方式部署:

Standalone 方式部署

Standalone方式部署没啥好说的,非常典型的M/S架构,部署过程和Hadoop非常相似。

Standalone 方式实际上是在部署一个JobManager/TaskManager集群,后续所有业务共用这个集群。

通过环境变量FLINK_CONF_DIR可以指定Flink的配置文件位置,其中需要关心的只有flink-conf.yaml这一个配置文件。

Yarn 方式部署

在Yarn上运行Flink任务时有两种方式:

  • 所有任务共用一个Yarn Session(类似于Standalone集群);
  • 每个任务专用一个Yarn Session

Yarn Session

Flink在Yarn上运行时,需要将JobManager和TaskManager托管到Yarn上运行,使用下面的命令在Yarn上创建Flink集群。

yarn-session.sh --detached -jm 1024m -tm 4096m

[[PS]]: 上面的命令会读取conf/flink-conf.yaml中的配置文件。用户也可以使用-D的方式传递额外配置,这样可以覆盖配置文件中的参数。

flink run -yid { appid } {jar-path} {args}

[[PS]]: 使用上面的命令可以将任务提交到一个已经存在的Flink集群

用户也可以使用flink命令为每个任务创建一个独立Flink的集群:

flink run -m yarn-cluster {jar-path} {args}

编译安装包

使用Yarn方式部署时,需要下载指定版本的Hadoop依赖。

目前,Flink 1.8 提供 Hadoop 2.8.3 的预编译jar,如果CDH那么需要从源码编译。

官方提供了Build文档

参考以下命令可以编译 Hadoop 3.0.0 - CDH 6.1.0 的依赖:


 mvn clean package -DskipTests -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=3.0.0-cdh6.1.0

# PS: 编译过程中会出现一个Test目录的“.java”文件编译失败,需要手工修改该文件才能编译成功。

# PS:编译完成后,安装包在flink-dist/target/flink-1.8.0-bin中

Kerberos配置

Flink支持Kafka、HDFS、HBase、ZK服务(连接器)的Kerberos配置,并且连接这些服务时共享Kerberos配置,所有任务共用一个Kerberos票据。

Flink在服务的生命周期中,会自动刷新TGT票据。

Flink的安全认证基于 org.apache.flink.runtime.security.modules.SecurityModule 模块,该模块下包含以下子模块:

  • Hadoop Security Module:使用UGI进行认证,连接HDFS、HBase、YARN服务时使用该模块
  • JAAS Security Module:生成动态jaas配置,用于连接zk和Kafka。用户可以使用静态jaas配置文件服务配置。
  • ZooKeeper Security Module:管理Flink连接ZK的安全配置。

相关Kerberos配置:

Key Default Description
security.kerberos.login.contexts (none) 指定jaas使用的上下文,如:Client,KafkaClient
security.kerberos.login.keytab (none) ----
security.kerberos.login.principal (none) ----
security.kerberos.login.use-ticket-cache true ----

JobManager HA

Standalone模式中,Flink集群中可以配置多个JobManager,通过Zookeeper实现故障转移。

Yarn模式中,Flink通过重启JobManager容器的方式恢复。

通过yarn.resourcemanager.am.max-attempts可以指定JobManager的恢复次数,这个参数可以在yarn-site.xml或者flink-conf.yaml中指定。

JobManager的metadata数据保存在zk和文件系统中(HDFS、filesystem、rockdb中)。这些数据用于恢复JobManager。

Histroy Server

Flink 的 Histroy Server 机制和Spark类似,主要两个配置为historyserver.archive.fs.dir和jobmanager.archive.fs.dir,前者为 History Server 扫描日志的目录,后者为JobManager任务写入日志的目录,两者要保持一致。

Histroy Server支持Rest API获取日志信息,可以参考官方链接.

Flink with Cloudera

Cloudera官方没有提供 Flink Parcels。但是,目前Flink-3090已经提出了相应ISSUES,只是过了好几年这个ISSUE也没有被关闭~~~

以下是一些Cloudera集成Flink的非官方方案:

地址 说明
mbalassi/flink-parcel 作者只提供了Flink 1.0.3版本的安装包。不过这个作者是Flink的commiter之一,而且还是PMC
jkirsch/cmflink Not complete yet. But 作者突然不搞了!!

参考

Configuration List

Flink 架构说明