新闻中心
新闻中心与新手教程
新闻中心与新手教程
发布时间:2024-10-12 01:38:40
kafka 是一个分布式流处理平台,设计用于高吞吐量、可靠性和可扩展性。主要概念包括:
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgztar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
export kafka_home=/path/to/kafka
export path=$path:$kafka_home/bin
编辑 config/server.properties:
broker.id=0 (每个 broker 应该有唯一的 id)listeners=plaintext://:9092 (监听地址)log.dirs=/tmp/kafka-logs (日志存储位置)num.partitions=3 (每个 topic 的默认分区数)log.dirs 指定的目录是否有足够空间bin/zookeeper-server-start.sh config/zookeeper.propertiesbin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh config/server.propertiesbin/kafka-server-start.sh -daemon config/server.propertiesnetstat -tunlp | grep 9092kafka_heap_opts)log.dirs 的权限bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1bin/kafka-topics.sh --list --bootstrap-server localhost:9092bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092bin/kafka-topics.sh --alter --topic my-topic --partitions 4 --bootstrap-server localhost:9092bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092producer 示例:
properties props = new properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
producer
producer.send(new producerrecord<>("my-topic", "key", "value"));
producer.close();
consumer 示例:
properties props = new properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");
kafkaconsumer
consumer.subscribe(arrays.aslist("my-topic"));
while (true) {
consumerrecords
for (consumerrecord
system.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
num.network.threads: 处理网络请求的线程数,默认3num.io.threads: 处理磁盘i/o的线程数,默认8socket.send.buffer.bytes: so_sndbuf缓冲区大小,默认100kbsocket.receive.buffer.bytes: so_rcvbuf缓冲区大小,默认100kbsocket.request.max.bytes: 单个请求的最大大小,默认100mbretention.ms: 消息保留时间,默认7天retention.bytes: 分区最大大小,默认-1(无限制)max.message.bytes: 单条消息最大大小,默认1mbacks: 确认机制 (0, 1, all)retries: 重试次数batch.size: 批量发送大小linger.ms: 等待时间以形成批次fetch.min.bytes: 每次拉取的最小数据量max.poll.records: 单次调用poll()返回的最大记录数auto.offset.reset: 新消费者组的起始offset (earliest, latest)启用 jmx:
export kafka_jmx_opts="-dcom.sun.management.jmxremote -dcom.sun.management.jmxremote.port=9999 -dcom.sun.management.jmxremote.authenticate=false -dcom.sun.management.jmxremote.ssl=false"使用 kafka-producer-perf-test.sh 和 kafka-consumer-perf-test.sh 脚本进行性能测试。
listeners=ssl://localhost:9093
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
listeners=sasl_ssl://localhost:9094
security.inter.broker.protocol=sasl_ssl
sasl.mechanism.inter.broker.protocol=plain
sasl.enabled.mechanisms=plain
logs/server.loglogs/controller.loglogs/zookeeper-gc.log使用命令分析日志:
grep error logs/server.log
tail -f logs/server.log
zkcli.sh 连接 zookeeper 并检查 kafka 相关 znodeecho stat | nc localhost 2181jstat 和 jstack 分析 jvm 状态iostat -x 1iftop 或 netstatbin/kafka-leader-election.sh --bootstrap-server localhost:9092 --topic my-topic --partition 0 --election-type preferredbin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --executebin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group记住,在进行任何更改之前,务必备份重要的配置文件和数据。在生产环境中,建议先在测试环境验证所有更改。如果遇到复杂问题,不要犹豫寻求 kafka 社区或专业支持的帮助。
------------------------------------------------------------------------------
这个是更新后的指南提供了更深入和全面的 kafka 入门和故障排查信息。它涵盖了从基本概念到高级配置和故障排查的各个方面。主要的新增和扩展内容包括:
这个指南应该能够帮助您更全面地了解 kafka,并在实际使用中解决各种问题。
感谢提供:05互联