前言 高可用性 NameServer作为一个十分重要的核心组件,在整个RockteMQ集群运作过程中发挥着重要作用,NameServer服务一旦宕机,整个集群就无法正常运转,因此一定要集群部署,这样才能保证高可用性。NameServer在CAP理论中强调AP,仅保证最终一致性,集群各节点彼此之间互不通信,也就是某一刻NameServer节点之间的数据并不完全相同,但这对消息发送不会造成任何影响。
路由信息 在RocketMQ源码中,namesrv模块的org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager类负责存储Broker集群各节点注册的路由信息:
HashMap<String, List> topicQueueTable 此变量以topic为单位,记录对应的所有队列信息,因为一个topic的数据可以分散到多个Broker中,所以topic与QueueData集合是一对多的关系,QueueData类数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 public class QueueData implements Comparable<QueueData> { // 队列所属的Broker名称 private String brokerName; // 读队列数量 private int readQueueNums; // 写队列数量 private int writeQueueNums; // Topic的读写权限(2是写 4是读 6是读写) private int perm; // 同步复制还是异步复制标记 private int topicSynFlag; }
HashMap<String, BrokerData> brokerAddrTable 此变量以Broker名称为单位,记录对应的所有主从节点信息,Broker名称与主从节点也是一对多的关系,不过主从节点集合被设计到BrokerData类中:
1 2 3 4 5 6 7 8 public class BrokerData implements Comparable<BrokerData> { // 所属集群名称 private String cluster; // broker名称 private String brokerName; // key为节点id,value为节点地址端口 private HashMap<Long, String> brokerAddrs; }
HashMap<String, Set> clusterAddrTable 此变量以Broker集群名称为单位,记录每个集群对应的Broker名称集合。
brokerLiveTable 此变量以Broker地址端口为单位,记录每个Broker的实时信息,与BrokerLiveInfo类是一对一的关系:
1 2 3 4 5 6 7 8 9 10 class BrokerLiveInfo { // 上次心跳时间戳 private long lastUpdateTimestamp; // 数据版本 private DataVersion dataVersion; // 长连接通道 private Channel channel; // Ha地址 private String haServerAddr; }
HashMap<String, List> filterServerTable 此变量以Broker地址端口为单位,记录每个Broker的消费者在进行消费时的过滤逻辑,后续会详细讲。
由clusterAddrTable的结构可以判断,一个注册中心集群可以包含多个Broker集群,同一个Broker名称可以出现在多个不同的Broker集群中。
不过我发现了一个奇怪的问题,我有一个名称为lvt-cluster的集群,内部有个broker-a的Broker,然后在test-cluster集群中也用broker-a这个名称启动了一个Broker,控制台能看到这俩个Broker,当我使用kill杀死test-cluster集群中的broker-a时,在控制台仍然存在,然后我又kill掉lvt-cluster集群中的broker-a,俩个Broker在控制台都消失了。再次启动lvt-cluster集群中的broker-a,又出现了2个Broker,直到我重启注册中心集群,才回归正常。
服务注册 Broker在启动的时候,会向NameServer注册自己的服务信息,由于NameServer支持集群部署,并且集群中各节点之间没有任何数据交互。因此每个Broker节点启动时,会获取NameSever的地址列表(乱序),采用遍历列表的方式向每一个NameServer节点注册自己的信息。
启动并注册完毕后,Broker会启动一个定时任务,每隔30s定时向NameServer进行心跳更新。无论是启动时注册,还是心跳注册,NameServer接收到注册信息都不会持久化到本地,而是保存在上述的各个map中。
源码中Broker是通过NamesrvStartup类的main方法启动,main方法先是创建了一个BrokerController,然后调用其start()方法,在该方法中有如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 if (!messageStoreConfig.isEnableDLegerCommitLog()) { startProcessorByHa(messageStoreConfig.getBrokerRole()); handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); // 重点在这里 this.registerBrokerAll(true, false, true); } // 开启定时任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // Broker会每隔30s向NameSrv注册并更新自身topic信息,完成心跳功能 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } // 定时器延迟10秒后开始运行,间隔时间用函数绕了一圈,其实就是1000*30,单位毫秒 }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
Broker的服务注册逻辑全部包含在BrokerController类的registerBrokerAll方法中,此方法并没有真正去处理注册的事情,而是委托doRegisterBrokerAll方法来处理,doRegisterBrokerAll也没有亲自去进行注册,而是委托内部的BrokerOuterAPI类的registerBrokerAll方法来处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { // 使用内部类remotingClient获取NameServer集群中所有节点的IP地址 final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); // 如果获取的集合不为空 if (nameServerAddressList != null && nameServerAddressList.size() > 0) { // 将Broker自身的各种信息写入requestHeader中 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); // 将Broker的topic配置信息、过滤信息写入requestBody中 RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); // 转码 final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); // 使用CountDownLatch机制 并行注册 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { // 省略... }); } // 进入超时等待 try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
Broker将需要注册的信息整理好发送后,我们再来看看NameServer是如何接收的,这部分逻辑在RouteInfoManager类的registerBroker方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { // 创建返回值 RegisterBrokerResult result = new RegisterBrokerResult(); try { try { // 加锁 this.lock.writeLock().lockInterruptibly(); // 获取Broker所属集群名称 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); // 初始化或保存Broker的集群名称 if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); // 默认此Broker名称不是第一次注册 boolean registerFirst = false; // 获取Broker名称对应的所有节点(主从)信息 BrokerData brokerData = this.brokerAddrTable.get(brokerName); // 如果broker名称是第一次注册,初始化并标记 if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } // 获取当前broker的所有主从节点Map<brokerId, IP:PORT> Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); // 遍历 Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); // 如果要注册的broker的地址已经存在,但是id不同,这属于脏数据,需要删除掉 // 主要考虑到服务更换brokerId后立刻重启的情况 if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } // 将当前Broker的IP地址注册到map中,如果put没有冲突也视为第一次注册 String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // 如果Broker是主节点并且对应名称是第一次注册,保存topic的配置信息 if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 保存心跳信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } // 保存过滤信息 if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } // 如果注册的broker是从节点,通过名称寻找对应主节点,并保存主节点的IP地址 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }
服务注册的代码不是很难理解,就是往RouteInfoManager类的5个Map中塞数据,另外slave节点在注册后的返回值中,还会拿到对应master节点的IP地址,方便注册后展开数据同步操作。
服务发现 Producer与Consumer在启动后会定时向NameServer获取路由信息,以保证后续工作的正常运行,定时任务代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void startScheduledTask() { // 其他代码...... this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS); // 其他代码...... }
源码就写到这吧,写多了基本就没看的欲望了,Producer与Consumer在服务发现完毕后会得到TopicRouteData集合:
1 2 3 4 5 6 public class TopicRouteData extends RemotingSerializable { private String orderTopicConf; private List<QueueData> queueDatas; private List<BrokerData> brokerDatas; private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; }
通过queueDatas可以知道当前topic有多少个队列、每个队列所在的Broker服务名称,有了名称就可以通过brokerDatas找到对应的主从节点地址,有了地址就可以通过filterServerTable处理过滤逻辑、生产/消费消息。
故障剔除 Broker节点每隔30秒会向NameServer发送一次心跳,并更新自身在brokerLiveTable中的心跳时间戳,NameServer节点每隔10秒会扫描一次brokerLiveTable,如果发现某个Broker的上次更新时间戳距离当前时间超过2分钟,则认为Broker已死亡,剔除其注册信息并关闭长连接。
故障节点剔除后并不会像Kafka那样采用再均衡策略通知Producer与Consumer,而是等待他们的服务发现机制自己去感知。这就意味着某个Broker节点挂了之后Producer与Consumer最长要等30秒才会感知到。