概述

RocketMQ事务消息本质是将发送消息和本地数据库操作融合为一个事务,要么一起成功,要么一起失败,不会存在一个操作成功另一操作失败的情况,说明白点就是Broker端与数据库端持久化数据的分布式事务问题。

例如电商系统中,用户下单成功后会得到一定数量的积分奖励,但是订单系统与积分系统是俩个独立的微服务,无法放在一个事务里执行。这种业务场景可以在订单系统中执行数据库insert订单的操作,然后将增加积分以RocketMQ形式异步发送处理。

事务原理

阿里官方的RocketMQ事务消息发送流程图:
图片

RocketMQ参考2PC的分布式事务解决方案,将RocketMQ事务消息的发送分解为俩个阶段:

  • 第一阶段:发送消息,Producer把消息发送到Broker,但是此时该消息还不能被投递给消费者,此时消息的状态被称为半消息。

  • 第二阶段:请求Broker进行消息的二次确认,根据本地数据库事务执行的成功或失败,对半消息进行提交或者回滚,成功提交的消息才可以被投递给消费者,回滚的消息会被删除。

事务状态回查:
整个流程在执行过程中,可能由于网络或者服务器宕机等原因导致Broker没有收到二次确认请求,那么数据库操作和消息推送就会出现数据一致性问题。因此Broker端会启动一个线程定时扫描本地存储的半消息,并向Producer发起事务状态回查,根据回查结果决定消息的去留。

由事务状态回查机制可以断定,RocketMQ并不能保证分布式事务的强一致性,仅仅保证分布式事务的最终一致性,还不是100%保证。因为Broker存储的半消息同样会被删除策略扫描,如果因为各种原因没能及时处理二次确认,或者消息转移到队列但是Consumer一直消费失败,都会导致Conusmer错过此消息的逻辑处理。

对于Producer端要保证回查接口能正常调用,对于Consumer端要保证一定次数的消费失败后,将消息写入数据库并开启定时器扫描继续处理,然后采用发短信或钉钉、企业微信等办公APP的内部通知方式告知管理员,对消息进行人工排查处理。

代码示例

订单新增业务层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

@Autowired
private OrderMapper orderMapper;

@Transactional
@Override
public void insert(OrderInfo orderInfo){

orderMapper.insert(orderInfo);

// 其他逻辑...
}

@Override
public boolean existById(String id){

return id == null ? false : orderMapper.existById(id);
}

}

RocketMQ本地事务监听对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "order_integral_tx_producer_group")
public class OrderIntegralTransactionListener implements RocketMQLocalTransactionListener {

@Autowired
private OrderService orderService;

/**
* 半消息发送成功后需要执行的本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

try {
String jsonStr = new String(msg.getBody());
OrderInfo orderInfo = JSONObject.parseObject(jsonStr, OrderInfo.class);
orderService.insert(orderInfo);
}catch (Exception e){
log.error("本地事务执行异常, 回滚消息", e.getErrorMessage());
return LocalTransactionState.ROLLBACK_MESSAGE;
}

return LocalTransactionState.COMMIT_MESSAGE;
}

/**
* Broker进行事务状态回查调用的方法
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

// 解析半消息
String halfMessage = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
OrderInfo orderInfo = JSONObject.parseObject(halfMessage, OrderInfo.class);

// 查询数据库是否存在
boolean exist = orderService.existById(orderInfo.getId());

// 根据订单查询结果决定返回状态
RocketMQLocalTransactionState localTransactionState = exist
? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;

log.info("本地事务状态回查执行完毕, halfMessage:{}, localTransactionState:{}",
halfMessage, localTransactionState);

// 失败的最好记录下来,方便数据异常排查
if(!exist){
// 记录到数据库或NoSQL
}

return localTransactionState;
}
}

订单新增控制层:

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequestMapping("order")
public class OrderController {

@RequestMapping(value = "/insert", method = RequestMethod.POST)
public String addUser(@RequestBody OrderInfo orderInfo){

String message = JSONObject.toJSONString(orderInfo);
Message sendMsg = new Message("ORDER_INTEGRAL", "*", message.getBytes());
producer.sendMessageInTransaction(sendMsg, null);
return Result.success("新增成功");
}
}

评论

Powered By Valine
v1.4.14