Kafka简明教程
1.安装以及使用
通过Docker进行安装
1.安装zookeeper
docker run \
-d \
--restart=always \
-e ALLOW_ANONYMOUS_LOGIN=yes \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \
--name zookeeper \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
bitnami/zookeeper:latest
2.安装Kafka
docker run \
-d \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \
--name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=<IP>:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<IP>:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-v /etc/localtime:/etc/localtime \
bitnami/kafka:latest
进入kafka
docker exec -it kafka /bin/bash
cd /opt/bitnami/kafka/bin
查看kafka版本
kafka-topics.sh --version
3.验证kafka
创建一个新的主题:
./kafka-topics.sh --create --topic test-kafka --bootstrap-server localhost:9092
在开启一个新的终端,一个作为生产者,一个作为消费者
消费者
./kafka-console-consumer.sh --topic test-kafka --from-beginning --bootstrap-server localhost:9092
生产者
./kafka-console-producer.sh --topic test-kafka --bootstrap-server localhost:9092
在生产者页面输入测试内容:
{"id":1,"name":"arvin"}
4.安装Kafka Map
docker run -d --name kafka-map \
-p 9001:8080 \
-v /opt/kafka-map/data:/usr/local/kafka-map/data \
-e DEFAULT_USERNAME=admin \
-e DEFAULT_PASSWORD=admin \
--restart always dushixiang/kafka-map:latest
访问 http://
5.通过Python生产、消费Kafka
生产者
import json
import time
import traceback
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import kafka_errors
producer = KafkaProducer(
bootstrap_servers=['10.211.55.58:9092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
while True:
i = datetime.strftime(datetime.now(), '%Y%m%d %H:%M:%S')
future = producer.send(
'kafka_demo',
key='count_num', # 同一个key值,会被送至同一个分区
value=str(i),
partition=0) # 向分区1发送消息
print("send {}".format(str(i)))
time.sleep(3)
try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
消费者
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'kafka_demo',
bootstrap_servers='10.211.55.58:9092',
group_id='test'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
2.详细讲解
消费策略
1. 自动提交偏移量(Auto Offset Commit)
-
描述:这是 Kafka 默认的消费方式,消费者在消费消息时,会定期自动提交当前消费的偏移量。Kafka 会根据已提交的偏移量记录,每次启动时从上次提交的位置继续消费。
-
优点:
- 简单易用,不需要手动管理偏移量。
-
缺点:
- 如果消费者在消息处理过程中崩溃,可能会导致消息重复消费或丢失,因为偏移量已经提交,但数据可能还没有完全处理。
-
配置:
enable.auto.commit = true
2. 手动提交偏移量(Manual Offset Commit)
-
描述:消费者在消费消息后,手动提交已处理的偏移量。开发者可以在确保数据正确处理后再提交偏移量,避免消息丢失或重复消费。
-
优点:
- 可以精确控制偏移量的提交时机,确保消息被完全处理后再提交。
-
缺点:
- 增加了实现的复杂性,需要自己管理偏移量的提交。
-
配置:
enable.auto.commit = false
,并通过代码显式调用commitSync()
或commitAsync()
方法提交偏移量。
3. 批量消费(Batch Consumption)
-
描述:消费者每次读取一批消息,然后批量处理。这种方式适合需要高吞吐量的场景,可以减少每次消费的网络延迟。
-
优点:
- 批量消费可以提升性能和吞吐量,尤其是在高负载的情况下。
-
缺点:
- 如果处理批量中的某条消息失败,可能需要重新处理整个批次。
-
配置:可以通过
max.poll.records
参数控制每次批量消费的消息数。
4. 消费指定偏移量(Consume from Specific Offset)
-
描述:消费者可以手动指定从某个具体的偏移量开始消费,而不是从 Kafka 默认的最新或最早偏移量开始。
-
优点:
- 提供了精确控制消费位置的能力,适用于需要回溯或重置消费位置的场景。
-
缺点:
- 需要手动管理消费偏移量,不适合一般场景的自动化消费。
-
配置:可以通过 API 手动设置消费的偏移量,例如
seek(offset)
方法。
5. 消费最新消息(Consume from Latest Offset)
-
描述:消费者从当前主题的最新偏移量开始消费,即只处理新产生的消息,忽略主题中已有的历史数据。
-
优点:
- 适合只关心新产生数据的场景,例如实时监控或实时流处理。
-
缺点:
- 可能会忽略历史数据,无法用于回溯处理。
-
配置:将消费者的启动模式设置为
auto.offset.reset = latest
。
6. 消费最早消息(Consume from Earliest Offset)
-
描述:消费者从当前主题的最早偏移量开始消费,确保消费到 Kafka 中的所有历史数据。
-
优点:
- 适用于需要处理所有历史数据的场景,如日志分析或数据回放。
-
缺点:
- 如果主题数据量很大,消费延迟可能较高,尤其是在消费积压的情况下。
-
配置:将消费者的启动模式设置为
auto.offset.reset = earliest
。
7. 消费指定时间点的消息(Consume from a Specific Timestamp)
-
描述:消费者从接近指定时间戳的偏移量开始消费。Kafka 会查找每个分区中最接近给定时间戳的偏移量,然后从该位置开始消费。
-
优点:
- 能够精准控制从某一时间点的数据开始消费,适用于数据恢复和回溯场景。
-
缺点:
- 需要有时间戳信息,并且查找偏移量可能稍有开销。
-
配置:使用
KafkaConsumer.offsetsForTimes()
来查找时间戳对应的偏移量。
不同的消费方式适用于不同的应用场景:
- 自动提交 适用于简单的实时处理。
- 手动提交 和 批量消费 适用于需要可靠性和更高吞吐量的场景。
- 指定偏移量 和 指定时间点 适用于精确控制数据消费起点的需求。