文章

Flink DataStream API 中的算子

Flink DataStream API 中的算子(Operators)详解

Flink DataStream API 提供了一系列 算子(Operators),用于对流数据进行转换和处理。算子是 Flink 计算逻辑的核心,它们允许开发者对数据流进行 过滤、转换、聚合、分区、窗口操作等。

Flink DataStream API 中的算子主要分为以下几类:

  1. 转换(Transformation)算子:map()flatMap()filter()keyBy()reduce()
  2. 分区(Partitioning)算子:shuffle()rebalance()broadcast()
  3. 窗口(Windowing)算子:window()timeWindow()countWindow()
  4. 联结(Join)算子:connect()union()coMap()coFlatMap()intervalJoin()
  5. 状态(State)管理算子:process()mapWithState()
  6. Sink(输出)算子:print()addSink()

1. 转换(Transformation)算子

这些算子用于对数据流进行基本变换,如映射、过滤、聚合等。

1.1 map()

作用:一对一转换,每个输入元素映射为一个新的元素。

示例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4);

DataStream<Integer> squared = numbers.map(value -> value * value);

squared.print();  // 输出: 1, 4, 9, 16

env.execute();

1.2 flatMap()

作用:一对多转换,每个输入元素可以映射成多个输出元素。

示例:

DataStream<String> lines = env.fromElements("hello world", "flink streaming");

DataStream<String> words = lines.flatMap((String line, Collector<String> out) -> {
    for (String word : line.split(" ")) {
        out.collect(word);
    }
}).returns(String.class);

words.print();  // 输出: hello, world, flink, streaming

env.execute();

1.3 filter()

作用:过滤数据,仅保留满足条件的元素。

示例:

DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);

DataStream<Integer> evens = numbers.filter(value -> value % 2 == 0);

evens.print();  // 输出: 2, 4, 6

env.execute();

1.4 keyBy()

作用:对数据进行分组(类似 SQL GROUP BY)。

示例:

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    Tuple2.of("apple", 1),
    Tuple2.of("banana", 2),
    Tuple2.of("apple", 3)
);

KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordCounts.keyBy(value -> value.f0);

1.5 reduce()

作用:对 keyBy 之后的数据进行滚动聚合,每次输入一个新元素,都会更新累加值。

示例:

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    Tuple2.of("apple", 1),
    Tuple2.of("banana", 2),
    Tuple2.of("apple", 3)
);

KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordCounts.keyBy(value -> value.f0);

DataStream<Tuple2<String, Integer>> wordSum = keyedStream.reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

wordSum.print();  // apple: 4, banana: 2

env.execute();

2. 分区(Partitioning)算子

这些算子用于控制数据在不同 TaskManager 之间的分配方式。

2.1 shuffle()

作用:随机打乱数据的分区。

示例:

DataStream<Integer> shuffled = numbers.shuffle();
shuffled.print();
env.execute();

2.2 rebalance()

作用:均匀分配数据,防止数据倾斜。

DataStream<Integer> balanced = numbers.rebalance();
balanced.print();
env.execute();

2.3 broadcast()

作用:将数据复制到所有 TaskManager 进行计算。

DataStream<Integer> broadcasted = numbers.broadcast();
broadcasted.print();
env.execute();

3. 窗口(Windowing)算子

Flink 提供基于时间和计数的窗口操作。

3.1 timeWindow()

作用:基于时间的滚动窗口。

KeyedStream<Tuple2<String, Integer>, String> keyed = wordCounts.keyBy(value -> value.f0);

DataStream<Tuple2<String, Integer>> windowedCounts = keyed
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .sum(1);

windowedCounts.print();
env.execute();

3.2 countWindow()

作用:基于元素数量的窗口。

DataStream<Tuple2<String, Integer>> countWindowed = keyed
    .countWindow(5)  // 每 5 个元素触发窗口计算
    .sum(1);

countWindowed.print();
env.execute();

4. 联结(Join)算子

4.1 connect()

作用:合并两个不同类型的流。

DataStream<String> stream1 = env.fromElements("hello", "flink");
DataStream<Integer> stream2 = env.fromElements(1, 2, 3);

ConnectedStreams<String, Integer> connected = stream1.connect(stream2);

4.2 union()

作用:合并多个相同类型的流。

DataStream<String> stream1 = env.fromElements("a", "b");
DataStream<String> stream2 = env.fromElements("c", "d");

DataStream<String> merged = stream1.union(stream2);
merged.print();
env.execute();

5. 状态(State)管理算子

5.1 process()

作用:自定义处理逻辑,可访问状态、时间、上下文。

public class MyProcessFunction extends ProcessFunction<String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect("Processed: " + value);
    }
}
stream.process(new MyProcessFunction()).print();
env.execute();

6. Sink(输出)算子

6.1 print()

stream.print();
env.execute();

6.2 addSink()

stream.addSink(new FlinkKafkaProducer<>("topic", new SimpleStringSchema(), properties));
env.execute();

7. 总结

类别 常见算子 用途
转换 map()flatMap()filter() 数据转换、过滤
分区 keyBy()shuffle()broadcast() 数据分组、负载均衡
窗口 timeWindow()countWindow() 流式窗口计算
联结 connect()union()intervalJoin() 连接多个流
状态管理 process() 访问状态和时间信息
Sink print()addSink() 输出数据
License:  CC BY 4.0