Flink DataStream API
Flink DataStream API 是 Apache Flink 用于处理 流数据(Streaming Data) 的核心 API。它支持有界(Bounded)和无界(Unbounded)数据流,提供了丰富的转换(Transformations)、窗口(Windows)、状态管理(State Management)、时间语义(Time Semantics)等功能。
1. DataStream API 核心概念
1.1 DataStream
DataStream<T>
是 Flink 处理流数据的基本抽象,代表一个元素类型为 T
的数据流。
创建 DataStream
的方式:
- 从集合(本地测试时使用)
- 从文件
- 从 Kafka、RabbitMQ 等流式数据源
- 自定义 Source
1.2 主要组成部分
- 数据源(Source):从外部系统读取数据,如 Kafka、Socket、文件等。
- 转换(Transformations):对数据进行各种转换,如
map()
、filter()
、keyBy()
等。 - 窗口(Windowing):对流数据进行分组和聚合,如
window(TumblingEventTimeWindows.of(Time.seconds(10)))
。 - 状态(State):Flink 提供了 Keyed State 和 Operator State 管理流数据的中间状态。
- 时间(Time):支持 Event Time(事件发生时间)、Processing Time(处理时间)、Ingestion Time(进入 Flink 的时间)。
- 数据汇(Sink):数据写出到 Kafka、MySQL、Elasticsearch、HDFS 等。
2. DataStream API 详解
2.1 创建 DataStream(Source)
Flink 提供了多种方式创建 DataStream:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从集合创建 DataStream(适合测试)
DataStream<String> stream1 = env.fromElements("a", "b", "c");
// 从文件读取
DataStream<String> stream2 = env.readTextFile("path/to/file");
// 从 Kafka 读取(需要依赖 Kafka Connector)
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
DataStream<String> stream3 = env.addSource(kafkaSource);
2.2 数据转换(Transformations)
Flink 提供了丰富的流数据转换 API,核心有:
- map():一对一转换
- flatMap():一对多转换
- filter():过滤数据
- keyBy():按照 key 进行分区
- reduce():聚合数据
- window():窗口计算
- process():更底层的流处理
示例:
// map():将每个元素转换为大写
DataStream<String> upperCaseStream = stream1.map(String::toUpperCase);
// filter():过滤掉不满足条件的元素
DataStream<String> filteredStream = upperCaseStream.filter(value -> value.startsWith("A"));
// keyBy():按 key 分区(必须是 Tuple 类型或 POJO)
DataStream<Tuple2<String, Integer>> keyedStream = stream1.map(value -> new Tuple2<>(value, 1)).keyBy(0);
2.3 窗口(Windowing)
窗口(Window)是 Flink 流计算的核心,用于按时间或数量划分数据流。常见窗口类型:
- 滚动窗口(Tumbling Window):固定时间间隔,不重叠
- 滑动窗口(Sliding Window):有重叠,每次滑动一定步长
- 会话窗口(Session Window):按事件间隔分割
- 全局窗口(Global Window):仅在自定义触发器时使用
示例:滚动窗口
DataStream<Tuple2<String, Integer>> windowedStream = keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒的滚动窗口
.sum(1);
2.4 状态管理(State)
Flink 提供了 Keyed State 和 Operator State 来存储计算中的中间状态。
- Keyed State:基于 keyBy 进行管理,典型应用如滚动计数器
- Operator State:作用于算子级别,如 Kafka 读取的偏移量
示例:Keyed State
public class CountWithKeyedState extends KeyedProcessFunction<String, String, Tuple2<String, Integer>> {
private transient ValueState<Integer> countState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
"count",
Integer.class,
0);
countState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = countState.value() + 1;
countState.update(count);
out.collect(new Tuple2<>(value, count));
}
}
2.5 时间管理(Time Semantics)
Flink 支持 3 种时间语义:
- Processing Time:基于系统当前时间
- Event Time:基于事件发生时间,需要
Watermark
处理乱序 - Ingestion Time:事件进入 Flink 时的时间
示例:使用 Event Time 和 Watermark
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> stream = env.addSource(new MyEventSource())
.assignTimestampsAndWatermarks(WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> extractTimestamp(event)));
2.6 数据写出(Sink)
Flink 提供多种 Sink,支持写入 Kafka、MySQL、Elasticsearch 等。
示例:写入 Kafka
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
"output_topic",
new SimpleStringSchema(),
properties
);
stream.addSink(kafkaSink);
示例:写入 MySQL
stream.addSink(JdbcSink.sink(
"INSERT INTO table_name (col1, col2) VALUES (?, ?)",
(statement, value) -> {
statement.setString(1, value.f0);
statement.setInt(2, value.f1);
},
JdbcExecutionOptions.builder().withBatchSize(1000).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/db")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("user")
.withPassword("password")
.build()
));
3. 总结
Flink DataStream API 是 Flink 流计算的核心,包含:
- Source:支持 Kafka、文件等数据源
- Transformations:map、filter、keyBy、reduce 等算子
- Window:支持滚动窗口、滑动窗口等
- State:支持 Keyed State 和 Operator State
- Time:支持 Processing Time、Event Time、Ingestion Time
- Sink:支持写入 Kafka、MySQL、Elasticsearch 等
License:
CC BY 4.0