重复消费

Rebalance
集群模式下,由于各种原因导致触发rebalance,可能出现某个Consumer已经消费完一条消息,但是并没有向Broker提交offset,导致rebalance结束后,Consumer再次拉取此条数据进行消费。

TIMEOUT
Consumer没有及时向Broker反馈消息的处理状态,或者网络原因导致Broker没有收到反馈,Broker判定为TIMEOUT并触发重试机制,导致消息重复消费。

instanceName重复
RocketMQ会通过ClientConfig类的buildMQClientId方法为每个Consumer生成唯一标示cid:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();

// 本地IP地址
sb.append(this.getClientIP());
sb.append("@");
// jvm进程ID
sb.append(this.getInstanceName());

// unitName可以在启动时指定,默认null
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}

return sb.toString();
}

如果某个消费组的Consumer完全在部署在一个服务器的不同Docker容器中,会出现instanceName相同的情况,默认情况下会导致不同Consumer的唯一标示cid相同。前面再均衡章节有讲述过重新分配是对cid集合、队列集合提取并排序,然后根据策略重新给Consumer分配队列。

如果Consumer的cid相同,会被分配到同一个队列进行消息拉取消费,也就是说一个队列会被同一消费组内多个Consumer订阅,那么投递到此队列的所有消息都会出现重复消费,甚至退化为广播模式。

分配策略不同
这个场景只是个人猜测,rebalance过程中,Consumer各自通过分配策略计算出自身需要拉取的队列,如果同一消费组内不同的Consumer设置的策略不同,会出现计算结果有重叠的队列,也就意味着一条消息被会被多个Consumer消费,造成消息重复。

解决方案
根据业务逻辑做幂等处理,或者使用redis等中间件对相关数据进行加锁,如果Consumer部署在同一个服务器的不同Docker容器中,则需要在Consumer手动设置随机instanceName(比如UUID)。

消息丢失

Producer单向发送
Producer如果采用单向发送消息的方式,无法知晓消息是否发送成功,如果因为网络等原因Broker没有收到消息并落盘,会导致消息丢失。

异步刷盘
RocketMQ的异步刷盘机制,采用pageCache减少磁盘IO次数,并由操作系统定时将pageCache中的数据IO到磁盘中,如果消息存储在pageCache中还未落盘就发生宕机,会导致这部分消息丢失。

Broker无备份
即使Broker采用同步刷盘方式保存消息数据,由于特殊情况导致磁盘损坏,没有任何备份的情况下该Broker保存的所有消息全部丢失。

解决方案

  • Producer端使用同步发送机制,如果有必要采用事务消息机制,确保消息投递到Broker进行存储。
  • Broker端开启主从模式并且复制方式设置为同步复制。
  • Consumer端确保消息处理完毕后提交offset,而不是接收消息后交给线程异步处理直接提交offset。

评论