背景:我们用 Node.js 启多实例的时候,通过 pm2 cluster 模式启动多实例的方式提高服务的数据处理效率,Flink 也有类似的多实例的配置,但又有点不一样,这里单独列出来说一下,同时又结合 kafka 分区也简单说一下。
Flink 程序可以由不同的 task (如:transformations/opterators,data sources 及 data sinks等) 组成,一个 task 会分发到多个并发实例中运行,并且每个并发实例处理 task 的部分输入数据集。一个 task 的并发实例数叫做 parallelism。
如果你想使用savepoints的话,你需要设置一个最大并发数,当你从 savepoints 中重新获取数据时,你可以改变并发数,但是新的并发数必须大于先前的并发数。
配置 parallelism
task 的 parallelism 可以在 Flink 的不同级别上指定。
算子(operator)级别
每个 operator、data source 或者 data sink 都可以通过调用 setParallelism() 方法来指定parallelism,Scala 代码例如:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5)
wordCounts.print()
env.execute("Word Count Example")
运行环境级别
Flink 程序是在一个运行环境的上下文中运行的。一个运行环境为每个 operator、data source 和data sink 的运行定义了一个默认的并发数。运行环境的并发数可以被每个算子确切的并发数配置所覆盖。
运行环境的默认并发数可以通过调用 setParallelism() 方法来指定。为了让所有的operator、data source和data sink以3个并发数来运行,你可按如下方法来设置运行环境的默认并发数,Scala 代码例如:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
wordCounts.print()
env.execute("Word Count Example")
客户端级别
并发数可以在提交 Job 到 Flink 的客户端设置,客户端可以是 Java 或者 Scala 程序,典型的例子如: Flink 命令行接口(CLI)。
对于 CLI 客户端,并发参数可以通过 -p
来指定,例如:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
在 Scala 程序中,可以按如下方式指定:
try {
val program = new PackagedProgram(file, args)
val jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
val config = new Configuration()
val client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader())
// 将并发度设为10
client.run(program, 10, true)
} catch {
case e: Exception => e.printStackTrace
}
系统级别
影响所有运行环境的系统级别的默认并发度可以在./conf/flink-conf.yaml的parallelism.defaul项中指定。
kafka 分区数与 Flink parallelism
在 kafka 中,同一个消费组的不同消费者至少消费一个分区,不可能存在两个不同的消费者消费同一个分区的场景。Flink 的消费者数量依赖 flink parallelism(默认为1)
有下面三种场景:
1、kafka partitions == flink parallelism
这个场景是最理想的和最好的,每个消费者只关心一个分区。如果每个分区的消息是均衡的,则每个 flink 算子工作量也均衡。
2、kafka partitions < flink parallelism
这个场景下,有些 flink 实例将不会收到任何消息,实例空跑,占用资源。我们需要避免这种情况,我们可以在调用输入流之前执行重新平衡rebalance
,例如:
inputStream = env.addSource(new FlinkKafkaConsumer10("topic", new SimpleStringSchema(), properties));
inputStream
.rebalance()
.map(s -> "message" + s)
.print();
3、kafka partitions > flink parallelism
这个场景下,一些实例会处理多个分区的消息。同样地,我们也可以使用rebalance
来避免这种情况。
收官。
本文由 Chakhsu Lau 创作,采用 知识共享署名4.0 国际许可协议进行许可。
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。