Broker的总体工作流程

20220408104831.png

  1. broker启动后在ZK中注册
  2. 每个Broker中都有一个Controller,zookeeper中的controller信息是哪个Broker中的Controller先注册上,就是哪个Broker中的Controller说的算。
  3. 由选举出来的Controller监听Zookeeper中brokers节点数据的变化情况。
  4. 由Controller进行Leader选举(实时监听Broker的上下线状态,并进行相应操作)
    1. AR:Kafka分区中的所有副本系统:Zookeeper中/broker/ids/ 中的数据
    2. 选举规则:在ISR中存活为前提,按照AR中排在前面的优先。例如AR[1,0,2],ISR[0,1,2],那么Controller就会按照[1,0,2]的顺序,每次都从头开始寻找第一个存活的节点作为Leader。
  5. Controller将选举结果和节点信息保存到Zookeeper中;
  6. 其他Controller从ZK同步相关信息;

Leader挂掉的选举流程

20220408105012.png

  1. 假设Broker1中的Leader挂了;
  2. Controller监听到节点变化;
  3. 从ZK中获取ISR信息;
  4. 选举新的Leader(在ISR中存活为前提,按照AR中排在最前面的优先);
  5. 更新ZK中的Leader和ISR信息;

节点的服役

新节点准备

  1. 安装必要的依赖和软件
  2. 加入到集群中
  3. 启动节点上的Kafka

==注意==:此时旧的topics不会使用新节点进行存储数据,如果想要旧节点的数据存储在新节点上或者迁移到新节点上,需要使用负载均衡

负载均衡

1、创建一个负载均衡主题

[root@VM-4-10-centos ~]# vim topics-to-move.json

{
	"topics": [{
		"topic": "first"
	}],
	"version": 1
}

2、生成一个负载均衡计划

[root@VM-4-10-centos kafka_2.12-3.1.0]# bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092
--topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

==注意==:这个只是生成计划,后续需要根据计划进行执行操作

20220411222511.png

3、创建副本存储计划

将生成的计划写入副本存储计划文件increase-replication-factor.json

20220411222740.png

4、执行副本存储计划

[root@VM-4-10-centos kafka_2.12-3.1.0]# bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092
--reassignment-json-file increase-replication-factor.json --execute

5、验证副本存储计划

[root@VM-4-10-centos kafka_2.12-3.1.0]# bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092
--reassignment-json-file increase-replication-factor.json --verify

节点的退役

执行负载均衡

1、创建一个负载均衡主题

同新节点服役中的该步骤

2、生成一个负载均衡计划

==注意==:此处与节点的服役不同,如果你想要将节点2退役,那么在创建计划时,需要将节点2排除,保留[0,1,3]

[root@VM-4-10-centos kafka_2.12-3.1.0]# bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092
--topics-to-move-json-file topics-to-move.json --broker-list "0,1,3" --generate

3、创建副本存储计划

将生成的计划写入副本存储计划文件increase-replication-factor.json

4、执行副本存储计划

[root@VM-4-10-centos kafka_2.12-3.1.0]# bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092
--reassignment-json-file increase-replication-factor.json --execute

5、验证副本存储计划

[root@VM-4-10-centos kafka_2.12-3.1.0]# bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092
--reassignment-json-file increase-replication-factor.json --verify

退役节点停机

1、停止退役节点机器上的Kafka

2、对机器进行任意的其他操作…