简单介绍

在集群消费模式下,消费组内各Consumer均摊订阅的队列信息,也就是消费端的负载均衡机制。在消费过程中如果队列、消费组信息发生变化,则需要根据分配策略对组内所有Consumer重新分配队列,这个过程就是再均衡(rebalance)。

队列变化

1.当Broker节点因为宕机、停止等原因导致服务不可用时,触发一次rebalance,重新分配消费组队列:
图片

2.当故障Broker节点重新启动后,触发一次rebalance,重新分配消费组的订阅队列:
图片

3.当Broker节点的某个topic进行队列扩容时(为了明显一点,使用环形分配策略):
图片

4.当Broker节点的某个topic进行队列缩容时:
图片

消费组变化

1.当消费组启动一个Consumer时,触发一次rebalance,重新分配消费组队列:
图片

2.当消费组某个Consumer因为宕机、停止、网络异常导致无法与Broker心跳等原因停止消费时,触发一次rebalance,重新分配消费组队列:
图片

rebalance过程

整个过程由Broker检测并通知Consumer端,Consumer端接收到通知后调用RebalanceImpl类的rebalanceByTopic()方法进行rebalance。rebalance是以topic+消费组为粒度对队列进行重分配的,例如某个组订阅了5个topic,那么会遍历这5个topic并分别调用rebalanceByTopic()方法执行rebalance。

图片

注意:由于rebalance都是通过Broker端发起通知,为了防止特殊原因导致Consumer端没有收到通知错过rebalance,Consumer端会开启RebalanceService线程,每隔10秒钟对订阅的topic进行一次rebalance。

分配策略

平均分配策略(默认)(AllocateMessageQueueAveragely)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {

//省略参数校验代码...

// 获取当前Consumer在消费者集合(cidAll)中下标的位置
int index = cidAll.indexOf(currentCID);

/**
* Queue数除以Consumer数取余
* 如果mod等于0,说明Queue可以被平均分配
* 如果mod大于0,说明不可以被平均分配,一定程度平均分配后仍剩余mod个队列
*/
int mod = mqAll.size() % cidAll.size();

/**
* averageSize代表当前Consumer可以得到的队列数
* Queue数小于等于Consumer,averageSize=1,也就是平均分配1个队列(虽然靠后的Consumer可能啥都没分到)
* Queue数大于Consumer的情况下,根据Queue能否被Consumer平均分配继续处理:
* 如果能平均分配,直接计算Queue数除以Consumer数得到平均值
* 如果不能平均分配,继续判断当前Consumer在cidAll的index是否小于mod:
* 如果小于则说明最大化平均后剩余的mod个队列还有自己一份,averageSize = Queue数除以Consumer数取整后+1
* 如果大于等于说明最大化平均后剩余的mod个队列和自己没缘分了,averageSize = Queue数除以Consumer数取整
*/
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());

/**
* 后续会遍历mqAll集合提取属于当前Consumer的Queue,这里需要计算遍历的起始坐标
* 如果不能平均消费,且剩余的mod个队列有自己一份,startIndex = index * averageSize
* 如果不能平均消费,且剩余的mod个队列和自己无缘,startIndex = index * averageSize + mod
*/
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;

// 根据Math.min()计算消费者最终需要消费的数量
int range = Math.min(averageSize, mqAll.size() - startIndex);

// 遍历提取
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

各种情况下Queue分配情况:
图片

环形分配策略(AllocateMessageQueueAveragelyByCircle)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {

//省略参数校验代码...

// 获取当前Consumer在消费者集合(cidAll)中下标的位置
int index = cidAll.indexOf(currentCID);

// 以index作为起点遍历Queue集合
for (int i = index; i < mqAll.size(); i++) {
// 对下标取模(mod), 如果与index相等, 则存储到result集合中
if (i % cidAll.size() == index) {
result.add(mqAll.get(i));
}
}
return result;
}

各种情况下Queue分配情况:
图片

手动配置分配策略(AllocateMessageQueueByConfig)
自己发挥

机房分配策略(AllocateMessageQueueByMachineRoom)
感觉与平均分配策略的逻辑差不多,就近机房的逻辑判断没看懂

一致性哈希分配策略(AllocateMessageQueueConsistentHash)
牵涉到一致性哈希算法,以后在更新

与kafka区别

Kafka是在消费组的众多Consumer中,选举一个作为Group Leader,由这个Group Leader来计算出分配结果并同步给其他Consumer,这种机制很好的避免了脑裂问题。

RocketMQ中,Consumer端无论是主动收到通知、还是RebalanceService线程触发的rebalance,都是每个Consumer自己按照分配策略给自己重新分配队列。如果消费组内各Consumer设置的分配策略以及获取的队列信息相同,那么计算出的分配结果也是相同的,不会有任何问题,如果不相同会导致脑裂,造成多个Consumer抢夺一个队列,有的队列无人问津。

评论