FlinkSQL 简明教程
Docker安装Flink,并使用flinksql消费kafka数据
简单搭建使用
1.拉取镜像,创建网络
docker pull flink:1.17.2
docker network create flink-network
2.创建 jobmanager
# 创建 JobManager
docker run \
-itd \
--name=jobmanager \
--publish 8081:8081 \
--network flink-network \
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
flink:1.17.2 jobmanager
3.创建 taskmanager
# 创建 TaskManager
docker run \
-itd \
--name=taskmanager \
--network flink-network \
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
flink:1.17.2 taskmanager
4.访问 Flink Dashboard
5.修改Task Slots
默认的Slots num
是1,我们可以修改为5:
修改的目录是jobmanager
和taskmanager
的/opt/flink/conf
的flink-conf.yaml
文件:
修改taskmanager.numberOfTaskSlots
即可。
注意:默认的docker
容器中没有vi/vim
命令,可以使用docker cp
命令,复制出来修改,然后在复制回去,如下:
docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/
6.通过FlinkSQL消费Kafka
导入flink-sql-connector-kafka jar包,用于连接flinksql和kafka。
进入flink
查看版本
docker exec -it jobmanager /bin/bash
cd /opt/flink/bin
flink --version
根据自己的flink
版本,下载对应的 flink-sql-connector-kafka jar
包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.17.2,所以选择下图的版本包:
点进去复制链接
分别在jobmanager
、taskmanager
的/opt/flink/lib
目录下使用wget
下载jar包,注意,是两个都要放,
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.17/flink-sql-connector-kafka-3.1.0-1.17.jar
然后重启这两个镜像
docker restart jobmanager taskmanager
FlinkSQL消费Kafka
先通过Python
脚本写入消息
import json
import time
import traceback
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(
bootstrap_servers=['10.211.55.58:9092'],
key_serializer=lambda k: json.dumps(k).encode('utf-8'),
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
while True:
# 获取当前时间
ts = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')
# 构建JSON对象
message = {
"count_num": "some_count_value", # 这里填入实际的count_num值
"ts": ts
}
future = producer.send(
'kafka_demo1',
key='count_num', # 使用相同的key值将消息送至同一个分区
value=message,
partition=0) # 向分区0发送消息
print("send {}".format(json.dumps(message)))
time.sleep(3)
try:
future.get(timeout=10) # 监控是否发送成功
except KafkaError: # 发送失败抛出KafkaError
traceback.print_exc()
进入jobmanager
中,执行SQL
cd /opt/flink/bin
sql-client.sh
Flink SQL
执行以下语句:
CREATE TABLE KafkaTable (
`count_num` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_demo1',
'properties.bootstrap.servers' = '10.211.55.58:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
select * from KafkaTable;
然后就可以看到数据了
Flink SQL
Flink SQL 基本概念
Flink SQL是Flink数据处理框架的一部分,允你使用SQL查询实时流数据和静态批数据。可以通过定义表结构、指定数据源和执行SQL查询来处理数据流。
1.字段类型(Field Types)
Flink SQL支持多种数据类型,适用于不同的数据处理场景。常见的字段类型包括:
-
基本数据类型:
STRING
: 字符串类型,用于文本数据。BOOLEAN
: 布尔类型,值为TRUE
或FALSE
。INT
: 32位有符号整数。BIGINT
: 64位有符号整数。FLOAT
: 32位浮点数。DOUBLE
: 64位浮点数。DECIMAL(p, s)
: 精确的十进制数,其中p
是总位数,s
是小数位数。
-
日期和时间类型:
DATE
: 表示日期,格式为YYYY-MM-DD
。TIME
: 表示时间,格式为HH:MM:SS
。TIMESTAMP(p)
: 带有可选精度的时间戳,p
表示秒的小数位数,如TIMESTAMP(3)
表示毫秒精度。INTERVAL
: 表示时间间隔,可用于时间窗口操作。
-
复杂类型:
ARRAY<type>
: 数组类型,存储相同数据类型的元素。MAP<key_type, value_type>
: 映射类型,存储键值对。ROW<field_name1 type1, field_name2 type2, ...>
: 行类型,类似于表结构的嵌套。
2. CREATE TABLE 语句中的参数配置
在Flink SQL中,CREATE TABLE
语句用于定义表结构,并配置数据源和数据格式。以下是常用配置参数的解释。
-
基础配置:
connector
: 指定连接器类型,例如kafka
、filesystem
、jdbc
等。format
: 数据格式,如json
、csv
、avro
、raw
等。schema
: 定义表的字段名和类型,支持基本类型和复杂类型。
-
Kafka 连接器配置:
topic
: Kafka主题名称。properties.bootstrap.servers
: Kafka集群的地址。properties.group.id
: Kafka消费者组ID。scan.startup.mode
: 消费起始位置,可以是group-offsets
(从上次消费的偏移量开始)、earliest-offset
(从最早的偏移量开始)或latest-offset
(从最新的偏移量开始)。
-
Filesystem 连接器配置:
path
: 文件路径,可以是本地路径或HDFS路径。format
: 文件格式,如csv
、parquet
等。
-
JDBC 连接器配置:
url
: 数据库的JDBC URL。table-name
: 数据库表名。driver
: JDBC驱动类名。username
: 数据库用户名。password
: 数据库密码。
-
Redis 连接器配置:
-
host
: Redis服务器的主机地址,可以是IP地址或主机名。 -
port
: Redis服务器的端口号,默认是6379
。 -
mode
: Redis的操作模式,如string
(字符串模式)、hash
(哈希模式)、list
(列表模式)等。 -
command
: Redis命令类型,如SET
、GET
、HSET
、LPUSH
等,根据模式选择适合的命令。 -
key-column
: 指定Redis中用作键的字段名称。 -
value-column
: 指定Redis中用作值的字段名称。 -
password
: Redis服务器的密码,如果设置了访问控制,则需要提供。
-
-
Cassandra 连接器配置:
-
hosts
: Cassandra集群的主机列表,多个主机用逗号分隔。 -
keyspace
: Cassandra的keyspace名称,相当于数据库名称。 -
table
: Cassandra中的表名。 -
username
: 访问Cassandra的用户名。 -
password
: 访问Cassandra的密码。 -
port
: Cassandra的连接端口,默认是9042
。 -
consistency
: Cassandra的一致性级别,如ONE
、QUORUM
、LOCAL_QUORUM
等,决定了读取或写入操作的可靠性。 -
TiDB 连接器配置:
-
url
: TiDB集群的JDBC连接字符串,格式为jdbc:mysql://<host>:<port>/<database>
。TiDB默认端口是4000
。 -
table-name
: TiDB中的表名。 -
driver
: 使用的JDBC驱动类名,通常是com.mysql.cj.jdbc.Driver
。 -
username
: TiDB的用户名。 -
password
: TiDB的密码。 -
sink.buffer-flush.max-rows
: 在将数据写入TiDB之前缓冲的最大行数。 -
sink.buffer-flush.interval
: 缓冲时间间隔,达到这个时间间隔时会将数据写入TiDB。 -
sink.max-retries
: 在写入失败时的最大重试次数。
-
3. 不同数据源的配置模版
根据数据源的不同,配置参数会有所差异。以下是常见数据源的配置模版。
-
Kafka 表配置模版:
CREATE TABLE KafkaTable ( `user_id` STRING, -- 用户ID,字符串类型 `order_amount` DOUBLE, -- 订单金额,双精度浮点数类型 `order_time` TIMESTAMP(3) -- 订单时间,精确到毫秒的时间戳类型 ) WITH ( 'connector' = 'kafka', -- 指定使用Kafka连接器 'topic' = 'orders', -- Kafka主题名称 'properties.bootstrap.servers' = 'localhost:9092', -- Kafka的Bootstrap服务器地址 'properties.group.id' = 'consumerGroup1', -- Kafka消费者组ID 'format' = 'json', -- 消息格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的偏移量开始读取消息 );
-
Filesystem 表配置模版:
CREATE TABLE FileTable ( `user_id` STRING, -- 用户ID,字符串类型 `order_amount` DOUBLE, -- 订单金额,双精度浮点数类型 `order_date` DATE -- 订单日期,日期类型 ) WITH ( 'connector' = 'filesystem', -- 指定使用Filesystem连接器 'path' = 'file:///path/to/data', -- 文件路径,可以是本地路径或HDFS路径 'format' = 'csv' -- 文件格式为CSV );
-
JDBC 表配置模版:
CREATE TABLE JdbcTable ( `user_id` STRING, -- 用户ID,字符串类型 `order_amount` DOUBLE, -- 订单金额,双精度浮点数类型 `order_time` TIMESTAMP(3) -- 订单时间,精确到毫秒的时间戳类型 ) WITH ( 'connector' = 'jdbc', -- 指定使用JDBC连接器 'url' = 'jdbc:mysql://localhost:3306/mydb', -- 数据库的JDBC URL 'table-name' = 'orders', -- 数据库表名 'driver' = 'com.mysql.cj.jdbc.Driver', -- JDBC驱动类名 'username' = 'root', -- 数据库用户名 'password' = 'password' -- 数据库密码 );
-
Redis 表配置模版:
CREATE TABLE RedisTable ( `user_id` STRING, `order_count` INT ) WITH ( 'connector' = 'redis', 'mode' = 'string', -- 这里可以选择 'string','hash','list' 等模式 'command' = 'SET', -- 或者 'GET',具体取决于你想执行的操作 'host' = 'localhost', 'port' = '6379', 'format' = 'raw', -- Redis通常使用简单的字符串格式 'key-column' = 'user_id', -- 将字段映射为Redis键 'value-column' = 'order_count' -- 将字段映射为Redis值 );
-
Cassandra 表配置模版:
CREATE TABLE CassandraTable ( `user_id` STRING, `order_id` STRING, `order_amount` DOUBLE, `order_time` TIMESTAMP(3) ) WITH ( 'connector' = 'cassandra', 'hosts' = 'localhost', -- Cassandra集群的主机列表,多个主机用逗号分隔 'keyspace' = 'mykeyspace', -- Cassandra的keyspace名称 'table' = 'orders', -- Cassandra中的表名 'format' = 'json', -- 可以选择 `json`,`csv`,`raw` 等格式 'username' = 'cassandra', -- 数据库用户名 'password' = 'cassandra', -- 数据库密码 'port' = '9042', -- Cassandra的连接端口 'consistency' = 'LOCAL_QUORUM' -- Cassandra的一致性级别 );
-
TiDB 表配置模版:
CREATE TABLE TiDBTable ( `user_id` STRING, `order_id` STRING, `order_amount` DOUBLE, `order_time` TIMESTAMP(3) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://<tidb_host>:4000/<database>', -- TiDB 集群的 JDBC URL 'table-name' = 'orders', -- TiDB 中的表名 'driver' = 'com.mysql.cj.jdbc.Driver', -- 使用 MySQL JDBC 驱动 'username' = '<username>', -- TiDB 用户名 'password' = '<password>', -- TiDB 密码 'sink.buffer-flush.max-rows' = '5000', -- 缓存行数,达到后写入 TiDB 'sink.buffer-flush.interval' = '2s', -- 每隔多长时间刷新缓存 'sink.max-retries' = '3' -- 最大重试次数 );
4. 时间属性(Time Attributes)
Flink SQL中,时间属性用于流处理中的窗口操作、事件时间处理等,主要包括以下类型:
- Processing Time: 处理时间,表示记录在Flink任务中被处理的时间。
- Event Time: 事件时间,表示数据源生成事件的时间,可以通过指定字段或从数据源中提取时间戳。
- Ingestion Time: 吞吐时间,表示记录进入Flink系统的时间。
5. 示例:综合运用
假设我们要从Kafka读取订单数据,按小时计算每个用户的订单总额,结果存储到MySQL数据库中。以下是完整的Flink SQL脚本:
-- 创建Kafka表
CREATE TABLE KafkaOrders (
`user_id` STRING,
`order_amount` DOUBLE,
`order_time` TIMESTAMP(3),
WATERMARK FOR `order_time` AS `order_time` - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
-- 创建MySQL表
CREATE TABLE MysqlResults (
`user_id` STRING,
`window_end` TIMESTAMP(3),
`total_amount` DOUBLE
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'user_order_totals',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'password'
);
-- 计算每小时的订单总额
INSERT INTO MysqlResults
SELECT
user_id,
TUMBLE_END(order_time, INTERVAL '1' HOUR) AS window_end,
SUM(order_amount) AS total_amount
FROM KafkaOrders
GROUP BY
TUMBLE(order_time, INTERVAL '1' HOUR),
user_id;
CREATE 语句
目前 Flink SQL 支持下列 CREATE 语句:
-
CREATE TABLE
: 创建数据表,定义数据结构和外部数据源连接。 -
CREATE CATALOG
: 创建目录,用于组织和管理数据库、表等对象。 -
CREATE DATABASE
: 创建数据库,用于在目录中管理和组织表、视图等对象。 -
CREATE VIEW
: 创建视图,基于已有表的查询结果创建虚拟表。 -
CREATE FUNCTION
: 创建自定义函数,用于扩展 SQL 处理能力。
1. CREATE TABLE
格式:
CREATE TABLE table_name (
column_name data_type [column_constraint] [COMMENT 'comment_text'],
...
[PRIMARY KEY (column_name, ...)]
) WITH (
'property_name' = 'property_value',
...
);
解释:
table_name
: 表名。column_name
: 列名。data_type
: 数据类型,如STRING
、INT
、DOUBLE
、TIMESTAMP
等。column_constraint
: 列约束,如NOT NULL
、UNIQUE
等。PRIMARY KEY
: 定义表的主键,可以是一个或多个列的组合。WITH
: 用于指定表的连接器属性,如数据源类型、路径、格式等。
模版:
CREATE TABLE MyTable (
`id` INT NOT NULL,
`name` STRING,
`amount` DOUBLE,
`order_time` TIMESTAMP(3),
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'my_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
2. CREATE CATALOG
格式:
CREATE CATALOG catalog_name
WITH (
'property_name' = 'property_value',
...
);
解释:
catalog_name
: 目录名称,用于标识和组织数据库和表。WITH
: 用于指定目录的属性,如连接配置等。
模版:
CREATE CATALOG MyCatalog
WITH (
'type' = 'hive',
'default-database' = 'my_database',
'hive-conf-dir' = '/path/to/hive/conf'
);
3. CREATE DATABASE
格式:
CREATE DATABASE [IF NOT EXISTS] database_name
WITH (
'property_name' = 'property_value',
...
);
解释:
database_name
: 数据库名称。IF NOT EXISTS
: 如果数据库不存在则创建,存在则忽略。WITH
: 可选,用于指定数据库的相关属性。
模版:
CREATE DATABASE IF NOT EXISTS MyDatabase
WITH (
'comment' = 'This is a sample database'
);
4. CREATE VIEW
格式:
CREATE VIEW view_name AS
SELECT ...
FROM ...
WHERE ...;
解释:
view_name
: 视图名称。SELECT ...
: 查询语句,用于定义视图的内容。
模版:
CREATE VIEW MyView AS
SELECT
`id`,
`name`,
`amount`
FROM
MyTable
WHERE
`amount` > 1000;
5. CREATE FUNCTIO
格式:
CREATE FUNCTION function_name AS 'class_name'
USING JAR 'path_to_jar_file';
解释:
function_name
: 函数名称,用于在SQL查询中调用。class_name
: Java或Scala类的完全限定名,代表具体的函数实现。USING JAR
: 指定包含函数实现的JAR文件路径。
模版:
CREATE FUNCTION MyFunction AS 'com.example.MyUdf'
USING JAR 'file:///path/to/my-udf.jar';
Inster 语句
在 Flink SQL 中,INSERT
语句主要有以下几种类型:
1. 插入静态数据
- 直接将静态数据插入到表中。
INSERT INTO table_name VALUES (value1, value2, ...);
2. 从查询结果插入
- 将一个查询的结果插入到目标表中。
INSERT INTO target_table SELECT * FROM source_table WHERE condition;
3. 插入到分区表
- 对于支持分区的表,可以指定插入的分区。
INSERT INTO partitioned_table PARTITION (partition_column = value) SELECT * FROM source_table;
4. 插入流数据
- 将流数据插入到表中,支持实时数据处理。
INSERT INTO table_name SELECT * FROM stream_source;
5. 插入到临时表
- 可以将数据插入到临时表中,临时表在会话结束后会被自动删除。
INSERT INTO TEMPORARY TABLE temp_table SELECT * FROM source_table;
Delete 语句
DELETE FROM table_name
WHERE condition;
Drop 语句
DROP TABLE table_name;
Update 语句
UPDATE table_name
SET column1 = value1, column2 = value2
WHERE condition;
Select 语句
1. WITH Clause
WITH
子句用于定义临时视图,使查询更易读,特别是在复杂查询中。
示例:
WITH high_salary AS (
SELECT * FROM employees WHERE salary > 70000
)
SELECT * FROM high_salary;
- 说明:此查询首先定义一个临时视图
high_salary
,然后从中检索数据。
2. SELECT & WHERE
- SELECT:用于指定要检索的列。
- WHERE:用于过滤结果集。
示例:
SELECT name, salary
FROM employees
WHERE department = 'Sales';
- 说明:此查询选取销售部门员工的姓名和薪水。
3. SELECT DISTINCT
用于从结果集中去除重复记录。
示例:
SELECT DISTINCT department
FROM employees;
- 说明:此查询返回所有不同的部门名称。
4. Windowing TVF
窗口表值函数(TVF)允许在时间窗口内处理流数据。
示例:
SELECT *
FROM TABLE(TUMBLE(table_name, INTERVAL '1' HOUR));
- 说明:此查询将数据按每小时进行分组。
5. Window Aggregation
窗口聚合用于在指定时间窗口内对数据进行聚合计算。
示例:
SELECT department, AVG(salary)
FROM employees
WINDOW TUMBLE AS (INTERVAL '1' HOUR)
GROUP BY department;
- 说明:此查询计算每小时各部门的平均薪水。
6. Group Aggregation
分组聚合用于根据某一列进行分组,然后对每组进行聚合计算。
示例:
SELECT department, COUNT(*) AS employee_count
FROM employees
GROUP BY department;
- 说明:此查询统计每个部门的员工数量。
7. Over Aggregation
用于在不分组的情况下对数据进行聚合,允许在结果集的每一行上使用聚合函数。
示例:
SELECT name, salary,
AVG(salary) OVER () AS avg_salary
FROM employees;
- 说明:此查询返回每位员工的薪水及整体平均薪水。
8. Joins
连接用于在多个表之间建立关联。
示例:
SELECT e.name, d.department_name
FROM employees e
INNER JOIN departments d ON e.department_id = d.id;
- 说明:此查询将员工表与部门表连接,返回员工姓名和部门名称。
9. Set Operations
集合操作用于合并、交集或差集多个结果集,包括 UNION
、INTERSECT
和 EXCEPT
。
示例:
SELECT name FROM employees
UNION
SELECT name FROM contractors;
- 说明:此查询返回员工和承包商的所有姓名,去除重复项。
10. ORDER BY Clause
用于对查询结果进行排序。
示例:
SELECT *
FROM employees
ORDER BY salary DESC;
- 说明:此查询按薪水降序排列员工记录。
11. LIMIT Clause
限制查询结果的返回行数。
示例:
SELECT *
FROM employees
LIMIT 10;
- 说明:此查询返回前 10 条员工记录。
12. Top-N
返回根据某一列排序的前 N 条记录。
示例:
SELECT *
FROM employees
ORDER BY salary DESC
LIMIT 5;
- 说明:此查询返回薪水最高的前 5 名员工。
13. Window Top-N
在窗口内返回前 N 条记录。
示例:
SELECT department, name, salary
FROM employees
WINDOW TUMBLE AS (INTERVAL '1' HOUR)
ORDER BY salary DESC
LIMIT 3;
- 说明:此查询在每小时的窗口内返回薪水最高的前 3 名员工。
14. Deduplication
去重操作用于移除重复记录,通常结合窗口或条件使用。
示例:
SELECT DISTINCT name, department
FROM employees;
- 说明:此查询返回不重复的员工姓名和部门组合。
15. Pattern Recognition
模式识别用于检测流数据中的特定模式,适合复杂事件处理。
示例:
SELECT *
FROM pattern_matched
WHERE MATCH_RECOGNIZE (
PARTITION BY id
ORDER BY time
MEASURES
A.name AS start_name,
B.name AS end_name
PATTERN (A B)
DEFINE
A AS A.salary < B.salary
);
- 说明:此查询识别在同一 ID 内薪水逐渐上升的模式。
示例
Kafka --> MySQL
使用Python
向Kafka
写入数据
import json
import time
import traceback
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(
bootstrap_servers=['10.211.55.58:9092'],
key_serializer=lambda k: json.dumps(k).encode('utf-8'),
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
while True:
# 获取当前时间
ts = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')
# 构建不同类型的JSON对象
message = {
"count_num": 42, # 示例整数
"is_active": True, # 布尔值
"details": {
"user": "example_user",
"status": "active"
},
"ts": ts,
}
# 发送消息
future = producer.send(
'kafka_demo2',
key='count_num', # 使用相同的key值将消息送至同一个分区
value=message,
partition=0 # 向分区0发送消息
)
print("send {}".format(json.dumps(message)))
time.sleep(3)
try:
future.get(timeout=10) # 监控是否发送成功
except KafkaError: # 发送失败抛出KafkaError
traceback.print_exc()
通过Flink SQL
消费Kafka
数据,然后写入MySQL
-- 1. 创建 Kafka 源表
CREATE TABLE kafka_source (
count_num INT,
is_active BOOLEAN,
details ROW<`user` STRING, status STRING>,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_demo2',
'properties.bootstrap.servers' = '10.211.55.58:9092',
'properties.group.id' = 'testGroup',
'properties.auto.offset.reset' = 'earliest', -- 设置偏移量重置策略
'format' = 'json'
);
-- 2. 创建 MySQL 目标表
CREATE TABLE mysql_sink (
count_num INT,
is_active BOOLEAN,
`user` STRING,
status STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.211.55.58:3306/FlinkSQL', -- 替换为你的数据库名
'table-name' = 'mysql_sink',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root', -- 替换为你的用户名
'password' = 'root', -- 替换为你的密码
'sink.buffer-flush.interval' = '1s' -- 可选:设置刷新间隔
);
-- 3. 从 Kafka 读取数据并写入 MySQL
INSERT INTO mysql_sink
SELECT
count_num,
is_active,
details.`user` AS `user`,
details.status AS status,
ts
FROM kafka_source;
配置详解
scan.startup.mode
1. earliest-offset
- 描述:从主题的最早可用偏移量开始读取数据,即从 Kafka 中的所有分区的最早数据开始消费。
- 使用场景:适用于需要从 Kafka 中读取所有历史数据的情况。
2. latest-offset
- 描述:从提交作业的当前最新偏移量开始读取数据。新加入的记录会被实时消费。
- 使用场景:用于只想消费 Kafka 中新增数据的情况。
3. group-offsets
- 描述:从上次已提交的偏移量开始消费。如果某个分区没有已提交的偏移量,则默认行为是从最早偏移量开始读取。
- 使用场景:适用于重启作业时从中断位置继续消费的情况。
4. timestamp
- 描述:从指定的时间戳开始读取数据。Kafka 中各个分区会查找最接近给定时间戳的偏移量,从该位置开始消费。
- 使用场景:当你希望从特定时间点的数据开始消费时使用。
5. specific-offsets
- 描述:从指定的偏移量开始消费数据。你可以为每个 Kafka 分区明确设置起始的偏移量。
- 使用场景:用于需要精确控制从哪个位置开始消费数据的情况。