Flink DataStream API 中的算子
Flink DataStream API 中的算子(Operators)详解
Flink DataStream API 提供了一系列 算子(Operators),用于对流数据进行转换和处理。算子是 Flink 计算逻辑的核心,它们允许开发者对数据流进行 过滤、转换、聚合、分区、窗口操作等。
Flink DataStream API 中的算子主要分为以下几类:
- 转换(Transformation)算子:
map()
、flatMap()
、filter()
、keyBy()
、reduce()
等 - 分区(Partitioning)算子:
shuffle()
、rebalance()
、broadcast()
等 - 窗口(Windowing)算子:
window()
、timeWindow()
、countWindow()
等 - 联结(Join)算子:
connect()
、union()
、coMap()
、coFlatMap()
、intervalJoin()
等 - 状态(State)管理算子:
process()
、mapWithState()
等 - Sink(输出)算子:
print()
、addSink()
等
1. 转换(Transformation)算子
这些算子用于对数据流进行基本变换,如映射、过滤、聚合等。
1.1 map()
作用:一对一转换,每个输入元素映射为一个新的元素。
示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4);
DataStream<Integer> squared = numbers.map(value -> value * value);
squared.print(); // 输出: 1, 4, 9, 16
env.execute();
1.2 flatMap()
作用:一对多转换,每个输入元素可以映射成多个输出元素。
示例:
DataStream<String> lines = env.fromElements("hello world", "flink streaming");
DataStream<String> words = lines.flatMap((String line, Collector<String> out) -> {
for (String word : line.split(" ")) {
out.collect(word);
}
}).returns(String.class);
words.print(); // 输出: hello, world, flink, streaming
env.execute();
1.3 filter()
作用:过滤数据,仅保留满足条件的元素。
示例:
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);
DataStream<Integer> evens = numbers.filter(value -> value % 2 == 0);
evens.print(); // 输出: 2, 4, 6
env.execute();
1.4 keyBy()
作用:对数据进行分组(类似 SQL GROUP BY
)。
示例:
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
Tuple2.of("apple", 1),
Tuple2.of("banana", 2),
Tuple2.of("apple", 3)
);
KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordCounts.keyBy(value -> value.f0);
1.5 reduce()
作用:对 keyBy
之后的数据进行滚动聚合,每次输入一个新元素,都会更新累加值。
示例:
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
Tuple2.of("apple", 1),
Tuple2.of("banana", 2),
Tuple2.of("apple", 3)
);
KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordCounts.keyBy(value -> value.f0);
DataStream<Tuple2<String, Integer>> wordSum = keyedStream.reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));
wordSum.print(); // apple: 4, banana: 2
env.execute();
2. 分区(Partitioning)算子
这些算子用于控制数据在不同 TaskManager 之间的分配方式。
2.1 shuffle()
作用:随机打乱数据的分区。
示例:
DataStream<Integer> shuffled = numbers.shuffle();
shuffled.print();
env.execute();
2.2 rebalance()
作用:均匀分配数据,防止数据倾斜。
DataStream<Integer> balanced = numbers.rebalance();
balanced.print();
env.execute();
2.3 broadcast()
作用:将数据复制到所有 TaskManager 进行计算。
DataStream<Integer> broadcasted = numbers.broadcast();
broadcasted.print();
env.execute();
3. 窗口(Windowing)算子
Flink 提供基于时间和计数的窗口操作。
3.1 timeWindow()
作用:基于时间的滚动窗口。
KeyedStream<Tuple2<String, Integer>, String> keyed = wordCounts.keyBy(value -> value.f0);
DataStream<Tuple2<String, Integer>> windowedCounts = keyed
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.sum(1);
windowedCounts.print();
env.execute();
3.2 countWindow()
作用:基于元素数量的窗口。
DataStream<Tuple2<String, Integer>> countWindowed = keyed
.countWindow(5) // 每 5 个元素触发窗口计算
.sum(1);
countWindowed.print();
env.execute();
4. 联结(Join)算子
4.1 connect()
作用:合并两个不同类型的流。
DataStream<String> stream1 = env.fromElements("hello", "flink");
DataStream<Integer> stream2 = env.fromElements(1, 2, 3);
ConnectedStreams<String, Integer> connected = stream1.connect(stream2);
4.2 union()
作用:合并多个相同类型的流。
DataStream<String> stream1 = env.fromElements("a", "b");
DataStream<String> stream2 = env.fromElements("c", "d");
DataStream<String> merged = stream1.union(stream2);
merged.print();
env.execute();
5. 状态(State)管理算子
5.1 process()
作用:自定义处理逻辑,可访问状态、时间、上下文。
public class MyProcessFunction extends ProcessFunction<String, String> {
@Override
public void processElement(String value, Context ctx, Collector<String> out) {
out.collect("Processed: " + value);
}
}
stream.process(new MyProcessFunction()).print();
env.execute();
6. Sink(输出)算子
6.1 print()
stream.print();
env.execute();
6.2 addSink()
stream.addSink(new FlinkKafkaProducer<>("topic", new SimpleStringSchema(), properties));
env.execute();
7. 总结
类别 | 常见算子 | 用途 |
---|---|---|
转换 | map() 、flatMap() 、filter() |
数据转换、过滤 |
分区 | keyBy() 、shuffle() 、broadcast() |
数据分组、负载均衡 |
窗口 | timeWindow() 、countWindow() |
流式窗口计算 |
联结 | connect() 、union() 、intervalJoin() |
连接多个流 |
状态管理 | process() |
访问状态和时间信息 |
Sink | print() 、addSink() |
输出数据 |
License:
CC BY 4.0