文章

FlinkSQL 简明教程

Docker安装Flink,并使用flinksql消费kafka数据

简单搭建使用

1.拉取镜像,创建网络

docker pull flink:1.17.2
docker network create flink-network

2.创建 jobmanager

# 创建 JobManager 
 docker run \
  -itd \
  --name=jobmanager \
  --publish 8081:8081 \
  --network flink-network \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:1.17.2 jobmanager 

3.创建 taskmanager

# 创建 TaskManager 
 docker run \
  -itd \
  --name=taskmanager \
  --network flink-network \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:1.17.2 taskmanager 

image-20240820102943445

5.修改Task Slots

默认的Slots num是1,我们可以修改为5:
修改的目录是jobmanagertaskmanager/opt/flink/confflink-conf.yaml文件:

修改taskmanager.numberOfTaskSlots即可。
注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:

docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/

6.通过FlinkSQL消费Kafka

导入flink-sql-connector-kafka jar包,用于连接flinksql和kafka。

进入flink查看版本

docker exec -it jobmanager /bin/bash

cd /opt/flink/bin

flink --version

image-20240820103302014

根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.17.2,所以选择下图的版本包:

image-20240820103350680

点进去复制链接

image-20240820103426189

分别在jobmanagertaskmanager/opt/flink/lib目录下使用wget下载jar包,注意,是两个都要放,

wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.17/flink-sql-connector-kafka-3.1.0-1.17.jar

image-20240820103619544

然后重启这两个镜像

docker restart jobmanager taskmanager 

FlinkSQL消费Kafka

先通过Python脚本写入消息

import json
import time
import traceback
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['10.211.55.58:9092'],
    key_serializer=lambda k: json.dumps(k).encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

while True:
    # 获取当前时间
    ts = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')

    # 构建JSON对象
    message = {
        "count_num": "some_count_value",  # 这里填入实际的count_num值
        "ts": ts
    }

    future = producer.send(
        'kafka_demo1',
        key='count_num',  # 使用相同的key值将消息送至同一个分区
        value=message,
        partition=0)  # 向分区0发送消息

    print("send {}".format(json.dumps(message)))
    time.sleep(3)

    try:
        future.get(timeout=10)  # 监控是否发送成功
    except KafkaError:  # 发送失败抛出KafkaError
        traceback.print_exc()

image-20240820104243000

进入jobmanager中,执行SQL

cd /opt/flink/bin

sql-client.sh

image-20240820103939382

Flink SQL执行以下语句:

CREATE TABLE KafkaTable (
  `count_num` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_demo1',
  'properties.bootstrap.servers' = '10.211.55.58:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
select * from KafkaTable;

然后就可以看到数据了

image-20240820104208954

Flink SQL是Flink数据处理框架的一部分,允你使用SQL查询实时流数据和静态批数据。可以通过定义表结构、指定数据源和执行SQL查询来处理数据流。

1.字段类型(Field Types)

Flink SQL支持多种数据类型,适用于不同的数据处理场景。常见的字段类型包括:

  • 基本数据类型:

    • STRING: 字符串类型,用于文本数据。
    • BOOLEAN: 布尔类型,值为TRUEFALSE
    • INT: 32位有符号整数。
    • BIGINT: 64位有符号整数。
    • FLOAT: 32位浮点数。
    • DOUBLE: 64位浮点数。
    • DECIMAL(p, s): 精确的十进制数,其中p是总位数,s是小数位数。
  • 日期和时间类型:

    • DATE: 表示日期,格式为YYYY-MM-DD
    • TIME: 表示时间,格式为HH:MM:SS
    • TIMESTAMP(p): 带有可选精度的时间戳,p表示秒的小数位数,如TIMESTAMP(3)表示毫秒精度。
    • INTERVAL: 表示时间间隔,可用于时间窗口操作。
  • 复杂类型:

    • ARRAY<type>: 数组类型,存储相同数据类型的元素。
    • MAP<key_type, value_type>: 映射类型,存储键值对。
    • ROW<field_name1 type1, field_name2 type2, ...>: 行类型,类似于表结构的嵌套。

2. CREATE TABLE 语句中的参数配置

在Flink SQL中,CREATE TABLE语句用于定义表结构,并配置数据源和数据格式。以下是常用配置参数的解释。

  • 基础配置:

    • connector: 指定连接器类型,例如kafkafilesystemjdbc等。
    • format: 数据格式,如jsoncsvavroraw等。
    • schema: 定义表的字段名和类型,支持基本类型和复杂类型。
  • Kafka 连接器配置:

    • topic: Kafka主题名称。
    • properties.bootstrap.servers: Kafka集群的地址。
    • properties.group.id: Kafka消费者组ID。
    • scan.startup.mode: 消费起始位置,可以是group-offsets(从上次消费的偏移量开始)、earliest-offset(从最早的偏移量开始)或latest-offset(从最新的偏移量开始)。
  • Filesystem 连接器配置:

    • path: 文件路径,可以是本地路径或HDFS路径。
    • format: 文件格式,如csvparquet等。
  • JDBC 连接器配置:

    • url: 数据库的JDBC URL。
    • table-name: 数据库表名。
    • driver: JDBC驱动类名。
    • username: 数据库用户名。
    • password: 数据库密码。
  • Redis 连接器配置:

    • host: Redis服务器的主机地址,可以是IP地址或主机名。

    • port: Redis服务器的端口号,默认是6379

    • mode: Redis的操作模式,如string(字符串模式)、hash(哈希模式)、list(列表模式)等。

    • command: Redis命令类型,如SETGETHSETLPUSH等,根据模式选择适合的命令。

    • key-column: 指定Redis中用作键的字段名称。

    • value-column: 指定Redis中用作值的字段名称。

    • password: Redis服务器的密码,如果设置了访问控制,则需要提供。

  • Cassandra 连接器配置:

  • hosts: Cassandra集群的主机列表,多个主机用逗号分隔。

  • keyspace: Cassandra的keyspace名称,相当于数据库名称。

  • table: Cassandra中的表名。

  • username: 访问Cassandra的用户名。

  • password: 访问Cassandra的密码。

  • port: Cassandra的连接端口,默认是9042

  • consistency: Cassandra的一致性级别,如ONEQUORUMLOCAL_QUORUM等,决定了读取或写入操作的可靠性。

  • TiDB 连接器配置:

    • url: TiDB集群的JDBC连接字符串,格式为jdbc:mysql://<host>:<port>/<database>。TiDB默认端口是4000

    • table-name: TiDB中的表名。

    • driver: 使用的JDBC驱动类名,通常是com.mysql.cj.jdbc.Driver

    • username: TiDB的用户名。

    • password: TiDB的密码。

    • sink.buffer-flush.max-rows: 在将数据写入TiDB之前缓冲的最大行数。

    • sink.buffer-flush.interval: 缓冲时间间隔,达到这个时间间隔时会将数据写入TiDB。

    • sink.max-retries: 在写入失败时的最大重试次数。

3. 不同数据源的配置模版

根据数据源的不同,配置参数会有所差异。以下是常见数据源的配置模版。

  • Kafka 表配置模版:

    CREATE TABLE KafkaTable (
      `user_id` STRING,  -- 用户ID,字符串类型
      `order_amount` DOUBLE,  -- 订单金额,双精度浮点数类型
      `order_time` TIMESTAMP(3)  -- 订单时间,精确到毫秒的时间戳类型
    ) WITH (
      'connector' = 'kafka',  -- 指定使用Kafka连接器
      'topic' = 'orders',  -- Kafka主题名称
      'properties.bootstrap.servers' = 'localhost:9092',  -- Kafka的Bootstrap服务器地址
      'properties.group.id' = 'consumerGroup1',  -- Kafka消费者组ID
      'format' = 'json',  -- 消息格式为JSON
      'scan.startup.mode' = 'earliest-offset'  -- 从最早的偏移量开始读取消息
    );
    
  • Filesystem 表配置模版:

    CREATE TABLE FileTable (
      `user_id` STRING,  -- 用户ID,字符串类型
      `order_amount` DOUBLE,  -- 订单金额,双精度浮点数类型
      `order_date` DATE  -- 订单日期,日期类型
    ) WITH (
      'connector' = 'filesystem',  -- 指定使用Filesystem连接器
      'path' = 'file:///path/to/data',  -- 文件路径,可以是本地路径或HDFS路径
      'format' = 'csv'  -- 文件格式为CSV
    );
    
  • JDBC 表配置模版:

    CREATE TABLE JdbcTable (
      `user_id` STRING,  -- 用户ID,字符串类型
      `order_amount` DOUBLE,  -- 订单金额,双精度浮点数类型
      `order_time` TIMESTAMP(3)  -- 订单时间,精确到毫秒的时间戳类型
    ) WITH (
      'connector' = 'jdbc',  -- 指定使用JDBC连接器
      'url' = 'jdbc:mysql://localhost:3306/mydb',  -- 数据库的JDBC URL
      'table-name' = 'orders',  -- 数据库表名
      'driver' = 'com.mysql.cj.jdbc.Driver',  -- JDBC驱动类名
      'username' = 'root',  -- 数据库用户名
      'password' = 'password'  -- 数据库密码
    );
    
  • Redis 表配置模版:

    CREATE TABLE RedisTable (
      `user_id` STRING,
      `order_count` INT
    ) WITH (
      'connector' = 'redis',
      'mode' = 'string', -- 这里可以选择 'string','hash','list' 等模式
      'command' = 'SET', -- 或者 'GET',具体取决于你想执行的操作
      'host' = 'localhost',
      'port' = '6379',
      'format' = 'raw', -- Redis通常使用简单的字符串格式
      'key-column' = 'user_id', -- 将字段映射为Redis键
      'value-column' = 'order_count' -- 将字段映射为Redis值
    );
    
  • Cassandra 表配置模版:

    CREATE TABLE CassandraTable (
      `user_id` STRING,
      `order_id` STRING,
      `order_amount` DOUBLE,
      `order_time` TIMESTAMP(3)
    ) WITH (
      'connector' = 'cassandra',
      'hosts' = 'localhost', -- Cassandra集群的主机列表,多个主机用逗号分隔
      'keyspace' = 'mykeyspace', -- Cassandra的keyspace名称
      'table' = 'orders', -- Cassandra中的表名
      'format' = 'json', -- 可以选择 `json`,`csv`,`raw` 等格式
      'username' = 'cassandra', -- 数据库用户名
      'password' = 'cassandra', -- 数据库密码
      'port' = '9042', -- Cassandra的连接端口
      'consistency' = 'LOCAL_QUORUM' -- Cassandra的一致性级别
    );
    
  • TiDB 表配置模版:

    CREATE TABLE TiDBTable (
      `user_id` STRING,
      `order_id` STRING,
      `order_amount` DOUBLE,
      `order_time` TIMESTAMP(3)
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<tidb_host>:4000/<database>', -- TiDB 集群的 JDBC URL
      'table-name' = 'orders', -- TiDB 中的表名
      'driver' = 'com.mysql.cj.jdbc.Driver', -- 使用 MySQL JDBC 驱动
      'username' = '<username>', -- TiDB 用户名
      'password' = '<password>', -- TiDB 密码
      'sink.buffer-flush.max-rows' = '5000', -- 缓存行数,达到后写入 TiDB
      'sink.buffer-flush.interval' = '2s', -- 每隔多长时间刷新缓存
      'sink.max-retries' = '3' -- 最大重试次数
    );
    

4. 时间属性(Time Attributes)

Flink SQL中,时间属性用于流处理中的窗口操作、事件时间处理等,主要包括以下类型:

  • Processing Time: 处理时间,表示记录在Flink任务中被处理的时间。
  • Event Time: 事件时间,表示数据源生成事件的时间,可以通过指定字段或从数据源中提取时间戳。
  • Ingestion Time: 吞吐时间,表示记录进入Flink系统的时间。

5. 示例:综合运用

假设我们要从Kafka读取订单数据,按小时计算每个用户的订单总额,结果存储到MySQL数据库中。以下是完整的Flink SQL脚本:

-- 创建Kafka表
CREATE TABLE KafkaOrders (
  `user_id` STRING,
  `order_amount` DOUBLE,
  `order_time` TIMESTAMP(3),
  WATERMARK FOR `order_time` AS `order_time` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- 创建MySQL表
CREATE TABLE MysqlResults (
  `user_id` STRING,
  `window_end` TIMESTAMP(3),
  `total_amount` DOUBLE
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'user_order_totals',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'password'
);

-- 计算每小时的订单总额
INSERT INTO MysqlResults
SELECT
  user_id,
  TUMBLE_END(order_time, INTERVAL '1' HOUR) AS window_end,
  SUM(order_amount) AS total_amount
FROM KafkaOrders
GROUP BY
  TUMBLE(order_time, INTERVAL '1' HOUR),
  user_id;

CREATE 语句

目前 Flink SQL 支持下列 CREATE 语句:

  • CREATE TABLE: 创建数据表,定义数据结构和外部数据源连接。

  • CREATE CATALOG: 创建目录,用于组织和管理数据库、表等对象。

  • CREATE DATABASE: 创建数据库,用于在目录中管理和组织表、视图等对象。

  • CREATE VIEW: 创建视图,基于已有表的查询结果创建虚拟表。

  • CREATE FUNCTION: 创建自定义函数,用于扩展 SQL 处理能力。

1. CREATE TABLE

格式:

CREATE TABLE table_name (
  column_name data_type [column_constraint] [COMMENT 'comment_text'],
  ...
  [PRIMARY KEY (column_name, ...)]
) WITH (
  'property_name' = 'property_value',
  ...
);

解释:

  • table_name: 表名。
  • column_name: 列名。
  • data_type: 数据类型,如STRINGINTDOUBLETIMESTAMP等。
  • column_constraint: 列约束,如NOT NULLUNIQUE等。
  • PRIMARY KEY: 定义表的主键,可以是一个或多个列的组合。
  • WITH: 用于指定表的连接器属性,如数据源类型、路径、格式等。

模版:

CREATE TABLE MyTable (
  `id` INT NOT NULL,
  `name` STRING,
  `amount` DOUBLE,
  `order_time` TIMESTAMP(3),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

2. CREATE CATALOG

格式:

CREATE CATALOG catalog_name
WITH (
  'property_name' = 'property_value',
  ...
);

解释:

  • catalog_name: 目录名称,用于标识和组织数据库和表。
  • WITH: 用于指定目录的属性,如连接配置等。

模版:

CREATE CATALOG MyCatalog
WITH (
  'type' = 'hive',
  'default-database' = 'my_database',
  'hive-conf-dir' = '/path/to/hive/conf'
);

3. CREATE DATABASE

格式:

CREATE DATABASE [IF NOT EXISTS] database_name
WITH (
  'property_name' = 'property_value',
  ...
);

解释:

  • database_name: 数据库名称。
  • IF NOT EXISTS: 如果数据库不存在则创建,存在则忽略。
  • WITH: 可选,用于指定数据库的相关属性。

模版:

CREATE DATABASE IF NOT EXISTS MyDatabase
WITH (
  'comment' = 'This is a sample database'
);

4. CREATE VIEW

格式:

CREATE VIEW view_name AS
SELECT ...
FROM ...
WHERE ...;

解释:

  • view_name: 视图名称。
  • SELECT ...: 查询语句,用于定义视图的内容。

模版:

CREATE VIEW MyView AS
SELECT 
  `id`, 
  `name`, 
  `amount` 
FROM 
  MyTable
WHERE 
  `amount` > 1000;

5. CREATE FUNCTIO

格式:

CREATE FUNCTION function_name AS 'class_name'
USING JAR 'path_to_jar_file';

解释:

  • function_name: 函数名称,用于在SQL查询中调用。
  • class_name: Java或Scala类的完全限定名,代表具体的函数实现。
  • USING JAR: 指定包含函数实现的JAR文件路径。

模版:

CREATE FUNCTION MyFunction AS 'com.example.MyUdf'
USING JAR 'file:///path/to/my-udf.jar';

Inster 语句

在 Flink SQL 中,INSERT 语句主要有以下几种类型:

1. 插入静态数据

  • 直接将静态数据插入到表中。
INSERT INTO table_name VALUES (value1, value2, ...);

2. 从查询结果插入

  • 将一个查询的结果插入到目标表中。
INSERT INTO target_table SELECT * FROM source_table WHERE condition;

3. 插入到分区表

  • 对于支持分区的表,可以指定插入的分区。
INSERT INTO partitioned_table PARTITION (partition_column = value) SELECT * FROM source_table;

4. 插入流数据

  • 将流数据插入到表中,支持实时数据处理。
INSERT INTO table_name SELECT * FROM stream_source;

5. 插入到临时表

  • 可以将数据插入到临时表中,临时表在会话结束后会被自动删除。
INSERT INTO TEMPORARY TABLE temp_table SELECT * FROM source_table;

Delete 语句

DELETE FROM table_name
WHERE condition;

Drop 语句

DROP TABLE table_name;

Update 语句

UPDATE table_name
SET column1 = value1, column2 = value2
WHERE condition;

Select 语句

1. WITH Clause

WITH 子句用于定义临时视图,使查询更易读,特别是在复杂查询中。

示例:

WITH high_salary AS (
    SELECT * FROM employees WHERE salary > 70000
)
SELECT * FROM high_salary;
  • 说明:此查询首先定义一个临时视图 high_salary,然后从中检索数据。

2. SELECT & WHERE

  • SELECT:用于指定要检索的列。
  • WHERE:用于过滤结果集。

示例:

SELECT name, salary
FROM employees
WHERE department = 'Sales';
  • 说明:此查询选取销售部门员工的姓名和薪水。

3. SELECT DISTINCT

用于从结果集中去除重复记录。

示例:

SELECT DISTINCT department
FROM employees;
  • 说明:此查询返回所有不同的部门名称。

4. Windowing TVF

窗口表值函数(TVF)允许在时间窗口内处理流数据。

示例:

SELECT *
FROM TABLE(TUMBLE(table_name, INTERVAL '1' HOUR));
  • 说明:此查询将数据按每小时进行分组。

5. Window Aggregation

窗口聚合用于在指定时间窗口内对数据进行聚合计算。

示例:

SELECT department, AVG(salary)
FROM employees
WINDOW TUMBLE AS (INTERVAL '1' HOUR)
GROUP BY department;
  • 说明:此查询计算每小时各部门的平均薪水。

6. Group Aggregation

分组聚合用于根据某一列进行分组,然后对每组进行聚合计算。

示例:

SELECT department, COUNT(*) AS employee_count
FROM employees
GROUP BY department;
  • 说明:此查询统计每个部门的员工数量。

7. Over Aggregation

用于在不分组的情况下对数据进行聚合,允许在结果集的每一行上使用聚合函数。

示例:

SELECT name, salary,
       AVG(salary) OVER () AS avg_salary
FROM employees;
  • 说明:此查询返回每位员工的薪水及整体平均薪水。

8. Joins

连接用于在多个表之间建立关联。

示例:

SELECT e.name, d.department_name
FROM employees e
INNER JOIN departments d ON e.department_id = d.id;
  • 说明:此查询将员工表与部门表连接,返回员工姓名和部门名称。

9. Set Operations

集合操作用于合并、交集或差集多个结果集,包括 UNIONINTERSECTEXCEPT

示例:

SELECT name FROM employees
UNION
SELECT name FROM contractors;
  • 说明:此查询返回员工和承包商的所有姓名,去除重复项。

10. ORDER BY Clause

用于对查询结果进行排序。

示例:

SELECT *
FROM employees
ORDER BY salary DESC;
  • 说明:此查询按薪水降序排列员工记录。

11. LIMIT Clause

限制查询结果的返回行数。

示例:

SELECT *
FROM employees
LIMIT 10;
  • 说明:此查询返回前 10 条员工记录。

12. Top-N

返回根据某一列排序的前 N 条记录。

示例:

SELECT *
FROM employees
ORDER BY salary DESC
LIMIT 5;
  • 说明:此查询返回薪水最高的前 5 名员工。

13. Window Top-N

在窗口内返回前 N 条记录。

示例:

SELECT department, name, salary
FROM employees
WINDOW TUMBLE AS (INTERVAL '1' HOUR)
ORDER BY salary DESC
LIMIT 3;
  • 说明:此查询在每小时的窗口内返回薪水最高的前 3 名员工。

14. Deduplication

去重操作用于移除重复记录,通常结合窗口或条件使用。

示例:

SELECT DISTINCT name, department
FROM employees;
  • 说明:此查询返回不重复的员工姓名和部门组合。

15. Pattern Recognition

模式识别用于检测流数据中的特定模式,适合复杂事件处理。

示例:

SELECT *
FROM pattern_matched
WHERE MATCH_RECOGNIZE (
    PARTITION BY id
    ORDER BY time
    MEASURES
        A.name AS start_name,
        B.name AS end_name
    PATTERN (A B)
    DEFINE
        A AS A.salary < B.salary
);
  • 说明:此查询识别在同一 ID 内薪水逐渐上升的模式。

示例

Kafka --> MySQL

使用PythonKafka写入数据

import json
import time
import traceback
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['10.211.55.58:9092'],
    key_serializer=lambda k: json.dumps(k).encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

while True:
    # 获取当前时间
    ts = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')

    # 构建不同类型的JSON对象
    message = {
        "count_num": 42,  # 示例整数
        "is_active": True,  # 布尔值
        "details": {
            "user": "example_user",
            "status": "active"
        },
        "ts": ts,
    }

    # 发送消息
    future = producer.send(
        'kafka_demo2',
        key='count_num',  # 使用相同的key值将消息送至同一个分区
        value=message,
        partition=0  # 向分区0发送消息
    )

    print("send {}".format(json.dumps(message)))
    time.sleep(3)

    try:
        future.get(timeout=10)  # 监控是否发送成功
    except KafkaError:  # 发送失败抛出KafkaError
        traceback.print_exc()

通过Flink SQL消费Kafka数据,然后写入MySQL

-- 1. 创建 Kafka 源表
CREATE TABLE kafka_source (
    count_num INT,
    is_active BOOLEAN,
    details ROW<`user` STRING, status STRING>,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_demo2',
    'properties.bootstrap.servers' = '10.211.55.58:9092',
    'properties.group.id' = 'testGroup',
    'properties.auto.offset.reset' = 'earliest',  -- 设置偏移量重置策略
    'format' = 'json'
);

-- 2. 创建 MySQL 目标表
CREATE TABLE mysql_sink (
    count_num INT,
    is_active BOOLEAN,
    `user` STRING,
    status STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.211.55.58:3306/FlinkSQL',  -- 替换为你的数据库名
    'table-name' = 'mysql_sink',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',  -- 替换为你的用户名
    'password' = 'root',    -- 替换为你的密码
    'sink.buffer-flush.interval' = '1s'  -- 可选:设置刷新间隔
);

-- 3. 从 Kafka 读取数据并写入 MySQL
INSERT INTO mysql_sink
SELECT 
    count_num, 
    is_active,
    details.`user` AS `user`,
    details.status AS status,
    ts
FROM kafka_source;

配置详解

scan.startup.mode

1. earliest-offset

  • 描述:从主题的最早可用偏移量开始读取数据,即从 Kafka 中的所有分区的最早数据开始消费。
  • 使用场景:适用于需要从 Kafka 中读取所有历史数据的情况。

2. latest-offset

  • 描述:从提交作业的当前最新偏移量开始读取数据。新加入的记录会被实时消费。
  • 使用场景:用于只想消费 Kafka 中新增数据的情况。

3. group-offsets

  • 描述:从上次已提交的偏移量开始消费。如果某个分区没有已提交的偏移量,则默认行为是从最早偏移量开始读取。
  • 使用场景:适用于重启作业时从中断位置继续消费的情况。

4. timestamp

  • 描述:从指定的时间戳开始读取数据。Kafka 中各个分区会查找最接近给定时间戳的偏移量,从该位置开始消费。
  • 使用场景:当你希望从特定时间点的数据开始消费时使用。

5. specific-offsets

  • 描述:从指定的偏移量开始消费数据。你可以为每个 Kafka 分区明确设置起始的偏移量。
  • 使用场景:用于需要精确控制从哪个位置开始消费数据的情况。
License:  CC BY 4.0