存储文件

Broker负责存储消息、主题、对应队列等,将这些内容持久化到配置文件的store文件夹中:
图片

当生产者通过路由信息将消息发送到Broker后,Broker先将消息写到内存中,然后通过线程刷到磁盘。刷盘方式分为同步和异步俩种方式,在配置文件的flushDiskType属性中进行设置:
图片

config

此文件夹存储了Broker运行需要的各种配置信息,采用json文本形式存储配置文件以及对应的备份文件:

文件名称 解析存储类 描述
topics.json TopicConfigManager 存储每个topic的读写队列数、权限、是否顺序等信息
consumerOffset.json ConsumerOffsetManager 记录每个Consumer在每个topic上对于该topic的consumequeue队列的消费进度
consumerFilter.json ConsumerFilterManager 存储每个消费者Consumer的过滤信息
subscriptionGroup.json SubscriptionGroupManager 存储每个消费者Consumer的订阅信息
delayOffset.json ScheduleMessageService 记录对于延迟主题SCHEDULE_TOPIC_XXXX的每个consumequeue队列的消费进度

commitlog

此文件夹负责存储Broker节点接收到的消息,存储消息的文件名长度为20位,以起始物理偏移量的值进行命名,如果长度不够20位则在左侧补零,每个文件的大小默认1G,也就是1073741824字节,当文件写满则创建下一个文件。比如Broker接收到第一个消息后创建00000000000000000000代表第一个文件,起始偏移量为0,当文件写满1G后,创建名为00000000001073741824的文件继续存储消息,起始偏移量为1073741824。

注:这里的偏移量是指物理偏移量(1个字节代表一个偏移量),并非按消息数量递增的逻辑偏移量,物理偏移量用于定位某个消息在此文件中的物理位置

消息在commitlog文件中的存储格式如下:
图片

存储结构中,各部位代表的含义:

名称 占用磁盘 描述
msgLen 4字节 消息长度,具体指整个消息体所占用的字节大小
magicCode 4字节 魔数,固定值daa320a7
bodyCRC 4字节 消息体验证码
queueId 4字节 消息体发送到了哪个MessageQueue
flag 4字节 创建Message对象时由生产者通过构造器设定的flag值
queueOffset 8字节 表示在队列中逻辑的偏移量
physicalPosition 8字节 表示在存储文件中的偏移量
sysFlag 4字节 是生产者相关的信息标识
msg born timestamp 8字节 消息创建时间
msg host 8字节 消息生产者的host
store timestamp 8字节 消息存储时间
store host 8字节 消息存储机器的host
reconsume times 4字节 消息重复消费次数
prepare transaction offset 8字节 消息事务相关偏移量
body length 4字节 消息体的长度
msg body 不固定 消息内容
topic length 1 主题名称长度(1字节=8位,因此topic长度不会超过127)
topic length 不固定 主题名称
properties lengh 不固定 Properties内容长度
properties 不固定 Properties的内容

某个Broker节点接收到的所有topic-queue消息,全部写入一个文件中(直到写满为止),这一点与Kafka有很大不同。Kafka是以topic-partition为单位创建存储消息的文件,存储文件的分类粒度比RocketMQ要细很多,也意味着相同的业务场景,kafka节点创建存储消息的文件数量肯定会比RocketMQ节点要多,且随着节点中topic和queue/partition的激增,数量差距越来越大。

当kafka的存储文件越来越多(单机以64个partition为分水岭),顺序写入特性会被大大破坏从而引起大量的随机I/O,吞吐量会急剧下降,而RocketMQ的这个设计,很好的避免了这个问题。

consumequeue

Broker节点接收到的任何topic-queue消息都会写在commitlog文件中,并且没有规则的分布情况,如果消费者想要拉取某个topic-queue消息时,需要整体检索commitlog文件,这种方式的效率难以保障。这就需要对commitlog文件的消息建立索引来加快消息的检索,而这些索引数据就是在consumequeue文件夹进行存储。

例如现在某个Broker节点中负责订单的发货(DELIVER_PACKAGE)、到货(ARRIVAL_PACKAGE)、签收(SIGN_PACKAGE)的三个消息通知topic,每个topic都有4个队列,目录结构如下:
图片

由此图可以看出,consumequeue目录结构的分类粒度与Kafka存储消息文件的粒度类似,只不过consumequeue目录不存储具体的消息内容,只存储路由到该queue中的消息在CommitLog中的物理偏移量(offset)、消息大小(size)、消息所属的tag的hash值(tagCode)三个属性,每条索引占用磁盘空间固定为20字节。

文件写入
当Broker收到某个topic-queue消息时,先将消息写入commitlog文件中并得到物理偏移量,然后进入consumequeue目录对应的文件夹创建文件,文件名长度为20位,以起始物理偏移量的值进行命名,如果长度不够20位则在左侧补零。每个文件默认可以存储30W个消息索引,文件磁盘大小也就是60W字节,比如Broker收到某个topic-queue的第一条消息时,创建文件00000000000000000000,起始物理偏移量为0,将刚才的物理偏移量、消息大小、tag的hash值写入创建的文件中,当写满后30W条消息时创建第二个文件00000000000006000000,起始物理偏移量为60W,继续存储写入的数据。

文件读取
当Broker收到Consumer的消息拉取请求时,根据topic、queueId找到对应文件夹,通过文件夹里面各文件的名称计算出存储的逻辑偏移量范围,然后根据Consumer提供的offset锁定到具体检索哪个文件。将Consumer提供的offset*20后减去文件名的数值,即可得到要查询的消息在consumequeue索引中的物理偏移量位置,往后取20字节的数据就是消息的索引信息,拿到物理偏移量去commitlog即可查询最终的消息。

consumequeue文件夹创建的文件命名虽然和commitlog同样都是物理偏移量,不过consumequeue文件存储的数据量固定,且布局有序、有固定规则,因此可以通过文件名去定位具体的物理偏移量,不会存在检索效率的问题。

index

Broker除了通过Consumer提供的offset获取消息外,还支持通过MessageID或者MessageKey查询消息。使用ID查询处理起来比较简单,因为MessageID就是根据Broker+offset生成的,因此很容易找到对应的commitlog文件来读取消息。而MessageKey由生产者自己设定,RocketMQ为了保证其查询的效率,使用index文件夹记录对应的索引信息。

index文件由IndexHeader、HashSlot、Index三部分构成:
图片

IndexHeader中存储的都是一些文件基础数据:

名称 占用磁盘 描述
beginTimestamp 8字节 该索引文件第一个消息的存储(落盘)时间
endTimestamp 8字节 该索引文件最后一个消息的存储(落盘)时间
beginPhyoffset 8字节 该索引文件第一个消息在commitlog的物理偏移量
endPhyoffset 8字节 该索引文件最后一个消息在commitlog的物理偏移量
hashSlotCount 4字节 该索引文件目前的哈希槽个数
indexCount 4字节 该索引文件目前已存储的索引数量

HashSlot负责对索引内容进行分片:
消息在进行index存储时,通过消息key的哈希值%500W定位哈希槽(貌似500W这个值可以修改),因此可以理解为一个index文件的哈希槽最多500W个,另外IndexHeader与HashSlot的大小都是固定的,再文件中的位置不需要额外标记,可以直接定位获取。

其中每个哈希槽固定占有4字节,仅仅用来存储当前卡槽最后一个key在index部分的逻辑位置(整个索引文件index区域中第几个写入的),这么做的目的仍然是节省磁盘空间,如果直接存储物理偏移量需要8个字节,这里四个字节就够了,因为IndexHeader与HashSlot的大小是固定的,每个索引数据在整个index的物理偏移量为:

IndexHeader大小(40字节) + hashSlot大小(4字节) * 数量(500w) + 逻辑位置 * 每个index大小(20字节)

Index存储真正的索引信息:

名称 占用磁盘 描述
key hash value 4字节 消息key的哈希值
phyOffset 8字节 消息在commitlog的物理偏移量(索引的核心数据)
timeDiff 4字节 消息落盘时间与IndexHeader中的beginTimestamp的差值(节省磁盘空间,如果直接存储落盘时间就需要占用8字节)
prevIndex 4字节 哈希槽的第一个索引prevIndex=0,后续的索引prevIndex=前一个索引的物理偏移量

索引文件创建过程:
当Broker接收到消息后,先将消息写入commitlog文件中并得到物理偏移量,然后进入index文件夹查询最新创建的索引文件,如果此索引文件的IndexHeader的indexCount等于2000W则表示已满,获取当前时间戳作为名称创建新的索引文件,如果小于2000W直接进入写文件环节。

获取消息key的哈希值并%500W计算出卡槽号,如果此卡槽存储的值为0,表示目前下面还没有存储索引,写入索引数据(其中prevIndex=0),如果卡槽存储的值不为0,通过卡槽值计算出索引数据所在的物理偏移量并取出,然后将新的索引数据写入文件(其中prevIndex=当前卡槽值),然后刷新卡槽值为当前索引数据的逻辑位置(整个索引文件index区域中第几个写入的),其实就是类似HashMap的数组+链表。

消息key查询过程:
查询的传入值除了key外,还包含一个时间起始值以及截止值(这玩意在控制台貌似不需要?),时间范围参数用于定位具体的索引文件,根据消息key的哈希值锁定具体的卡槽。通过卡槽值计算出最后一个索引数据在索引文件中的物理偏移量并取出,紧接着通过prevIndex属性递归遍历,再加上时间范围与timeDiff属性对比筛选,得出符合条件的phyOffset属性集合,去commitlog捞消息。由于key的hash相同,内容不一定相同,所以在捞消息的时候还要对key的内容进行筛选。

abort文件

Broker启动时会在store目录创建此文件,在正常退出后会删除此文件。如果Broker在启动时发现store目录中发现abort文件存在,那么说明上次服务退出属于异常退出(宕机或者手动kill进程等),异常退出会导致部分文件的写入逻辑没有执行完就被打断(比如某条消息只写入一半),造成脏数据影响后续的解析。

lock

这叼毛文件完全搜不到是干啥的

checkpoint

checkpoint文件的作用是记录commitlog、consumequeue、index最后一次刷盘时间,此文件固定长度为4k。但是Broker仅仅使用了前24个字节,这24个字节按照8字节为单位划分为三部分,使用16进制数字表示上述的三个刷盘时间戳。

使用文本打开此文件,内容如下:
图片

删除策略

每个Broker节点存储的文件都会随着时间的推移不断增长,CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制。

RocketMQ采用定时器扫描方式执行删除策略,并且在删除过程中不会关心消息是否被消费,删除策略在以下任一条件成立的情况才会执行批量删除:

  • 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
  • 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
  • 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

高性能存储

整个RocketMQ集群服务的本质是进行消息发送与消费,Producer发送消息到Broker端并进行磁盘写入,Consumer发送请求到Broker端并进行磁盘读取,也就是说RocketMQ的工作与磁盘数据的读写息息相关。上述的commitlog、consumequeue、index是在磁盘的存储结构上进行优化,除此之外,RocketMQ设计者们还从磁盘的读写层面进行了性能优化。

磁盘读过程

  • 寻道 : 磁头移动定位到指定磁道,这部分时间代价最高
  • 旋转延迟 : 在磁道位置确定后,中轴带动盘面旋转到合适的扇区开头处
  • 数据传输 : 表示盘面继续转动,读取数据并传送到内存

顺序与随机读写
对于磁盘的读写分为两种模式,顺序读写和随机读写。随机读写存在一个多次寻址的过程,而顺序读写相当于有一个物理索引,在读取的时候不需要寻找地址,因此顺序读写相对于随机读写,效率很高。

RocketMQ与Kafka在进行消息持久化时,都采用追加的方式写入磁盘文件(顺序写入)。Consumer在进行消费时,也是对磁盘文件进行顺序读取,每次要读取的数据在磁盘的位置,正好都在上次的后面,因此不需要寻道,移动到扇区开头处后直接进行数据传输,可以理解为RocketMQ与Kafka的顺序写入保证了顺序读取。

PageCache
当操作系统的运行内存有大量剩余时,为了提高IO的性能,会将多余的内存抽取出来作为文件的缓存使用。操作系统将文件按大小切割为多个数据块,并放入PageCache中进行缓存。

当Broker进行文件写入时,操作系统只是将数据写入PageCache中,并采用异步方式将PageCache写入的数据刷盘到磁盘中,也就是说文件写入磁盘是纯内存操作的,内存IO相对于磁盘IO来说,效率提升很多。这也带来了一定的风险,写入PageCache的数据如果未来得及同步到磁盘就发生宕机,会导致这部分数据丢失,因此RocketMQ在Broker端配置文件中可以自己选择同步或者异步刷盘方式,效率与可靠性由开发者自己权衡,而kafka直接不支持同步刷盘(太奔放了)。

当Broker进行文件读取时,首先去PageCache中查询要操作的文件内容是否存在,如果存在则直接读取并返回,如果不存在则需要启动磁盘IO,读取完毕后将内容加入PageCache中,除了读取查询需要的数据外,操作系统还会往后预读取若干(至少1个,通常3个)页数据一并放入PageCache中,这么做的目的就是尽量减少磁盘IO的次数,尽量提高PageCache的命中率。如果保证多数情况下对文件都是顺序读取,配合PageCache的预读取,可以保证请求几乎都命中PageCache,大大提升文件读取的执行效率。

PageCache机制是牺牲可靠性(宕机引发数据丢失)的前提下,尽量将磁盘IO转化为内存IO从而提高执行效率,这种机制虽然是操作系统层面的优化,但是RocketMQ顺序读写完美契合了PageCache,使其发挥出优越的性能。

如果某Broker端负责的topic不多且Consumer消费进度几乎一致,那么PageCache在缓存数据很小的情况下依然能极大提高效率。如果topic数量极多且各Consumer消费进度完全不一致,那么PageCache缓存的数据也会激增,一旦超出PageCache所能承受的极限,会导致后续的磁盘IO无法缓存引发随机读写,性能并不能达到满意的效果。

零拷贝

以Linux系统为例,当A服务器需要读取B服务器的磁盘数据时,B服务器默认采用操作系统提供的read()、write()函数进行读取并传输,在此过程中涉及到四个步骤:

  • 1.用户态切换到内核态,操作系统调用read()函数,将磁盘数据copy到内核缓冲区,此过程由MDA完成。
  • 2.将内核缓冲区数据copy到用户缓冲区,内存之间的数据拷贝需CPU协助完成,copy完毕后内核态切回用户态。
  • 3.用户空间的数据如果需要网络传送到其他服务器,需要调用write()函数,此时用户态切换到内核态,然后将用户缓冲区数据copy到内核socket发送缓冲区,涉及到内存之间的数据copy,仍然需CPU协助完成。
  • 4.最后socket缓冲区数据需要复制到网卡缓冲区中,此操作由MDA完成,复制完毕后由网卡发送到客户端,然后CPU状态切回用户态。

步骤流程图:
图片

整个过程涉及4次数据的copy、2次CPU调度、4次内核用户态的切换,频繁的数据复制以及内核/用户态的切换对系统开销非常大,因此便产生了零拷贝技术。零拷贝技术的思想是减少CPU调度和内核用户态切换的次数,实现方式分为mmap、sendfile等。

mmap通过虚拟内存映射,让多个虚拟地址指向同一个物理内存地址,用户空间通过映射关系共享内核空间的数据,内核缓冲区数据copy到用户缓冲区的步骤可以省略,这样从PageCache到网卡的传输就省掉了1次CPU调度、2次内核用户态切换:
图片

sendfile直接没有用户态进行参与,也不需要将数据拷贝到用户缓冲区,相比较mmap又少了一次CPU调度:
图片

mmap适合操作小块数据量,sendfile适合操作大块数据量,RocketMQ选择mmap方式,而Kafka选择sendfile方式,至于区别暂时没找到...

评论