文章

Flink DataStream API

Flink DataStream API 是 Apache Flink 用于处理 流数据(Streaming Data) 的核心 API。它支持有界(Bounded)和无界(Unbounded)数据流,提供了丰富的转换(Transformations)、窗口(Windows)、状态管理(State Management)、时间语义(Time Semantics)等功能。


1. DataStream API 核心概念

1.1 DataStream

DataStream<T> 是 Flink 处理流数据的基本抽象,代表一个元素类型为 T 的数据流。

创建 DataStream 的方式:

  • 从集合(本地测试时使用)
  • 从文件
  • 从 Kafka、RabbitMQ 等流式数据源
  • 自定义 Source

1.2 主要组成部分

  1. 数据源(Source):从外部系统读取数据,如 Kafka、Socket、文件等。
  2. 转换(Transformations):对数据进行各种转换,如 map()filter()keyBy() 等。
  3. 窗口(Windowing):对流数据进行分组和聚合,如 window(TumblingEventTimeWindows.of(Time.seconds(10)))
  4. 状态(State):Flink 提供了 Keyed State 和 Operator State 管理流数据的中间状态。
  5. 时间(Time):支持 Event Time(事件发生时间)、Processing Time(处理时间)、Ingestion Time(进入 Flink 的时间)。
  6. 数据汇(Sink):数据写出到 Kafka、MySQL、Elasticsearch、HDFS 等。

2. DataStream API 详解

2.1 创建 DataStream(Source)

Flink 提供了多种方式创建 DataStream:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 从集合创建 DataStream(适合测试)
DataStream<String> stream1 = env.fromElements("a", "b", "c");

// 从文件读取
DataStream<String> stream2 = env.readTextFile("path/to/file");

// 从 Kafka 读取(需要依赖 Kafka Connector)
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
DataStream<String> stream3 = env.addSource(kafkaSource);

2.2 数据转换(Transformations)

Flink 提供了丰富的流数据转换 API,核心有:

  1. map():一对一转换
  2. flatMap():一对多转换
  3. filter():过滤数据
  4. keyBy():按照 key 进行分区
  5. reduce():聚合数据
  6. window():窗口计算
  7. process():更底层的流处理

示例:

// map():将每个元素转换为大写
DataStream<String> upperCaseStream = stream1.map(String::toUpperCase);

// filter():过滤掉不满足条件的元素
DataStream<String> filteredStream = upperCaseStream.filter(value -> value.startsWith("A"));

// keyBy():按 key 分区(必须是 Tuple 类型或 POJO)
DataStream<Tuple2<String, Integer>> keyedStream = stream1.map(value -> new Tuple2<>(value, 1)).keyBy(0);

2.3 窗口(Windowing)

窗口(Window)是 Flink 流计算的核心,用于按时间或数量划分数据流。常见窗口类型:

  1. 滚动窗口(Tumbling Window):固定时间间隔,不重叠
  2. 滑动窗口(Sliding Window):有重叠,每次滑动一定步长
  3. 会话窗口(Session Window):按事件间隔分割
  4. 全局窗口(Global Window):仅在自定义触发器时使用

示例:滚动窗口

DataStream<Tuple2<String, Integer>> windowedStream = keyedStream
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))  // 10秒的滚动窗口
    .sum(1);

2.4 状态管理(State)

Flink 提供了 Keyed State 和 Operator State 来存储计算中的中间状态。

  • Keyed State:基于 keyBy 进行管理,典型应用如滚动计数器
  • Operator State:作用于算子级别,如 Kafka 读取的偏移量

示例:Keyed State

public class CountWithKeyedState extends KeyedProcessFunction<String, String, Tuple2<String, Integer>> {
    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
            "count", 
            Integer.class, 
            0);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = countState.value() + 1;
        countState.update(count);
        out.collect(new Tuple2<>(value, count));
    }
}

2.5 时间管理(Time Semantics)

Flink 支持 3 种时间语义:

  1. Processing Time:基于系统当前时间
  2. Event Time:基于事件发生时间,需要 Watermark 处理乱序
  3. Ingestion Time:事件进入 Flink 时的时间

示例:使用 Event Time 和 Watermark

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<String> stream = env.addSource(new MyEventSource())
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, timestamp) -> extractTimestamp(event)));

2.6 数据写出(Sink)

Flink 提供多种 Sink,支持写入 Kafka、MySQL、Elasticsearch 等。

示例:写入 Kafka

FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
    "output_topic",
    new SimpleStringSchema(),
    properties
);
stream.addSink(kafkaSink);

示例:写入 MySQL

stream.addSink(JdbcSink.sink(
    "INSERT INTO table_name (col1, col2) VALUES (?, ?)",
    (statement, value) -> {
        statement.setString(1, value.f0);
        statement.setInt(2, value.f1);
    },
    JdbcExecutionOptions.builder().withBatchSize(1000).build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/db")
        .withDriverName("com.mysql.jdbc.Driver")
        .withUsername("user")
        .withPassword("password")
        .build()
));

3. 总结

Flink DataStream API 是 Flink 流计算的核心,包含:

  • Source:支持 Kafka、文件等数据源
  • Transformations:map、filter、keyBy、reduce 等算子
  • Window:支持滚动窗口、滑动窗口等
  • State:支持 Keyed State 和 Operator State
  • Time:支持 Processing Time、Event Time、Ingestion Time
  • Sink:支持写入 Kafka、MySQL、Elasticsearch 等
License:  CC BY 4.0