文章

Flink DataStream API 高级特性

Flink DataStream API 还有一些高级特性和算子。

  • 转换算子(map()flatMap()filter()
  • 窗口算子(timeWindow()countWindow()
  • 状态管理(ValueStateListState
  • 事件时间处理(Watermark
  • 复杂事件处理(CEP)
  • 旁路输出(Side Output)
  • Sink(Kafka、Elasticsearch)
  • Checkpoint 及故障恢复

1. 状态(State)管理深入讲解

Flink 提供了 Keyed State 和 Operator State,允许对流数据进行更复杂的状态管理。

1.1 Keyed State

Keyed State 只能在 keyBy() 之后使用,适用于需要针对某个 Key 维护状态的情况。

示例:使用 ValueState 进行累加

public class CountWithState extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<>("countState", Types.INT);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer currentCount = countState.value() == null ? 0 : countState.value();
        currentCount += value.f1;
        countState.update(currentCount);
        out.collect(Tuple2.of(value.f0, currentCount));
    }
}

DataStream<Tuple2<String, Integer>> stream = env.fromElements(
    Tuple2.of("apple", 1),
    Tuple2.of("apple", 2),
    Tuple2.of("banana", 3)
);

stream.keyBy(value -> value.f0).flatMap(new CountWithState()).print();
env.execute();

该示例使用 ValueState 记录每个 Key 的累计值。


1.2 Operator State

Operator State 适用于非 KeyedStream,适合处理周期性检查点(Checkpoint)和容错。

public class ListStateExample extends RichMapFunction<Integer, List<Integer>> {
    private transient ListState<Integer> listState;

    @Override
    public void open(Configuration parameters) {
        ListStateDescriptor<Integer> descriptor =
            new ListStateDescriptor<>("listState", Types.INT);
        listState = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public List<Integer> map(Integer value) throws Exception {
        listState.add(value);
        List<Integer> result = new ArrayList<>();
        for (Integer val : listState.get()) {
            result.add(val);
        }
        return result;
    }
}

ListState 适用于存储一个列表状态,可以用于保存最近的 N 个元素。


2. ProcessFunction 详细讲解

ProcessFunction 是 Flink 最底层的算子,允许开发者访问 时间戳、状态、定时器(Timer) 等高级特性。

2.1 KeyedProcessFunction 处理时间

public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
    private transient ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
        Integer current = state.value() == null ? 0 : state.value();
        current += value.f1;
        state.update(current);

        // 注册定时器,10 秒后触发
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("Timer Triggered at: " + timestamp);
    }
}

该示例注册了一个 10 秒后触发的定时器,可以用于延迟处理。


3. Watermark 及 Event Time 处理

Flink 通过 Watermark 处理乱序数据,确保窗口计算准确。

3.1 生成 Watermark

DataStream<Tuple2<String, Long>> stream = env.fromElements(
    Tuple2.of("event1", 1000L),
    Tuple2.of("event2", 2000L),
    Tuple2.of("event3", 500L)
);

DataStream<Tuple2<String, Long>> withTimestamps = stream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, timestamp) -> event.f1)
);

这里 assignTimestampsAndWatermarks 允许我们处理乱序数据,设定 5 秒的延迟窗口。


4. CEP(复杂事件处理)

Flink 还支持 CEP(Complex Event Processing),可以检测模式匹配事件,如交易欺诈检测。

4.1 CEP 示例

Pattern<Tuple2<String, Integer>, ?> pattern = Pattern.<Tuple2<String, Integer>>begin("start")
    .where(event -> event.f1 > 10)
    .next("next")
    .where(event -> event.f1 < 5);

PatternStream<Tuple2<String, Integer>> patternStream = CEP.pattern(stream, pattern);

patternStream.select((PatternSelectFunction<Tuple2<String, Integer>, String>) patternMatches ->
    "Pattern Matched: " + patternMatches.toString()
).print();

该示例会检测 先出现大于 10 的值,然后出现小于 5 的值。


5. Side Output(旁路输出)

Flink 允许从 ProcessFunction 输出不同类型的数据,适用于异常检测、告警系统等场景。

5.1 Side Output 示例

OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};

SingleOutputStreamOperator<String> processedStream = stream.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        if (value.contains("error")) {
            ctx.output(sideOutputTag, "Detected error: " + value);
        } else {
            out.collect("Normal event: " + value);
        }
    }
});

// 获取旁路输出数据
DataStream<String> sideOutputStream = processedStream.getSideOutput(sideOutputTag);
sideOutputStream.print();

该示例 当检测到 “error” 关键字时,将数据发送到旁路输出。


Flink 允许将数据写入不同的存储,如 Kafka、Elasticsearch、HDFS、JDBC 等。

6.1 Kafka Sink

stream.addSink(new FlinkKafkaProducer<>(
    "kafka-topic",
    new SimpleStringSchema(),
    kafkaProperties
));

6.2 Elasticsearch Sink

stream.addSink(new ElasticsearchSink.Builder<>(
    elasticsearchHttpHosts,
    new ElasticsearchSinkFunction<String>() {
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(Requests.indexRequest().index("flink-index").source(element));
        }
    }
).build());

Flink 提供端到端的状态一致性保证,允许启用 Checkpoint 进行数据恢复。

7.1 启用 Checkpoint

env.enableCheckpointing(5000);  // 每 5 秒触发一次 Checkpoint
License:  CC BY 4.0