概述
RocketMQ事务消息本质是将发送消息和本地数据库操作融合为一个事务,要么一起成功,要么一起失败,不会存在一个操作成功另一操作失败的情况,说明白点就是Broker端与数据库端持久化数据的分布式事务问题。
例如电商系统中,用户下单成功后会得到一定数量的积分奖励,但是订单系统与积分系统是俩个独立的微服务,无法放在一个事务里执行。这种业务场景可以在订单系统中执行数据库insert订单的操作,然后将增加积分以RocketMQ形式异步发送处理。
事务原理
阿里官方的RocketMQ事务消息发送流程图:

RocketMQ参考2PC的分布式事务解决方案,将RocketMQ事务消息的发送分解为俩个阶段:
事务状态回查:
整个流程在执行过程中,可能由于网络或者服务器宕机等原因导致Broker没有收到二次确认请求,那么数据库操作和消息推送就会出现数据一致性问题。因此Broker端会启动一个线程定时扫描本地存储的半消息,并向Producer发起事务状态回查,根据回查结果决定消息的去留。
由事务状态回查机制可以断定,RocketMQ并不能保证分布式事务的强一致性,仅仅保证分布式事务的最终一致性,还不是100%保证。因为Broker存储的半消息同样会被删除策略扫描,如果因为各种原因没能及时处理二次确认,或者消息转移到队列但是Consumer一直消费失败,都会导致Conusmer错过此消息的逻辑处理。
对于Producer端要保证回查接口能正常调用,对于Consumer端要保证一定次数的消费失败后,将消息写入数据库并开启定时器扫描继续处理,然后采用发短信或钉钉、企业微信等办公APP的内部通知方式告知管理员,对消息进行人工排查处理。
代码示例
订单新增业务层:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 
 | @Slf4j@Service
 public class OrderServiceImpl implements OrderService {
 
 @Autowired
 private OrderMapper orderMapper;
 
 @Transactional
 @Override
 public void insert(OrderInfo orderInfo){
 
 orderMapper.insert(orderInfo);
 
 
 }
 
 @Override
 public boolean existById(String id){
 
 return id == null ? false : orderMapper.existById(id);
 }
 
 }
 
 | 
RocketMQ本地事务监听对象:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 
 | @Slf4j@Component
 @RocketMQTransactionListener(txProducerGroup = "order_integral_tx_producer_group")
 public class OrderIntegralTransactionListener implements RocketMQLocalTransactionListener {
 
 @Autowired
 private OrderService orderService;
 
 
 
 
 @Override
 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
 
 try {
 String jsonStr = new String(msg.getBody());
 OrderInfo orderInfo = JSONObject.parseObject(jsonStr, OrderInfo.class);
 orderService.insert(orderInfo);
 }catch (Exception e){
 log.error("本地事务执行异常, 回滚消息", e.getErrorMessage());
 return LocalTransactionState.ROLLBACK_MESSAGE;
 }
 
 return LocalTransactionState.COMMIT_MESSAGE;
 }
 
 
 
 
 @Override
 public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
 
 
 String halfMessage = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
 OrderInfo orderInfo = JSONObject.parseObject(halfMessage, OrderInfo.class);
 
 
 boolean exist = orderService.existById(orderInfo.getId());
 
 
 RocketMQLocalTransactionState localTransactionState = exist
 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
 
 log.info("本地事务状态回查执行完毕, halfMessage:{}, localTransactionState:{}",
 halfMessage, localTransactionState);
 
 
 if(!exist){
 
 }
 
 return localTransactionState;
 }
 }
 
 | 
订单新增控制层:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | @RestController@RequestMapping("order")
 public class OrderController {
 
 @RequestMapping(value = "/insert", method = RequestMethod.POST)
 public String addUser(@RequestBody OrderInfo orderInfo){
 
 String message = JSONObject.toJSONString(orderInfo);
 Message sendMsg = new Message("ORDER_INTEGRAL", "*", message.getBytes());
 producer.sendMessageInTransaction(sendMsg, null);
 return Result.success("新增成功");
 }
 }
 
 |