新闻中心
新闻中心与新手教程
新闻中心与新手教程
发布时间:2024-10-12 01:38:40
kafka 是一个分布式流处理平台,设计用于高吞吐量、可靠性和可扩展性。主要概念包括:
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
export kafka_home=/path/to/kafka
export path=$path:$kafka_home/bin
编辑 config/server.properties
:
broker.id=0
(每个 broker 应该有唯一的 id)listeners=plaintext://:9092
(监听地址)log.dirs=/tmp/kafka-logs
(日志存储位置)num.partitions=3
(每个 topic 的默认分区数)log.dirs
指定的目录是否有足够空间bin/zookeeper-server-start.sh config/zookeeper.properties
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
netstat -tunlp | grep 9092
kafka_heap_opts
)log.dirs
的权限bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --alter --topic my-topic --partitions 4 --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
producer 示例:
properties props = new properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
producer
producer.send(new producerrecord<>("my-topic", "key", "value"));
producer.close();
consumer 示例:
properties props = new properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");
kafkaconsumer
consumer.subscribe(arrays.aslist("my-topic"));
while (true) {
consumerrecords
for (consumerrecord
system.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
num.network.threads
: 处理网络请求的线程数,默认3num.io.threads
: 处理磁盘i/o的线程数,默认8socket.send.buffer.bytes
: so_sndbuf缓冲区大小,默认100kbsocket.receive.buffer.bytes
: so_rcvbuf缓冲区大小,默认100kbsocket.request.max.bytes
: 单个请求的最大大小,默认100mbretention.ms
: 消息保留时间,默认7天retention.bytes
: 分区最大大小,默认-1(无限制)max.message.bytes
: 单条消息最大大小,默认1mbacks
: 确认机制 (0, 1, all)retries
: 重试次数batch.size
: 批量发送大小linger.ms
: 等待时间以形成批次fetch.min.bytes
: 每次拉取的最小数据量max.poll.records
: 单次调用poll()返回的最大记录数auto.offset.reset
: 新消费者组的起始offset (earliest, latest)启用 jmx:
export kafka_jmx_opts="-dcom.sun.management.jmxremote -dcom.sun.management.jmxremote.port=9999 -dcom.sun.management.jmxremote.authenticate=false -dcom.sun.management.jmxremote.ssl=false"
使用 kafka-producer-perf-test.sh 和 kafka-consumer-perf-test.sh 脚本进行性能测试。
listeners=ssl://localhost:9093
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
listeners=sasl_ssl://localhost:9094
security.inter.broker.protocol=sasl_ssl
sasl.mechanism.inter.broker.protocol=plain
sasl.enabled.mechanisms=plain
logs/server.log
logs/controller.log
logs/zookeeper-gc.log
使用命令分析日志:
grep error logs/server.log
tail -f logs/server.log
zkcli.sh
连接 zookeeper 并检查 kafka 相关 znodeecho stat | nc localhost 2181
jstat
和 jstack
分析 jvm 状态iostat -x 1
iftop
或 netstat
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --topic my-topic --partition 0 --election-type preferred
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --execute
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
记住,在进行任何更改之前,务必备份重要的配置文件和数据。在生产环境中,建议先在测试环境验证所有更改。如果遇到复杂问题,不要犹豫寻求 kafka 社区或专业支持的帮助。
------------------------------------------------------------------------------
这个是更新后的指南提供了更深入和全面的 kafka 入门和故障排查信息。它涵盖了从基本概念到高级配置和故障排查的各个方面。主要的新增和扩展内容包括:
这个指南应该能够帮助您更全面地了解 kafka,并在实际使用中解决各种问题。
感谢提供:05互联