Kafka 详细快速入门指南和故障排查

Kafka 详细快速入门指南和故障排查

发布时间:2024-10-12 01:38:40

1. kafka 简介

kafka 是一个分布式流处理平台,设计用于高吞吐量、可靠性和可扩展性。主要概念包括:

  • topic: 消息的逻辑分类
  • partition: topic 的子集,允许并行处理
  • producer: 发送消息到 topic
  • consumer: 从 topic 读取消息
  • broker: kafka 服务器
  • zookeeper: 用于管理和协调 kafka broker

2. 安装 kafka

2.1 前提条件

  • java 8+
  • 足够的磁盘空间 (建议至少 5gb)
  • 内存 (建议至少 4gb ram)

2.2 下载和安装步骤

  1. 下载 kafka
     
    wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
  2. 解压文件
     

    tar -xzf kafka_2.13-3.5.0.tgz
    cd kafka_2.13-3.5.0

  3. 配置环境变量 (可选,但推荐)
     

    export kafka_home=/path/to/kafka
    export path=$path:$kafka_home/bin

2.3 配置 kafka

编辑 config/server.properties:

  • broker.id=0 (每个 broker 应该有唯一的 id)
  • listeners=plaintext://:9092 (监听地址)
  • log.dirs=/tmp/kafka-logs (日志存储位置)
  • num.partitions=3 (每个 topic 的默认分区数)

2.4 常见安装问题

  • 权限问题: 确保有足够的权限解压和执行文件
  • 版本兼容性: 确保 scala 版本与你的环境兼容
  • 磁盘空间: 检查 log.dirs 指定的目录是否有足够空间

3. 启动 kafka 环境

3.1 启动 zookeeper

  1. 使用 kafka 自带的脚本:
     
    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. 或者使用守护进程模式:
     
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

3.2 启动 kafka 服务器

  1. 标准模式:
     
    bin/kafka-server-start.sh config/server.properties
  2. 守护进程模式:
     
    bin/kafka-server-start.sh -daemon config/server.properties

3.3 验证 kafka 是否正在运行

 
netstat -tunlp | grep 9092

3.4 常见启动问题

  • 端口冲突: 确保 9092 (kafka) 和 2181 (zookeeper) 端口未被占用
  • 内存不足: 调整 kafka 的堆内存大小 (kafka_heap_opts)
  • 日志目录权限: 确保 kafka 用户有写入 log.dirs 的权限

4. 管理 kafka topics

4.1 创建 topic

 
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

4.2 列出所有 topics

 
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

4.3 描述 topic

 
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

4.4 删除 topic

 
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

4.5 增加 topic 分区

 
bin/kafka-topics.sh --alter --topic my-topic --partitions 4 --bootstrap-server localhost:9092

5. 生产和消费消息

5.1 使用控制台生产者

 
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092

5.2 使用控制台消费者

 
bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092

5.3 使用 java 客户端

producer 示例:

java

properties props = new properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");

producer producer = new kafkaproducer<>(props);
producer.send(new producerrecord<>("my-topic", "key", "value"));
producer.close();

consumer 示例:

java

properties props = new properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");

kafkaconsumer consumer = new kafkaconsumer<>(props);
consumer.subscribe(arrays.aslist("my-topic"));

while (true) {
consumerrecords records = consumer.poll(duration.ofmillis(100));
for (consumerrecord record : records) {
system.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}

6. 配置最佳实践

6.1 broker 配置

  • num.network.threads: 处理网络请求的线程数,默认3
  • num.io.threads: 处理磁盘i/o的线程数,默认8
  • socket.send.buffer.bytes: so_sndbuf缓冲区大小,默认100kb
  • socket.receive.buffer.bytes: so_rcvbuf缓冲区大小,默认100kb
  • socket.request.max.bytes: 单个请求的最大大小,默认100mb

6.2 topic 配置

  • retention.ms: 消息保留时间,默认7天
  • retention.bytes: 分区最大大小,默认-1(无限制)
  • max.message.bytes: 单条消息最大大小,默认1mb

6.3 producer 配置

  • acks: 确认机制 (0, 1, all)
  • retries: 重试次数
  • batch.size: 批量发送大小
  • linger.ms: 等待时间以形成批次

6.4 consumer 配置

  • fetch.min.bytes: 每次拉取的最小数据量
  • max.poll.records: 单次调用poll()返回的最大记录数
  • auto.offset.reset: 新消费者组的起始offset (earliest, latest)

7. 监控和性能调优

7.1 jmx 监控

启用 jmx:

 
export kafka_jmx_opts="-dcom.sun.management.jmxremote -dcom.sun.management.jmxremote.port=9999 -dcom.sun.management.jmxremote.authenticate=false -dcom.sun.management.jmxremote.ssl=false"

7.2 使用监控工具

  • kafka manager
  • grafana + prometheus
  • linkedin's burrow

7.3 性能测试

使用 kafka-producer-perf-test.sh 和 kafka-consumer-perf-test.sh 脚本进行性能测试。

7.4 常见性能问题及解决方案

  • 高延迟: 增加 producer 的 batch.size, 调整 linger.ms
  • 低吞吐量: 增加 partition 数量, 调整 num.network.threads 和 num.io.threads
  • 消费者延迟: 调整 fetch.min.bytes 和 max.poll.records

8. 安全配置

8.1 启用 ssl/tls

  1. 生成密钥和证书
  2. 配置 server.properties:
     

    listeners=ssl://localhost:9093
    ssl.keystore.location=/path/to/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234

  3. 配置客户端使用 ssl

8.2 启用 sasl 认证

  1. 配置 jaas
  2. 修改 server.properties:
     

    listeners=sasl_ssl://localhost:9094
    security.inter.broker.protocol=sasl_ssl
    sasl.mechanism.inter.broker.protocol=plain
    sasl.enabled.mechanisms=plain

  3. 配置客户端使用 sasl

9. 故障排查

9.1 日志分析

  • kafka 服务器日志: logs/server.log
  • kafka 控制器日志: logs/controller.log
  • zookeeper 日志: logs/zookeeper-gc.log

使用命令分析日志:

 

grep error logs/server.log
tail -f logs/server.log

9.2 常见错误码及解决方案

  • leader_not_available: 等待新 leader 选举或手动触发 leader 选举
  • network_exception: 检查网络连接和防火墙设置
  • not_enough_replicas: 增加副本数或减少 min.insync.replicas 配置

9.3 zookeeper 问题

  • 使用 zkcli.sh 连接 zookeeper 并检查 kafka 相关 znode
  • 检查 zookeeper 集群状态: echo stat | nc localhost 2181

9.4 数据丢失问题

  1. 检查 producer 的 acks 设置
  2. 验证 min.insync.replicas 配置
  3. 分析 consumer 的 offset 提交策略

9.5 性能诊断

  1. 使用 jstatjstack 分析 jvm 状态
  2. 检查磁盘 i/o: iostat -x 1
  3. 网络流量分析: iftopnetstat

9.6 常见运维命令

  • 手动触发 leader 选举:
     
    bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --topic my-topic --partition 0 --election-type preferred
  • 重新分配 partition:
     
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --execute
  • 检查消费者组状态:
     
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

记住,在进行任何更改之前,务必备份重要的配置文件和数据。在生产环境中,建议先在测试环境验证所有更改。如果遇到复杂问题,不要犹豫寻求 kafka 社区或专业支持的帮助。

------------------------------------------------------------------------------

这个是更新后的指南提供了更深入和全面的 kafka 入门和故障排查信息。它涵盖了从基本概念到高级配置和故障排查的各个方面。主要的新增和扩展内容包括:

  1. kafka 的基本概念介绍
  2. 更详细的安装和配置步骤
  3. 高级 topic 管理操作
  4. java 客户端代码示例
  5. 详细的配置最佳实践
  6. 监控和性能调优的方法
  7. 安全配置(ssl/tls 和 sasl)
  8. 更深入的故障排查技巧和常见问题解决方案
  9. 常用的运维命令

这个指南应该能够帮助您更全面地了解 kafka,并在实际使用中解决各种问题。

感谢提供:05互联