springboot集成 RocketMQ依赖 
1 2 3 4 5 <dependency> 	<groupId>org.apache.rocketmq</groupId> 	<artifactId>rocketmq-client</artifactId> 	<version>4.7.0</version> </dependency> 
application.properties 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # 服务端口 server.port=8082 # 生产者分组 rocketmq.producer.groupName=GROUP_A # MQ注册中心地地址 rocketmq.producer.namesrvAddr=localhost:9876 # 消息最大长度 默认 1024 * 4 (4M) rocketmq.producer.maxMessageSize = 4096 # 发送消息超时时间,默认 3000 rocketmq.producer.sendMsgTimeOut=3000 # 发送消息失败重试次数,默认2 rocketmq.producer.retryTimesWhenSendFailed=2 
将生产者注册到IOC 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Configuration public  class  RocketMQProducerConfig      @Value("${rocketmq.producer.groupName}")      private  String groupName;     @Value("${rocketmq.producer.namesrvAddr}")      private  String namesrvAddr;     @Value("${rocketmq.producer.maxMessageSize}")      private  Integer maxMessageSize;     @Value("${rocketmq.producer.sendMsgTimeOut}")      private  Integer sendMsgTimeOut;     @Value("${rocketmq.producer.retryTimesWhenSendFailed}")      private  Integer retryTimesWhenSendFailed;          @Bean      public  DefaultMQProducer defaultProducer ()  throws  MQClientException          DefaultMQProducer producer = new  DefaultMQProducer(groupName);         producer.setNamesrvAddr(namesrvAddr);         producer.setVipChannelEnabled(false );         producer.setMaxMessageSize(maxMessageSize);         producer.setSendMsgTimeout(sendMsgTimeOut);         producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);         producer.start();         return  producer;     } } 
路由选择 DefaultMQProducer类提供了fetchPublishMessageQueues方法,可以查看当前Producer从NameServer同步过来的topic路由信息:
1 List<MessageQueue> messageQueueList = producer.fetchPublishMessageQueues("TEST_TOPIC" ); 
Producer向某个topic发送消息时,如果从NameServer同步的路由信息中发现,此topic在多个Broker中都存在队列,那么就需要依靠某种策略从中选择一个进行发送,这个策略由MQFaultStrategy类的selectOneMessageQueue方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  MessageQueue selectOneMessageQueue (String lastBrokerName)           if  (lastBrokerName == null ) {         return  this .selectOneMessageQueue();     } else  {                  int  index = this .sendWhichQueue.getAndIncrement();         for (int  i = 0 ; i < this .messageQueueList.size(); ++i) {             int  pos = Math.abs(index++) % this .messageQueueList.size();             if  (pos < 0 ) {                 pos = 0 ;             }                          MessageQueue mq = (MessageQueue)this .messageQueueList.get(pos);             if  (!mq.getBrokerName().equals(lastBrokerName)) {                 return  mq;             }         }                  return  this .selectOneMessageQueue();     } } 
Producer采用轮询的方式发送消息,每次发送消息时调用无参的selectOneMessageQueue()方法,这个方法就是一个单纯的轮询策略。只是消息并不能保证每次都会发送成功,如果发送失败并且设置了重试次数,那么Producer会调用有参的selectOneMessageQueue()方法,将发送失败的Broker名称作为参数传进去,并且此次轮询会忽略掉这个Broker名称。
当消息经过重试发送成功后,有问题的Broker并没有被记录下来,这就导致下次发送消息时并不会知晓这个Broker有问题,仍然在轮询策略的选择范围内,仍然有发送失败的情况出现。想要避免此问题,就需要开启延迟故障。
延迟故障 延迟故障功能通过创建Producer时调用setSendLatencyFaultEnable(true)开启(默认false),开启后对于高延迟、有故障的Broker都会记录下来,并且根据故障的严重程度,给予一段不可用的时间。具体的规则维护在MQFaultStrategy类的俩个数组中:
1 2 private  long [] latencyMax = new  long []{50L , 100L , 550L , 1000L , 2000L , 3000L , 15000L };private  long [] notAvailableDuration = new  long []{0L , 0L , 30000L , 60000L , 120000L , 180000L , 600000L };
latencyMax数组元素表示各种延迟的时间,notAvailableDuration数组元素表示各种不可用时间,这俩个数组的同一个下坐标对应的值就是延迟时间对应的惩罚时间。由此可以看到100ms以内的延迟都是正常的,其余的规定时间内会排除掉,从正常的Broker中轮询获取。
如果在极端的情况,所有的Broker延迟都高于100,都被视为故不可用的故障节点,这段时间的消息发送如何处理呢。RocketMQ提供了pickOneAtLeast()方法,从故障Broker列表中选择最优的一个,也就是俗话矮子里面拔将军:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public  String pickOneAtLeast ()      final  Enumeration<FaultItem> elements = this .faultItemTable.elements();     List<FaultItem> tmpList = new  LinkedList<FaultItem>();          while  (elements.hasMoreElements()) {          final  FaultItem faultItem = elements.nextElement();         tmpList.add(faultItem);     }          if  (!tmpList.isEmpty()) {         Collections.shuffle(tmpList);                   Collections.sort(tmpList);                   final  int  half = tmpList.size() / 2 ;          if  (half <= 0 ) {             return  tmpList.get(0 ).getName();         } else  {             final  int  i = this .whichItemWorst.getAndIncrement() % half;             return  tmpList.get(i).getName();         }     }     return  null ; } 
从故障Broker列表中按照规则排序,然后从前半部分中轮询获取一个作为发送的目的地,因此重点在于排序规则,这就需要查看FaultItem类重写的排序方法了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  int  compareTo (final  FaultItem other)           if  (this .isAvailable() != other.isAvailable()) {         if  (this .isAvailable())             return  -1 ;             if  (other.isAvailable())                 return  1 ;         }                  if  (this .currentLatency < other.currentLatency)             return  -1 ;         else  if  (this .currentLatency > other.currentLatency) {             return  1 ;         }                  if  (this .startTimestamp < other.startTimestamp)             return  -1 ;         else  if  (this .startTimestamp > other.startTimestamp) {             return  1 ;         }         return  0 ;     } 
这里并没有将选择结果固定为排序规则中最好的那个,主要是为了避免topic的消息聚集在一个Broker的队列中造成负担过重,负载均衡到所有故障Broker又失去了故障延迟的初衷,因此选择折中的方式,将前50%的节点采用轮询方式进行负载均衡。
发送方式 同步发送 
同步发送消息时,线程进入阻塞状态,直到发送完毕返回SendResult类 
如果发送失败,会在默认的超时时间进行重试,最多重试俩次 
返回SendResult类,并不代表发送成功,需要根据sendStatus来判断是否成功 
可以根据返回的结果作相应处理,因此理论上不会出现消息丢失(可靠) 
 
1 2 3 4 5 6 7 8 @Autowired private  DefaultMQProducer producer;Message sendMsg = new  Message("ORDER_REMINDER" , "LOGISTICS" , message.getBytes()); SendResult sendResult = producer.send(sendMsg); 
异步发送 
异步调用不存在返回值,投递的结果信息SendResult类在成功回调onSuccess()方法中 
异步发送没有retry机制,投递失败回调onException()方法 
异步调用主要请求耗时过长,或者对响应时间过于敏感的请求,比如大数据量的excel导入,选择用户群体后批量推送消息等 
可以根据onSuccess()方法返回的结果作相应处理,因此理论上不会出现消息丢失(可靠) 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Autowired private  DefaultMQProducer producer;Message sendMsg = new  Message("TEST_TOPIC" , "tag" , message.getBytes()); producer.send(sendMsg, new  SendCallback(){     @Override      public  void  onSuccess (SendResult sendResult)                }     @Override      public  void  onException (Throwable e)                } }); 
单向发送 
单向发送仅仅处理消息的发送,由于发送是否成功无法知晓,因此有几率造成数据丢失(不可靠) 
优点在于发送消息的耗时非常短,一般在微秒级别 
 
1 2 3 4 5 6 7 8 @Autowired private  DefaultMQProducer producer;Message sendMsg = new  Message("TEST_TOPIC" , "tag" , message.getBytes()); producer.sendOneway(sendMsg); 
发送特点 顺序发送 
最常见的顺序消费场景就是使用canal或maxwell来订阅Mysql数据库的Binlog,同一条数据会产生多条操作日志,如果不做任何措施可能会导致Insert日志和Update日志被投递到不同的Broker中,这就有很大几率出现Update日志抢先在Insert日志处理前被消费处理,造成数据不一致的情况。对此最常见的解决方案是提取消息的业务唯一标示(比如id等),和队列长度取余运算定位需要发送的Broker的节点进行发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Autowired private  DefaultMQProducer producer;Message sendMsg = new  Message("TEST_TOPIC" , "tag" , message.getBytes()); Long id = 666L ; SendResult sendResult = producer.send(msg, new  MessageQueueSelector() {     @Override      public  MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg)                    Long id = (Long) arg;                  int  index = id % mqs.size();         return  mqs.get(index);     }, id); } 
延迟发送 
1 2 3 4 5 6 7 8 9 10 @Autowired private  DefaultMQProducer producer;Message sendMsg = new  Message("TEST_TOPIC" , "tag" , message.getBytes()); sendMsg.setDelayTimeLevel(2 ); SendResult sendResult = producer.send(sendMsg); 
RocketMQ商业版本支持任意精度的延迟消息,而开源版本仅仅支持18个特定时间的延迟功能,通过setDelayTimeLevel()方法进行控制,具体的延迟等级与延迟时间的对应关系如下:
延迟等级 
延迟时间 
 
 
1 
1秒 
 
2 
5秒 
 
3 
10秒 
 
4 
30秒 
 
5 
1分钟 
 
6 
2分钟 
 
7 
3分钟 
 
8 
4分钟 
 
9 
5分钟 
 
10 
6分钟 
 
11 
7分钟 
 
12 
8分钟 
 
13 
9分钟 
 
14 
10分钟 
 
15 
20分钟 
 
16 
30分钟 
 
17 
1小时 
 
18 
2小时 
 
Producer发送延迟消息时,消息首先会被投递到名为SCHEDULE_TOPIC_XXXX的topic中,这个topic是集群自动创建的,可以在控制台的Topic页面中勾选SYSTEM进行REFRESH查询,点开此topic的路由信息可以看到每个Broker节点都负责此topic的18个读写队列。
Producer会根据延迟等级值来决定将消息发送到SCHEDULE_TOPIC_XXXX的哪个队列号中,紧接着将真实的topic和queueId设置到Message的propertiesString属性中,然后选择一个Broker进行发送。每个Broker的SCHEDULE_TOPIC_XXXX的每个queue,都会对应一个定时器去刷新,是否有到达时间需要被发送的消息,若有就从propertiesString取出真实的topic和queueId发送出去。