springboot集成

RocketMQ依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 服务端口
server.port=8082

# 生产者分组
rocketmq.producer.groupName=GROUP_A

# MQ注册中心地地址
rocketmq.producer.namesrvAddr=localhost:9876

# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096

# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=3000

# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2

将生产者注册到IOC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class RocketMQProducerConfig {

@Value("${rocketmq.producer.groupName}")
private String groupName;

@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;

@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize;

@Value("${rocketmq.producer.sendMsgTimeOut}")
private Integer sendMsgTimeOut;

@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;

/**
* RocketMQ生产者对象
*/
@Bean
public DefaultMQProducer defaultProducer() throws MQClientException {

DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();

return producer;
}

}

路由选择

DefaultMQProducer类提供了fetchPublishMessageQueues方法,可以查看当前Producer从NameServer同步过来的topic路由信息:

1
List<MessageQueue> messageQueueList = producer.fetchPublishMessageQueues("TEST_TOPIC");

Producer向某个topic发送消息时,如果从NameServer同步的路由信息中发现,此topic在多个Broker中都存在队列,那么就需要依靠某种策略从中选择一个进行发送,这个策略由MQFaultStrategy类的selectOneMessageQueue方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public MessageQueue selectOneMessageQueue(String lastBrokerName) {

// 如果上次发送的brokerName为空,直接轮询
if (lastBrokerName == null) {
return this.selectOneMessageQueue();
} else {
// 对messageQueueList进行轮询选取
int index = this.sendWhichQueue.getAndIncrement();

for(int i = 0; i < this.messageQueueList.size(); ++i) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0) {
pos = 0;
}

// 如果此次轮询到的MessageQueue对应的Broker节点没任何问题,直接返回
MessageQueue mq = (MessageQueue)this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}

// 直接轮询
return this.selectOneMessageQueue();
}
}

Producer采用轮询的方式发送消息,每次发送消息时调用无参的selectOneMessageQueue()方法,这个方法就是一个单纯的轮询策略。只是消息并不能保证每次都会发送成功,如果发送失败并且设置了重试次数,那么Producer会调用有参的selectOneMessageQueue()方法,将发送失败的Broker名称作为参数传进去,并且此次轮询会忽略掉这个Broker名称。

当消息经过重试发送成功后,有问题的Broker并没有被记录下来,这就导致下次发送消息时并不会知晓这个Broker有问题,仍然在轮询策略的选择范围内,仍然有发送失败的情况出现。想要避免此问题,就需要开启延迟故障。

延迟故障

延迟故障功能通过创建Producer时调用setSendLatencyFaultEnable(true)开启(默认false),开启后对于高延迟、有故障的Broker都会记录下来,并且根据故障的严重程度,给予一段不可用的时间。具体的规则维护在MQFaultStrategy类的俩个数组中:

1
2
private long[] latencyMax = new long[]{50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = new long[]{0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

latencyMax数组元素表示各种延迟的时间,notAvailableDuration数组元素表示各种不可用时间,这俩个数组的同一个下坐标对应的值就是延迟时间对应的惩罚时间。由此可以看到100ms以内的延迟都是正常的,其余的规定时间内会排除掉,从正常的Broker中轮询获取。

如果在极端的情况,所有的Broker延迟都高于100,都被视为故不可用的故障节点,这段时间的消息发送如何处理呢。RocketMQ提供了pickOneAtLeast()方法,从故障Broker列表中选择最优的一个,也就是俗话矮子里面拔将军:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<FaultItem>();

// 先把故障的broker列表复制一份,后面好做打乱
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}

// 打乱该列表
if (!tmpList.isEmpty()) {
Collections.shuffle(tmpList);

// 排序
Collections.sort(tmpList);

// 从前50%里面递增选取一个
final int half = tmpList.size() / 2;
if (half <= 0) {
return tmpList.get(0).getName();
} else {
final int i = this.whichItemWorst.getAndIncrement() % half;
return tmpList.get(i).getName();
}
}

return null;
}

从故障Broker列表中按照规则排序,然后从前半部分中轮询获取一个作为发送的目的地,因此重点在于排序规则,这就需要查看FaultItem类重写的排序方法了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public int compareTo(final FaultItem other) {

// 可用的笔不可用的优先级高
if (this.isAvailable() != other.isAvailable()) {
if (this.isAvailable())
return -1;
if (other.isAvailable())
return 1;
}

// 延迟低的比延迟高的优先级高
if (this.currentLatency < other.currentLatency)
return -1;
else if (this.currentLatency > other.currentLatency) {
return 1;
}

// 被惩罚时间早的比晚的优先级高
if (this.startTimestamp < other.startTimestamp)
return -1;
else if (this.startTimestamp > other.startTimestamp) {
return 1;
}

return 0;
}

这里并没有将选择结果固定为排序规则中最好的那个,主要是为了避免topic的消息聚集在一个Broker的队列中造成负担过重,负载均衡到所有故障Broker又失去了故障延迟的初衷,因此选择折中的方式,将前50%的节点采用轮询方式进行负载均衡。

发送方式

同步发送

  • 同步发送消息时,线程进入阻塞状态,直到发送完毕返回SendResult类
  • 如果发送失败,会在默认的超时时间进行重试,最多重试俩次
  • 返回SendResult类,并不代表发送成功,需要根据sendStatus来判断是否成功
  • 可以根据返回的结果作相应处理,因此理论上不会出现消息丢失(可靠)
1
2
3
4
5
6
7
8
// 生产者对象
@Autowired
private DefaultMQProducer producer;

// 创建消息对象
Message sendMsg = new Message("ORDER_REMINDER", "LOGISTICS", message.getBytes());
// 同步发送
SendResult sendResult = producer.send(sendMsg);

异步发送

  • 异步调用不存在返回值,投递的结果信息SendResult类在成功回调onSuccess()方法中
  • 异步发送没有retry机制,投递失败回调onException()方法
  • 异步调用主要请求耗时过长,或者对响应时间过于敏感的请求,比如大数据量的excel导入,选择用户群体后批量推送消息等
  • 可以根据onSuccess()方法返回的结果作相应处理,因此理论上不会出现消息丢失(可靠)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 生产者对象
@Autowired
private DefaultMQProducer producer;

// 创建消息对象
Message sendMsg = new Message("TEST_TOPIC", "tag", message.getBytes());
// 异步发送
producer.send(sendMsg, new SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功回调逻辑
}
@Override
public void onException(Throwable e) {
// 发送失败回调逻辑
}
});

单向发送

  • 单向发送仅仅处理消息的发送,由于发送是否成功无法知晓,因此有几率造成数据丢失(不可靠)
  • 优点在于发送消息的耗时非常短,一般在微秒级别
1
2
3
4
5
6
7
8
// 生产者对象
@Autowired
private DefaultMQProducer producer;

// 创建消息对象
Message sendMsg = new Message("TEST_TOPIC", "tag", message.getBytes());
// 单向发送
producer.sendOneway(sendMsg);

发送特点

顺序发送
Producer将消息发送到Broker节点后,Broker会找到对应的队列文件,并采用追加的方式将消息内容写入文件中,可以理解为RocketMQ可以保证队列级别的消息顺序。因此我们只需要将同一类数据发送到同一个队列中,就可以保证消息的顺序发送,Consumer在进行消费时也就毫无悬念的顺序消费了。

最常见的顺序消费场景就是使用canal或maxwell来订阅Mysql数据库的Binlog,同一条数据会产生多条操作日志,如果不做任何措施可能会导致Insert日志和Update日志被投递到不同的Broker中,这就有很大几率出现Update日志抢先在Insert日志处理前被消费处理,造成数据不一致的情况。对此最常见的解决方案是提取消息的业务唯一标示(比如id等),和队列长度取余运算定位需要发送的Broker的节点进行发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 生产者对象
@Autowired
private DefaultMQProducer producer;

// 创建消息对象
Message sendMsg = new Message("TEST_TOPIC", "tag", message.getBytes());
// 假设此binlog消息的主键id为666
Long id = 666L;
// 顺序发送
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// arg的值就是send方法第三个参数(id)的值
Long id = (Long) arg;
// mqs为此Producer从注册中心拿到的,需要发送
int index = id % mqs.size();
return mqs.get(index);
}, id);
}

延迟发送
延迟消息的使用场景很多,例如电商系统的订单业务场景,用户下单后发送一个30分钟的延迟消息,30分钟后Consumer会收到此消息,然后检查订单是否已付款,如果未付款执行自动取消逻辑处理。

1
2
3
4
5
6
7
8
9
10
// 生产者对象
@Autowired
private DefaultMQProducer producer;

// 创建消息对象
Message sendMsg = new Message("TEST_TOPIC", "tag", message.getBytes());
// 设置延迟级别
sendMsg.setDelayTimeLevel(2);
// 延迟发送
SendResult sendResult = producer.send(sendMsg);

RocketMQ商业版本支持任意精度的延迟消息,而开源版本仅仅支持18个特定时间的延迟功能,通过setDelayTimeLevel()方法进行控制,具体的延迟等级与延迟时间的对应关系如下:

延迟等级 延迟时间
1 1秒
2 5秒
3 10秒
4 30秒
5 1分钟
6 2分钟
7 3分钟
8 4分钟
9 5分钟
10 6分钟
11 7分钟
12 8分钟
13 9分钟
14 10分钟
15 20分钟
16 30分钟
17 1小时
18 2小时

Producer发送延迟消息时,消息首先会被投递到名为SCHEDULE_TOPIC_XXXX的topic中,这个topic是集群自动创建的,可以在控制台的Topic页面中勾选SYSTEM进行REFRESH查询,点开此topic的路由信息可以看到每个Broker节点都负责此topic的18个读写队列。

Producer会根据延迟等级值来决定将消息发送到SCHEDULE_TOPIC_XXXX的哪个队列号中,紧接着将真实的topic和queueId设置到Message的propertiesString属性中,然后选择一个Broker进行发送。每个Broker的SCHEDULE_TOPIC_XXXX的每个queue,都会对应一个定时器去刷新,是否有到达时间需要被发送的消息,若有就从propertiesString取出真实的topic和queueId发送出去。

评论