文章

Kafka简明教程

1.安装以及使用

通过Docker进行安装

1.安装zookeeper

docker run \
  -d \
  --restart=always \
  -e ALLOW_ANONYMOUS_LOGIN=yes \
  --log-driver json-file \
  --log-opt max-size=100m \
  --log-opt max-file=2 \
  --name zookeeper \
  -p 2181:2181 \
  -v /etc/localtime:/etc/localtime \
  bitnami/zookeeper:latest

2.安装Kafka

docker run \
  -d \
  --log-driver json-file \
  --log-opt max-size=100m \
  --log-opt max-file=2 \
  --name kafka \
  -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=<IP>:2181/kafka \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<IP>:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -v /etc/localtime:/etc/localtime \
  bitnami/kafka:latest

进入kafka

docker exec -it kafka /bin/bash

cd /opt/bitnami/kafka/bin

查看kafka版本

kafka-topics.sh --version

3.验证kafka

创建一个新的主题:

./kafka-topics.sh --create --topic test-kafka --bootstrap-server localhost:9092

在开启一个新的终端,一个作为生产者,一个作为消费者

消费者

./kafka-console-consumer.sh --topic test-kafka --from-beginning --bootstrap-server localhost:9092

生产者

./kafka-console-producer.sh --topic test-kafka --bootstrap-server localhost:9092

在生产者页面输入测试内容:

{"id":1,"name":"arvin"}

4.安装Kafka Map

docker run -d --name kafka-map \
    -p 9001:8080 \
    -v /opt/kafka-map/data:/usr/local/kafka-map/data \
    -e DEFAULT_USERNAME=admin \
    -e DEFAULT_PASSWORD=admin \
    --restart always dushixiang/kafka-map:latest

访问 http://:9001 并添加对应的kafka集群信息

5.通过Python生产、消费Kafka

生产者

import json
import time
import traceback

from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import kafka_errors

producer = KafkaProducer(
    bootstrap_servers=['10.211.55.58:9092'],
    key_serializer=lambda k: json.dumps(k).encode(),
    value_serializer=lambda v: json.dumps(v).encode())

while True:
    i = datetime.strftime(datetime.now(), '%Y%m%d %H:%M:%S')
    future = producer.send(
        'kafka_demo',
        key='count_num',  # 同一个key值,会被送至同一个分区
        value=str(i),
        partition=0)  # 向分区1发送消息
    print("send {}".format(str(i)))
    time.sleep(3)
    try:
        future.get(timeout=10)  # 监控是否发送成功
    except kafka_errors:  # 发送失败抛出kafka_errors
        traceback.format_exc()

消费者

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'kafka_demo',
    bootstrap_servers='10.211.55.58:9092',
    group_id='test'
)
for message in consumer:
    print("receive, key: {}, value: {}".format(
        json.loads(message.key.decode()),
        json.loads(message.value.decode())
    )
    )

image-20240820095709546

image-20240820095830506

2.详细讲解

消费策略

1. 自动提交偏移量(Auto Offset Commit)

  • 描述:这是 Kafka 默认的消费方式,消费者在消费消息时,会定期自动提交当前消费的偏移量。Kafka 会根据已提交的偏移量记录,每次启动时从上次提交的位置继续消费。

  • 优点:

    • 简单易用,不需要手动管理偏移量。
  • 缺点:

    • 如果消费者在消息处理过程中崩溃,可能会导致消息重复消费或丢失,因为偏移量已经提交,但数据可能还没有完全处理。
  • 配置:enable.auto.commit = true

2. 手动提交偏移量(Manual Offset Commit)

  • 描述:消费者在消费消息后,手动提交已处理的偏移量。开发者可以在确保数据正确处理后再提交偏移量,避免消息丢失或重复消费。

  • 优点:

    • 可以精确控制偏移量的提交时机,确保消息被完全处理后再提交。
  • 缺点:

    • 增加了实现的复杂性,需要自己管理偏移量的提交。
  • 配置:enable.auto.commit = false,并通过代码显式调用 commitSync()commitAsync() 方法提交偏移量。

3. 批量消费(Batch Consumption)

  • 描述:消费者每次读取一批消息,然后批量处理。这种方式适合需要高吞吐量的场景,可以减少每次消费的网络延迟。

  • 优点:

    • 批量消费可以提升性能和吞吐量,尤其是在高负载的情况下。
  • 缺点:

    • 如果处理批量中的某条消息失败,可能需要重新处理整个批次。
  • 配置:可以通过 max.poll.records 参数控制每次批量消费的消息数。

4. 消费指定偏移量(Consume from Specific Offset)

  • 描述:消费者可以手动指定从某个具体的偏移量开始消费,而不是从 Kafka 默认的最新或最早偏移量开始。

  • 优点:

    • 提供了精确控制消费位置的能力,适用于需要回溯或重置消费位置的场景。
  • 缺点:

    • 需要手动管理消费偏移量,不适合一般场景的自动化消费。
  • 配置:可以通过 API 手动设置消费的偏移量,例如 seek(offset) 方法。

5. 消费最新消息(Consume from Latest Offset)

  • 描述:消费者从当前主题的最新偏移量开始消费,即只处理新产生的消息,忽略主题中已有的历史数据。

  • 优点:

    • 适合只关心新产生数据的场景,例如实时监控或实时流处理。
  • 缺点:

    • 可能会忽略历史数据,无法用于回溯处理。
  • 配置:将消费者的启动模式设置为 auto.offset.reset = latest

6. 消费最早消息(Consume from Earliest Offset)

  • 描述:消费者从当前主题的最早偏移量开始消费,确保消费到 Kafka 中的所有历史数据。

  • 优点:

    • 适用于需要处理所有历史数据的场景,如日志分析或数据回放。
  • 缺点:

    • 如果主题数据量很大,消费延迟可能较高,尤其是在消费积压的情况下。
  • 配置:将消费者的启动模式设置为 auto.offset.reset = earliest

7. 消费指定时间点的消息(Consume from a Specific Timestamp)

  • 描述:消费者从接近指定时间戳的偏移量开始消费。Kafka 会查找每个分区中最接近给定时间戳的偏移量,然后从该位置开始消费。

  • 优点:

    • 能够精准控制从某一时间点的数据开始消费,适用于数据恢复和回溯场景。
  • 缺点:

    • 需要有时间戳信息,并且查找偏移量可能稍有开销。
  • 配置:使用 KafkaConsumer.offsetsForTimes() 来查找时间戳对应的偏移量。

不同的消费方式适用于不同的应用场景:

  • 自动提交 适用于简单的实时处理。
  • 手动提交 和 批量消费 适用于需要可靠性和更高吞吐量的场景。
  • 指定偏移量 和 指定时间点 适用于精确控制数据消费起点的需求。
License:  CC BY 4.0