Flink DataStream API 高级特性
Flink DataStream API 还有一些高级特性和算子。
- 转换算子(
map()
、flatMap()
、filter()
) - 窗口算子(
timeWindow()
、countWindow()
) - 状态管理(
ValueState
、ListState
) - 事件时间处理(
Watermark
) - 复杂事件处理(CEP)
- 旁路输出(Side Output)
- Sink(Kafka、Elasticsearch)
- Checkpoint 及故障恢复
1. 状态(State)管理深入讲解
Flink 提供了 Keyed State 和 Operator State,允许对流数据进行更复杂的状态管理。
1.1 Keyed State
Keyed State 只能在 keyBy()
之后使用,适用于需要针对某个 Key 维护状态的情况。
示例:使用 ValueState
进行累加
public class CountWithState extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> countState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Integer> descriptor =
new ValueStateDescriptor<>("countState", Types.INT);
countState = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
Integer currentCount = countState.value() == null ? 0 : countState.value();
currentCount += value.f1;
countState.update(currentCount);
out.collect(Tuple2.of(value.f0, currentCount));
}
}
DataStream<Tuple2<String, Integer>> stream = env.fromElements(
Tuple2.of("apple", 1),
Tuple2.of("apple", 2),
Tuple2.of("banana", 3)
);
stream.keyBy(value -> value.f0).flatMap(new CountWithState()).print();
env.execute();
该示例使用 ValueState
记录每个 Key 的累计值。
1.2 Operator State
Operator State 适用于非 KeyedStream,适合处理周期性检查点(Checkpoint)和容错。
public class ListStateExample extends RichMapFunction<Integer, List<Integer>> {
private transient ListState<Integer> listState;
@Override
public void open(Configuration parameters) {
ListStateDescriptor<Integer> descriptor =
new ListStateDescriptor<>("listState", Types.INT);
listState = getRuntimeContext().getListState(descriptor);
}
@Override
public List<Integer> map(Integer value) throws Exception {
listState.add(value);
List<Integer> result = new ArrayList<>();
for (Integer val : listState.get()) {
result.add(val);
}
return result;
}
}
ListState
适用于存储一个列表状态,可以用于保存最近的 N 个元素。
2. ProcessFunction 详细讲解
ProcessFunction
是 Flink 最底层的算子,允许开发者访问 时间戳、状态、定时器(Timer) 等高级特性。
2.1 KeyedProcessFunction
处理时间
public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
private transient ValueState<Integer> state;
@Override
public void open(Configuration parameters) {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
Integer current = state.value() == null ? 0 : state.value();
current += value.f1;
state.update(current);
// 注册定时器,10 秒后触发
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect("Timer Triggered at: " + timestamp);
}
}
该示例注册了一个 10 秒后触发的定时器,可以用于延迟处理。
3. Watermark 及 Event Time 处理
Flink 通过 Watermark 处理乱序数据,确保窗口计算准确。
3.1 生成 Watermark
DataStream<Tuple2<String, Long>> stream = env.fromElements(
Tuple2.of("event1", 1000L),
Tuple2.of("event2", 2000L),
Tuple2.of("event3", 500L)
);
DataStream<Tuple2<String, Long>> withTimestamps = stream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.f1)
);
这里 assignTimestampsAndWatermarks
允许我们处理乱序数据,设定 5 秒的延迟窗口。
4. CEP(复杂事件处理)
Flink 还支持 CEP(Complex Event Processing),可以检测模式匹配事件,如交易欺诈检测。
4.1 CEP 示例
Pattern<Tuple2<String, Integer>, ?> pattern = Pattern.<Tuple2<String, Integer>>begin("start")
.where(event -> event.f1 > 10)
.next("next")
.where(event -> event.f1 < 5);
PatternStream<Tuple2<String, Integer>> patternStream = CEP.pattern(stream, pattern);
patternStream.select((PatternSelectFunction<Tuple2<String, Integer>, String>) patternMatches ->
"Pattern Matched: " + patternMatches.toString()
).print();
该示例会检测 先出现大于 10 的值,然后出现小于 5 的值。
5. Side Output(旁路输出)
Flink 允许从 ProcessFunction
输出不同类型的数据,适用于异常检测、告警系统等场景。
5.1 Side Output 示例
OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};
SingleOutputStreamOperator<String> processedStream = stream.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) {
if (value.contains("error")) {
ctx.output(sideOutputTag, "Detected error: " + value);
} else {
out.collect("Normal event: " + value);
}
}
});
// 获取旁路输出数据
DataStream<String> sideOutputStream = processedStream.getSideOutput(sideOutputTag);
sideOutputStream.print();
该示例 当检测到 “error” 关键字时,将数据发送到旁路输出。
6. Flink Sink 详细解析
Flink 允许将数据写入不同的存储,如 Kafka、Elasticsearch、HDFS、JDBC 等。
6.1 Kafka Sink
stream.addSink(new FlinkKafkaProducer<>(
"kafka-topic",
new SimpleStringSchema(),
kafkaProperties
));
6.2 Elasticsearch Sink
stream.addSink(new ElasticsearchSink.Builder<>(
elasticsearchHttpHosts,
new ElasticsearchSinkFunction<String>() {
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(Requests.indexRequest().index("flink-index").source(element));
}
}
).build());
7. Flink Checkpoint 及故障恢复
Flink 提供端到端的状态一致性保证,允许启用 Checkpoint 进行数据恢复。
7.1 启用 Checkpoint
env.enableCheckpointing(5000); // 每 5 秒触发一次 Checkpoint
License:
CC BY 4.0