概述
RocketMQ事务消息本质是将发送消息和本地数据库操作融合为一个事务,要么一起成功,要么一起失败,不会存在一个操作成功另一操作失败的情况,说明白点就是Broker端与数据库端持久化数据的分布式事务问题。
例如电商系统中,用户下单成功后会得到一定数量的积分奖励,但是订单系统与积分系统是俩个独立的微服务,无法放在一个事务里执行。这种业务场景可以在订单系统中执行数据库insert订单的操作,然后将增加积分以RocketMQ形式异步发送处理。
事务原理
阿里官方的RocketMQ事务消息发送流程图:

RocketMQ参考2PC的分布式事务解决方案,将RocketMQ事务消息的发送分解为俩个阶段:
事务状态回查:
整个流程在执行过程中,可能由于网络或者服务器宕机等原因导致Broker没有收到二次确认请求,那么数据库操作和消息推送就会出现数据一致性问题。因此Broker端会启动一个线程定时扫描本地存储的半消息,并向Producer发起事务状态回查,根据回查结果决定消息的去留。
由事务状态回查机制可以断定,RocketMQ并不能保证分布式事务的强一致性,仅仅保证分布式事务的最终一致性,还不是100%保证。因为Broker存储的半消息同样会被删除策略扫描,如果因为各种原因没能及时处理二次确认,或者消息转移到队列但是Consumer一直消费失败,都会导致Conusmer错过此消息的逻辑处理。
对于Producer端要保证回查接口能正常调用,对于Consumer端要保证一定次数的消费失败后,将消息写入数据库并开启定时器扫描继续处理,然后采用发短信或钉钉、企业微信等办公APP的内部通知方式告知管理员,对消息进行人工排查处理。
代码示例
订单新增业务层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Slf4j @Service public class OrderServiceImpl implements OrderService {
@Autowired private OrderMapper orderMapper;
@Transactional @Override public void insert(OrderInfo orderInfo){
orderMapper.insert(orderInfo); }
@Override public boolean existById(String id){
return id == null ? false : orderMapper.existById(id); } }
|
RocketMQ本地事务监听对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| @Slf4j @Component @RocketMQTransactionListener(txProducerGroup = "order_integral_tx_producer_group") public class OrderIntegralTransactionListener implements RocketMQLocalTransactionListener {
@Autowired private OrderService orderService;
@Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try { String jsonStr = new String(msg.getBody()); OrderInfo orderInfo = JSONObject.parseObject(jsonStr, OrderInfo.class); orderService.insert(orderInfo); }catch (Exception e){ log.error("本地事务执行异常, 回滚消息", e.getErrorMessage()); return LocalTransactionState.ROLLBACK_MESSAGE; }
return LocalTransactionState.COMMIT_MESSAGE; }
@Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String halfMessage = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); OrderInfo orderInfo = JSONObject.parseObject(halfMessage, OrderInfo.class);
boolean exist = orderService.existById(orderInfo.getId());
RocketMQLocalTransactionState localTransactionState = exist ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
log.info("本地事务状态回查执行完毕, halfMessage:{}, localTransactionState:{}", halfMessage, localTransactionState);
if(!exist){ }
return localTransactionState; } }
|
订单新增控制层:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RestController @RequestMapping("order") public class OrderController {
@RequestMapping(value = "/insert", method = RequestMethod.POST) public String addUser(@RequestBody OrderInfo orderInfo){
String message = JSONObject.toJSONString(orderInfo); Message sendMsg = new Message("ORDER_INTEGRAL", "*", message.getBytes()); producer.sendMessageInTransaction(sendMsg, null); return Result.success("新增成功"); } }
|