Kafka 4.3.0部署方法(KRaft模式)
Kafka 4.0+ 已完全移除 ZooKeeper 依赖,采用内置的 KRaft 协议进行元数据管理,部署比旧版本简单得多。
要求:Java版本:Java 17+(必须,Kafka 4.0 不再支持 Java 11 以下)
一、下载并解压安装
从Apache官方下载最新版本:https://kafka.apache.org/community/downloads
wget "https://www.apache.org/dyn/closer.lua/kafka/4.3.0/kafka_2.13-4.3.0.tgz"
tar -zxf kafka_2.13-4.3.0.tgz
mv kafka_2.13-4.3.0 /usr/local/kafka
二、生成集群UUID
cd /usr/local/kafka/
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo $KAFKA_CLUSTER_ID
三、格式化存储目录(首次部署必须执行)
修改配置文件config/server.properties:
# 数据存储目录
log.dirs=/var/lib/kafka
# 对外公布的地址 192.168.10.8为部署Kafka的服务器IP地址
advertised.listeners=PLAINTEXT://192.168.10.8:9092,CONTROLLER://192.168.10.8:9093
# 集群控制器的选民名单,值必须包含所有设置了process.roles=controller的节点
controller.quorum.voters=1@192.168.10.8:9093,2@192.168.10.10:9093,3@192.168.10.12:9093
# 自动创建的Topic默认分区数和副本数,生产环境推荐3
num.partitions=3
default.replication.factor=3
# 内部Topic副本数,必须>=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
/usr/local/kafka/bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c /usr/local/kafka/config/server.properties
--standalone:参数表示单机模式,无需配置多个节点
四、启动Kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
后台运行
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
用lsof -i:9092命令查看启动情况
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 272648 root 165u IPv6 2934447 0t0 TCP *:XmlIpcRegSvc (LISTEN)
停止Kafka
/usr/local/kafka/bin/kafka-server-stop.sh
自动启动Kafka
编辑/etc/systemd/system/kafka.service,输入一下内容:
[Unit]
Description=Kafka
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=10
TimeoutStartSec=120
TimeoutStopSec=120
[Install]
WantedBy=multi-user.target
五、命令行操作Kafka
1.创建Topic
/usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
2.删除Topic
/usr/local/kafka/bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
3.发送消息
/usr/local/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
4.消费消息
/usr/local/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --group test-group --command-property group.protocol=consumer --bootstrap-server localhost:9092
--from-beginning:从头消费,消费 topic 中所有的历史消息
--offset latest:从最新开始,可以显式指定,但与默认行为相同(Kafka 2.0+ 支持)
5.查看消费者组详细信息
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
各字段含义
CURRENT-OFFSET:该消费者组当前已消费的偏移量
LOG-END-OFFSET:该分区最新的消息偏移量
LAG:积压量 = LOG-END-OFFSET - CURRENT-OFFSET,这是最重要的监控指标
CONSUMER-ID:消费者的唯一标识。若为 -,表示该分区未被任何消费者分配
6.查看消费者组成员信息
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --members
7.查看消费者组状态
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --state
消费者组状态含义
Stable:正常状态,消费者正在稳定消费
PreparingRebalance:正在准备重平衡(短暂)
CompletingRebalance:正在完成重平衡(短暂)
Empty:有元数据但没有活跃成员(所有消费者已离开)
Dead:消费者组已无任何元数据或成员
8.查看详细的偏移量信息
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --offsets
五、核心配置说明(config/server.properties)
| 参数 | 推荐值(单机) | 说明 |
|---|---|---|
| node.id | 1 | 节点唯一标识(必须从 1 开始递增且不重复) |
| listeners | PLAINTEXT://:9092 | 客户端连接地址 |
| advertised.listeners | PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 | 对外公布的地址(需改为IP) |
| controller.quorum.voters | 1@192.168.10.8:9093,2@192.168.10.10:9093,3@192.168.10.12:9093 | 所有Controller节点的列表,三个节点必须完全一致 |
| log.dirs | /var/lib/kafka | 数据存储目录 |
| log.retention.hours | 168 | 消息保留时间(7天) |
| num.io.threads | 8 | 建议设为CPU核心数 |
六、日志配置修改(config/log4j2.yaml)
将Configuration.Properties.Property属性下的name为kafka.logs.dir的value改为/var/log/kafka;将name为logPattern的name改为log.pattern,value改为%d{yyyy-MM-dd HH:mm:ss.SSS} [%p]-[%t][%c] %m%n。
将所有的${logPattern}改为${log.pattern}。
将所有的${sys:kafka.logs.dir}改为${kafka.logs.dir}。
将所有的.log.%d{yyyy-MM-dd-HH}改为.%d{yyyyMMdd}.log。