Kafka 4.3.0部署方法(KRaft模式)

Eave 2026.05.28

Kafka 4.0+ 已完全移除 ZooKeeper 依赖,采用内置的 KRaft 协议进行元数据管理,部署比旧版本简单得多。

要求:Java版本:Java 17+(必须,Kafka 4.0 不再支持 Java 11 以下)

一、下载并解压安装

从Apache官方下载最新版本:https://kafka.apache.org/community/downloads

wget "https://www.apache.org/dyn/closer.lua/kafka/4.3.0/kafka_2.13-4.3.0.tgz"
tar -zxf kafka_2.13-4.3.0.tgz
mv kafka_2.13-4.3.0 /usr/local/kafka

二、生成集群UUID

cd /usr/local/kafka/
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo $KAFKA_CLUSTER_ID

三、格式化存储目录(首次部署必须执行)

修改配置文件config/server.properties

# 数据存储目录
log.dirs=/var/lib/kafka

# 对外公布的地址 192.168.10.8为部署Kafka的服务器IP地址
advertised.listeners=PLAINTEXT://192.168.10.8:9092,CONTROLLER://192.168.10.8:9093

# 集群控制器的选民名单,值必须包含所有设置了process.roles=controller的节点
controller.quorum.voters=1@192.168.10.8:9093,2@192.168.10.10:9093,3@192.168.10.12:9093

# 自动创建的Topic默认分区数和副本数,生产环境推荐3
num.partitions=3
default.replication.factor=3

# 内部Topic副本数,必须>=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
/usr/local/kafka/bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c /usr/local/kafka/config/server.properties

--standalone:参数表示单机模式,无需配置多个节点

四、启动Kafka

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

后台运行

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

lsof -i:9092命令查看启动情况

COMMAND    PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    272648 root  165u  IPv6 2934447      0t0  TCP *:XmlIpcRegSvc (LISTEN)

停止Kafka

/usr/local/kafka/bin/kafka-server-stop.sh

自动启动Kafka

编辑/etc/systemd/system/kafka.service,输入一下内容:

[Unit]
Description=Kafka
After=network.target

[Service]
Type=forking
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=10

TimeoutStartSec=120
TimeoutStopSec=120

[Install]
WantedBy=multi-user.target

五、命令行操作Kafka

1.创建Topic

/usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092

2.删除Topic

/usr/local/kafka/bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092

3.发送消息

/usr/local/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092

4.消费消息

/usr/local/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --group test-group --command-property group.protocol=consumer --bootstrap-server localhost:9092

--from-beginning:从头消费,消费 topic 中所有的历史消息

--offset latest:从最新开始,可以显式指定,但与默认行为相同(Kafka 2.0+ 支持)

5.查看消费者组详细信息

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

各字段含义

CURRENT-OFFSET:该消费者组当前已消费的偏移量

LOG-END-OFFSET:该分区最新的消息偏移量

LAG:积压量 = LOG-END-OFFSET - CURRENT-OFFSET,这是最重要的监控指标

CONSUMER-ID:消费者的唯一标识。若为 -,表示该分区未被任何消费者分配

6.查看消费者组成员信息

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --members

7.查看消费者组状态

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --state

消费者组状态含义

Stable:正常状态,消费者正在稳定消费

PreparingRebalance:正在准备重平衡(短暂)

CompletingRebalance:正在完成重平衡(短暂)

Empty:有元数据但没有活跃成员(所有消费者已离开)

Dead:消费者组已无任何元数据或成员

8.查看详细的偏移量信息

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group --offsets

五、核心配置说明(config/server.properties

参数 推荐值(单机) 说明
node.id 1 节点唯一标识(必须从 1 开始递增且不重复)
listeners PLAINTEXT://:9092 客户端连接地址
advertised.listeners PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 对外公布的地址(需改为IP)
controller.quorum.voters 1@192.168.10.8:9093,2@192.168.10.10:9093,3@192.168.10.12:9093 所有Controller节点的列表,三个节点必须完全一致
log.dirs /var/lib/kafka 数据存储目录
log.retention.hours 168 消息保留时间(7天)
num.io.threads 8 建议设为CPU核心数

六、日志配置修改(config/log4j2.yaml

Configuration.Properties.Property属性下的namekafka.logs.dirvalue改为/var/log/kafka;将namelogPatternname改为log.patternvalue改为%d{yyyy-MM-dd HH:mm:ss.SSS} [%p]-[%t][%c] %m%n

将所有的${logPattern}改为${log.pattern}

将所有的${sys:kafka.logs.dir}改为${kafka.logs.dir}

将所有的.log.%d{yyyy-MM-dd-HH}改为.%d{yyyyMMdd}.log