事件时间
由于在大多数现实世界的用例中,消息到达是无序的,我们处理事件时必须考虑那些迟到的消息。 Flink 有强大的事件时间处理接口,协助开发者更好地完成业务需求。
如下图,我们可以清晰地看到关于事件的三种时间:1、Event Time ;2、Ingestion Time;3、Processing Time。
Event Time 是事件在现实世界中发生的时间,Ingestion Time 是事件摄入到系统的时间,Processing Time 是Flink系统处理该事件的时间。
watermark
一般情况下,我们选择用 Event Time 作为事件处理时间,来保证程序处理是符合预期的。
但我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。但是对于 late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是 watermark。
更多关于 watermark 的介绍可以查看:http://vishnuviswanath.com/flink_eventtime.html
kafka 例子
kafka 作为数据源,我们要选择消息里的某个字段作为 EventTime,配置 watermark 来保证数据处理的顺序性:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})
val stream: DataStream[MyType] = env.addSource(kafkaSource)
图例:
从图中,我们可以看到每个分区都会生成watermark,以及在这种情况下 watermark 是如何在数据处理流中传递。
注意事项:
如果 watermark 依赖从 kafka 读取的消息里的字段,则需要所有 topic 和 partition 都要有连续的消息流,否则,整个应用的 watermark 不能前进,而且基于时间操作的算子都不会进行工作,如 windows 算子。
目前这个问题将在 1.8.0 版本中解决:https://issues.apache.org/jira/browse/FLINK-5479
我们集群是1.7.2,如果使用kafka作为数据源,需要保证每个分区都有消息流。
参考链接
- https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
- https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
本文由 Chakhsu Lau 创作,采用 知识共享署名4.0 国际许可协议进行许可。
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。