以前写过一篇Kafka结合zookeeper组件集群,最近Kafka发布了3.x版本,不在需要zookeeper也可以完成集群搭建,测试一下。
kafka分布式消息队列部署
zookeeper单机和集群部署
1、服务器环境
2、安装JDK,JDK会提供 jps
命令,来查看相关的Kafka进程 ps -ef |grep kafka
一样
yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y
3、下载Kafka软件包
cd /data
wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar xf kafka_2.13-3.1.0.tgz
4、修改配置文件
# 修改配置文件:/data/kafka_2.13-3.1.0/config/kraft/server.properties
# kafka1配置
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@172.16.1.210:9093,2@172.16.1.211:9093,3@172.16.1.212:9093
listeners=PLAINTEXT://172.16.1.210:9092,CONTROLLER://172.16.1.210:9093
advertised.listeners=PLAINTEXT://172.16.1.210:9092
# kafka2配置
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@172.16.1.210:9093,2@172.16.1.211:9093,3@172.16.1.212:9093
listeners=PLAINTEXT://172.16.1.211:9092,CONTROLLER://172.16.1.211:9093
advertised.listeners=PLAINTEXT://172.16.1.211:9092
# kafka3配置
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@172.16.1.210:9093,2@172.16.1.211:9093,3@172.16.1.212:9093
listeners=PLAINTEXT://172.16.1.212:9092,CONTROLLER://172.16.1.212:9093
advertised.listeners=PLAINTEXT://172.16.1.212:9092
5、启动kafka
# 1、设置集群的uuid
[root@k8s-node2 kraft]# /data/kafka_2.13-3.1.0/bin/kafka-storage.sh random-uuid
NI66tP2zRy-aVryItSoQkw
# 2、格式化kafka数据 (3台节点都需要执行)
# 初次格式化【二选一】
/data/kafka_2.13-3.1.0/bin/kafka-storage.sh format -t NI66tP2zRy-aVryItSoQkw -c /data/kafka_2.13-3.1.0/config/kraft/server.properties
# 强制格式化,覆盖格式化【二选一】
/data/kafka_2.13-3.1.0/bin/kafka-storage.sh format -t NI66tP2zRy-aVryItSoQkw -c /data/kafka_2.13-3.1.0/config/kraft/server.properties --ignore-formatted
# 查看配置
[root@k8s-master1 kraft]# cat /tmp/kraft-combined-logs/meta.properties
#
#Thu Mar 03 21:07:32 CST 2022
cluster.id=NI66tP2zRy-aVryItSoQkw
version=1
node.id=1
# 3、前台启动kafka节点 (3台节点都需要执行)
/data/kafka_2.13-3.1.0/bin/kafka-server-start.sh /data/kafka_2.13-3.1.0/config/kraft/server.properties
# 3.1、后台启动kafka节点 (3台节点都需要执行)
/data/kafka_2.13-3.1.0/bin/kafka-server-start.sh -daemon /data/kafka_2.13-3.1.0/config/kraft/server.properties
# 4、Kafka停止
/data/kafka_2.13-3.1.0/bin/kafka-server-stop.sh
5.1、查看Kraft主节点/从节点(类似于zookeeper选举)
# 查看kafka启动日志
cat /data/kafka_2.13-3.1.0/logs/server.log |grep RaftManager
# cat /data/kafka_2.13-3.1.0/logs/server.log |grep -e 'RaftManager' |grep -ie 'leader' -ie 'FollowerState'
主节点日志:[2022-03-04 08:44:53,137] INFO [RaftManager nodeId=1] Completed transition to Leader
从节点日志:[2022-03-04 08:44:53,407] INFO [RaftManager nodeId=2] Completed transition to FollowerState
6、在kafka1上进行消息写入与读取测试
# 在kafka1创建topics 1副本6分区
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --create --topic test-16 --partitions 6 --replication-factor 1 --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
# 在kafka1创建topics 2副本(--replication-factor)3分区(--partitions)
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --create --topic test-13 --partitions 3 --replication-factor 1 --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
# 写入测试:总记录数:--num-records 单条消息大小(单位:b):--record-size
/data/kafka_2.13-3.1.0/bin/kafka-producer-perf-test.sh --num-records 5000000 --record-size 100 --topic test-16 --throughput -1 --producer-props bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 acks=-1 linger.ms=2000 compression.type=lz4
/data/kafka_2.13-3.1.0/bin/kafka-producer-perf-test.sh --num-records 5000000 --record-size 100 --topic test --throughput -1 --producer-props bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
/data/kafka_2.13-3.1.0/bin/kafka-producer-perf-test.sh --num-records 5000000 --record-size 100 --topic test-33 --throughput -1 --producer-props bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
# 消费读取测试
/data/kafka_2.13-3.1.0/bin/kafka-consumer-perf-test.sh --broker-list kafka1:9092 --messages 5000000 --topic test-16
7、Topic创建/查看/删除/批量删除
# 在kafka1创建topics 2副本(--replication-factor)3分区(--partitions)
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --create --topic test-13 --partitions 3 --replication-factor 1 --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
# 删除topic
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --delete --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test-33
# 批量删除topic
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --delete --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test-13,test-11,test
# 列出所有topic
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --list --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
# 查看topic详情
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test-13
# 修改topic分区数量(只能扩不能缩容)
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --alter --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test-13 --partitions 8
# 查看topic对应的消息数量
/data/kafka_2.13-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test-13 --time -1
# 清除topic里面的数据
/data/kafka_2.13-3.1.0/bin/kafka-topics.sh --alter --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test-16 --config retention.ms=1000
# 控制台消息写入测试(任意台可写入消息/也可接受消息)
/data/kafka_2.13-3.1.0/bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test-16
# 在kafka1写入消息
[root@k8s-master1 ~]# /data/kafka_2.13-3.1.0/bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test-13
>hello kafka
>test-13
>
# 控制台读取消息测试(消费者取消息)
/data/kafka_2.13-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic test-16 --from-beginning
# 从kafka2和kafka3接收消息
# kafka2
[root@k8s-node1 ~]# /data/kafka_2.13-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic test-13 --from-beginning
hell
hello kafka
hello kafka
test-13
# kafka3
[root@k8s-node2 ~]# /data/kafka_2.13-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test-13 --from-beginning
hell
hello kafka
hello kafka
test-13
8、Kafka常用运维命令
# 1、查看所有用户组
/data/kafka_2.13-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --list
# 2、查看消息消费情况(可根据LAG列,确认消息是否有积压)
/data/kafka_2.13-3.1.0/bin/kafka-consumer-groups.sh --describe --bootstrap-server kafka1:9092 --group perf-consumer-38961
8、参考文档
kafka shell 命令:https://shirenchuang.blog.csdn.net/article/details/118215928
kafka 总结:https://blog.csdn.net/weixin_39410864/article/details/123080540
还没有任何评论,你来说两句吧