springboot集成 RocketMQ依赖
1 2 3 4 5 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency>
application.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # 服务端口 server.port=8082 # 生产者分组 rocketmq.producer.groupName=GROUP_A # MQ注册中心地地址 rocketmq.producer.namesrvAddr=localhost:9876 # 消息最大长度 默认 1024 * 4 (4M) rocketmq.producer.maxMessageSize = 4096 # 发送消息超时时间,默认 3000 rocketmq.producer.sendMsgTimeOut=3000 # 发送消息失败重试次数,默认2 rocketmq.producer.retryTimesWhenSendFailed=2
将生产者注册到IOC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Configuration public class RocketMQProducerConfig { @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize; @Value("${rocketmq.producer.sendMsgTimeOut}") private Integer sendMsgTimeOut; @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer defaultProducer () throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(namesrvAddr); producer.setVipChannelEnabled(false ); producer.setMaxMessageSize(maxMessageSize); producer.setSendMsgTimeout(sendMsgTimeOut); producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed); producer.start(); return producer; } }
路由选择 DefaultMQProducer类提供了fetchPublishMessageQueues方法,可以查看当前Producer从NameServer同步过来的topic路由信息:
1 List<MessageQueue> messageQueueList = producer.fetchPublishMessageQueues("TEST_TOPIC" );
Producer向某个topic发送消息时,如果从NameServer同步的路由信息中发现,此topic在多个Broker中都存在队列,那么就需要依靠某种策略从中选择一个进行发送,这个策略由MQFaultStrategy类的selectOneMessageQueue方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public MessageQueue selectOneMessageQueue (String lastBrokerName) { if (lastBrokerName == null ) { return this .selectOneMessageQueue(); } else { int index = this .sendWhichQueue.getAndIncrement(); for (int i = 0 ; i < this .messageQueueList.size(); ++i) { int pos = Math.abs(index++) % this .messageQueueList.size(); if (pos < 0 ) { pos = 0 ; } MessageQueue mq = (MessageQueue)this .messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return this .selectOneMessageQueue(); } }
Producer采用轮询的方式发送消息,每次发送消息时调用无参的selectOneMessageQueue()方法,这个方法就是一个单纯的轮询策略。只是消息并不能保证每次都会发送成功,如果发送失败并且设置了重试次数,那么Producer会调用有参的selectOneMessageQueue()方法,将发送失败的Broker名称作为参数传进去,并且此次轮询会忽略掉这个Broker名称。
当消息经过重试发送成功后,有问题的Broker并没有被记录下来,这就导致下次发送消息时并不会知晓这个Broker有问题,仍然在轮询策略的选择范围内,仍然有发送失败的情况出现。想要避免此问题,就需要开启延迟故障。
延迟故障 延迟故障功能通过创建Producer时调用setSendLatencyFaultEnable(true)开启(默认false),开启后对于高延迟、有故障的Broker都会记录下来,并且根据故障的严重程度,给予一段不可用的时间。具体的规则维护在MQFaultStrategy类的俩个数组中:
1 2 private long [] latencyMax = new long []{50L , 100L , 550L , 1000L , 2000L , 3000L , 15000L };private long [] notAvailableDuration = new long []{0L , 0L , 30000L , 60000L , 120000L , 180000L , 600000L };
latencyMax数组元素表示各种延迟的时间,notAvailableDuration数组元素表示各种不可用时间,这俩个数组的同一个下坐标对应的值就是延迟时间对应的惩罚时间。由此可以看到100ms以内的延迟都是正常的,其余的规定时间内会排除掉,从正常的Broker中轮询获取。
如果在极端的情况,所有的Broker延迟都高于100,都被视为故不可用的故障节点,这段时间的消息发送如何处理呢。RocketMQ提供了pickOneAtLeast()方法,从故障Broker列表中选择最优的一个,也就是俗话矮子里面拔将军:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public String pickOneAtLeast () { final Enumeration<FaultItem> elements = this .faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } if (!tmpList.isEmpty()) { Collections.shuffle(tmpList); Collections.sort(tmpList); final int half = tmpList.size() / 2 ; if (half <= 0 ) { return tmpList.get(0 ).getName(); } else { final int i = this .whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null ; }
从故障Broker列表中按照规则排序,然后从前半部分中轮询获取一个作为发送的目的地,因此重点在于排序规则,这就需要查看FaultItem类重写的排序方法了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public int compareTo (final FaultItem other) { if (this .isAvailable() != other.isAvailable()) { if (this .isAvailable()) return -1 ; if (other.isAvailable()) return 1 ; } if (this .currentLatency < other.currentLatency) return -1 ; else if (this .currentLatency > other.currentLatency) { return 1 ; } if (this .startTimestamp < other.startTimestamp) return -1 ; else if (this .startTimestamp > other.startTimestamp) { return 1 ; } return 0 ; }
这里并没有将选择结果固定为排序规则中最好的那个,主要是为了避免topic的消息聚集在一个Broker的队列中造成负担过重,负载均衡到所有故障Broker又失去了故障延迟的初衷,因此选择折中的方式,将前50%的节点采用轮询方式进行负载均衡。
发送方式 同步发送
同步发送消息时,线程进入阻塞状态,直到发送完毕返回SendResult类
如果发送失败,会在默认的超时时间进行重试,最多重试俩次
返回SendResult类,并不代表发送成功,需要根据sendStatus来判断是否成功
可以根据返回的结果作相应处理,因此理论上不会出现消息丢失(可靠)
1 2 3 4 5 6 7 8 @Autowired private DefaultMQProducer producer;Message sendMsg = new Message("ORDER_REMINDER" , "LOGISTICS" , message.getBytes()); SendResult sendResult = producer.send(sendMsg);
异步发送
异步调用不存在返回值,投递的结果信息SendResult类在成功回调onSuccess()方法中
异步发送没有retry机制,投递失败回调onException()方法
异步调用主要请求耗时过长,或者对响应时间过于敏感的请求,比如大数据量的excel导入,选择用户群体后批量推送消息等
可以根据onSuccess()方法返回的结果作相应处理,因此理论上不会出现消息丢失(可靠)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Autowired private DefaultMQProducer producer;Message sendMsg = new Message("TEST_TOPIC" , "tag" , message.getBytes()); producer.send(sendMsg, new SendCallback(){ @Override public void onSuccess (SendResult sendResult) { } @Override public void onException (Throwable e) { } });
单向发送
单向发送仅仅处理消息的发送,由于发送是否成功无法知晓,因此有几率造成数据丢失(不可靠)
优点在于发送消息的耗时非常短,一般在微秒级别
1 2 3 4 5 6 7 8 @Autowired private DefaultMQProducer producer;Message sendMsg = new Message("TEST_TOPIC" , "tag" , message.getBytes()); producer.sendOneway(sendMsg);
发送特点 顺序发送 Producer将消息发送到Broker节点后,Broker会找到对应的队列文件,并采用追加的方式将消息内容写入文件中,可以理解为RocketMQ可以保证队列级别的消息顺序。因此我们只需要将同一类数据发送到同一个队列中,就可以保证消息的顺序发送,Consumer在进行消费时也就毫无悬念的顺序消费了。
最常见的顺序消费场景就是使用canal或maxwell来订阅Mysql数据库的Binlog,同一条数据会产生多条操作日志,如果不做任何措施可能会导致Insert日志和Update日志被投递到不同的Broker中,这就有很大几率出现Update日志抢先在Insert日志处理前被消费处理,造成数据不一致的情况。对此最常见的解决方案是提取消息的业务唯一标示(比如id等),和队列长度取余运算定位需要发送的Broker的节点进行发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Autowired private DefaultMQProducer producer;Message sendMsg = new Message("TEST_TOPIC" , "tag" , message.getBytes()); Long id = 666L ; SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; int index = id % mqs.size(); return mqs.get(index); }, id); }
延迟发送 延迟消息的使用场景很多,例如电商系统的订单业务场景,用户下单后发送一个30分钟的延迟消息,30分钟后Consumer会收到此消息,然后检查订单是否已付款,如果未付款执行自动取消逻辑处理。
1 2 3 4 5 6 7 8 9 10 @Autowired private DefaultMQProducer producer;Message sendMsg = new Message("TEST_TOPIC" , "tag" , message.getBytes()); sendMsg.setDelayTimeLevel(2 ); SendResult sendResult = producer.send(sendMsg);
RocketMQ商业版本支持任意精度的延迟消息,而开源版本仅仅支持18个特定时间的延迟功能,通过setDelayTimeLevel()方法进行控制,具体的延迟等级与延迟时间的对应关系如下:
延迟等级
延迟时间
1
1秒
2
5秒
3
10秒
4
30秒
5
1分钟
6
2分钟
7
3分钟
8
4分钟
9
5分钟
10
6分钟
11
7分钟
12
8分钟
13
9分钟
14
10分钟
15
20分钟
16
30分钟
17
1小时
18
2小时
Producer发送延迟消息时,消息首先会被投递到名为SCHEDULE_TOPIC_XXXX的topic中,这个topic是集群自动创建的,可以在控制台的Topic页面中勾选SYSTEM进行REFRESH查询,点开此topic的路由信息可以看到每个Broker节点都负责此topic的18个读写队列。
Producer会根据延迟等级值来决定将消息发送到SCHEDULE_TOPIC_XXXX的哪个队列号中,紧接着将真实的topic和queueId设置到Message的propertiesString属性中,然后选择一个Broker进行发送。每个Broker的SCHEDULE_TOPIC_XXXX的每个queue,都会对应一个定时器去刷新,是否有到达时间需要被发送的消息,若有就从propertiesString取出真实的topic和queueId发送出去。