重复消费
Rebalance
集群模式下,由于各种原因导致触发rebalance,可能出现某个Consumer已经消费完一条消息,但是并没有向Broker提交offset,导致rebalance结束后,Consumer再次拉取此条数据进行消费。
TIMEOUT
Consumer没有及时向Broker反馈消息的处理状态,或者网络原因导致Broker没有收到反馈,Broker判定为TIMEOUT并触发重试机制,导致消息重复消费。
instanceName重复
RocketMQ会通过ClientConfig类的buildMQClientId方法为每个Consumer生成唯一标示cid:
1 | public String buildMQClientId() { |
如果某个消费组的Consumer完全在部署在一个服务器的不同Docker容器中,会出现instanceName相同的情况,默认情况下会导致不同Consumer的唯一标示cid相同。前面再均衡章节有讲述过重新分配是对cid集合、队列集合提取并排序,然后根据策略重新给Consumer分配队列。
如果Consumer的cid相同,会被分配到同一个队列进行消息拉取消费,也就是说一个队列会被同一消费组内多个Consumer订阅,那么投递到此队列的所有消息都会出现重复消费,甚至退化为广播模式。
分配策略不同
这个场景只是个人猜测,rebalance过程中,Consumer各自通过分配策略计算出自身需要拉取的队列,如果同一消费组内不同的Consumer设置的策略不同,会出现计算结果有重叠的队列,也就意味着一条消息被会被多个Consumer消费,造成消息重复。
解决方案
根据业务逻辑做幂等处理,或者使用redis等中间件对相关数据进行加锁,如果Consumer部署在同一个服务器的不同Docker容器中,则需要在Consumer手动设置随机instanceName(比如UUID)。
消息丢失
Producer单向发送
Producer如果采用单向发送消息的方式,无法知晓消息是否发送成功,如果因为网络等原因Broker没有收到消息并落盘,会导致消息丢失。
异步刷盘
RocketMQ的异步刷盘机制,采用pageCache减少磁盘IO次数,并由操作系统定时将pageCache中的数据IO到磁盘中,如果消息存储在pageCache中还未落盘就发生宕机,会导致这部分消息丢失。
Broker无备份
即使Broker采用同步刷盘方式保存消息数据,由于特殊情况导致磁盘损坏,没有任何备份的情况下该Broker保存的所有消息全部丢失。
解决方案
- Producer端使用同步发送机制,如果有必要采用事务消息机制,确保消息投递到Broker进行存储。
- Broker端开启主从模式并且复制方式设置为同步复制。
- Consumer端确保消息处理完毕后提交offset,而不是接收消息后交给线程异步处理直接提交offset。