RocketMQ系列-消息刷盘详解

RocketMQ架构设计之消息 这篇文章中, 我们学习了 消息发送到消费 的整体流程。这篇文章,我们一起来看看 MappedFile 的实现细节.

引言

我们都知道的是 RocketMQ息都是持久化到磁盘上的,消息的读取和写入也都是会从磁盘上进行IO的。可是磁盘的性能不禁令CPU啧啧撇嘴。RocketMQ 是如何实现消息的高性能IO的呢?

答案就是 MappedFile 这个 643行的java文件中。

RokcetMQMappedFile

我们知道的是 MappedFileRocketMQ 真实消息文件在内存中的映射. 主要是通过 java NIO 技术来实现。

我们还是从源码入手, 按照 MappedFile 的初始化, 提交消息(消息写入物理内存),刷盘 的顺序去学习。

初始化

Broker 的初始化过程中, Broker 通过 AllocateMappedFileService 完成了 MappedFile 的 初始化过程。其主要的实现就是在 mmapOperation 方法中. 主要的职责有: 1、创建 MappedFile实例  2、对MappedFile进行预热。

创建 MappedFile实例

根据 消息的刷盘方式 初始化 MappedFile 实例的不同的字段。如下图。

RocketMQ系列-消息刷盘详解


预热 MappedFile

MappedFile 预热的条件: 实际的消息文件大小超过配置的大小(默认1G) 并且 配置允许MappedFile预热。MappedFile 预热的作用是: 将 存在在磁盘上的消息预先加载到页缓存中(Page Cache), 这也就是所谓的预热, 省去了现用现加载的时间。

预热的主要代码,我贴在下面

RocketMQ系列-消息刷盘详解

注意看下注释的内容. 乍一看好像没有什么奇怪。

我先补充一下关于操作系统中存储的小知识点:

页是操作系统虚拟内存中空间划分的单位。是逻辑地址空间顺序等分而成的一段逻辑空间,并依次连续编号。页的大小一般为512B~8KB

页缓存(Page Cache)

为了提升对文件的读写效率, Linux 内核会以页大小(4KB)为单位,将文件划分为多数据块。当用户对文件中的某个数据块进行读写操作时,内核首先会申请一个内存页(称为 页缓存)与文件中的数据块进行绑定. 在Linux系统中写入数据的时候并不会直接写到硬盘上,而是会先写到Page Cache中,并打上dirty标识,由内核线程flusher定期将被打上dirty的页发送给IO调度层,最后由IO调度决定何时落地到磁盘中,而Linux一般会把还没有使用的内存全拿来给Page Cache使用。而读的过程也是类似,会先到Page Cache中寻找是否有数据,有的话直接返回,如果没有才会到磁盘中去读取并写入Page Cache然后再次读取Page Cache并返回。而且读的这个过程中操作系统也会有一个预读的操作,你的每一次读取操作系统都会帮你预读出后面一部分数据,而且当你一直在使用预读数据的时候,系统会帮你预读出更多的数据(最大到128K)。

page cache的作用主要是将磁盘中文件缓存到内存,并集中管理,便于回收利用。

了解了 页和页缓存 之后,就应该可以知道了 RocketMQ 为什么会在 MappedByteBuffer 的每个 pageSize 大小的位置 (也就是一个页)写入一个0了。其实就是 把磁盘上的数据写到 PageCache 中,也就达到预热的效果。

还有一个  mlock 方法, 这个方法的作用是什么呢?

该方法主要是实现文件预热后,防止把预热过的文件被操作系统调到swap空间中。当程序再次读取交换出去的数据的时候会产生缺页异常。这里也有一个关于操作系统的知识点.

Page Fault

CPU通过地址总线可以访问连接在地址总线上的所有外设,包括物理内存、IO设备等等,但从CPU发出的访问地址并非是这些外设在地址总线上的物理地址,而是一个虚拟地址,由内存管理单元(MMU)将虚拟地址转换成物理地址再从地址总线上发出,内存管理单元上的这种虚拟地址和物理地址的转换关系是需要创建的,并且内存管理单元还可以设置这个物理页是否可以进行写操作,当没有创建一个虚拟地址到物理地址的映射,或者创建了这样的映射但那个物理页不可写的时候,内存管理单元将会通知CPU产生了一个缺页异常。产生了 Page Fault 之后会交给PageFaultHandler处理, 这里不详细介绍了。导致用户进程产生异常, 无法正常工作。

mlock 就是为了阻止出现 pageFault 异常

内存管理机制:页框回收机制

系统中初识化了很多页缓存,但是这些页长时间没有使用,linux 会有一个 页框的回收机制,将 page cache 中数据标识 “可回收”。在回收之前,操作系统就会把 Page cache 中的内容复制到 swap cache 中(这个操作也叫做 swap out), 如果进程要访问 当前页的时候,就会出现 page Fault. 然后 根据 swap cache 内存地址把内存拷贝到新的 PageCache 中 (这个操作叫做 swap in).

总结:预热MappedFile

预热时已经已经建立了 MappedByteBuffer 和物理内存的地址映射,但是还没有把消息加载进内存,所谓的预热是为了把消息读取到操作系统的物理内存中. 并且使用 mlock 把锁定内存,防止消息被操作系统回收。

消息写入物理内存

这篇文章《RocketMQ架构设计之消息》我们介绍了消息写入的大致过程: BrokerNetty Server 接收到 Producer 发送来的消息, 通过 SendMessageProcessor 进行处理, 然后 DefaultMessageStore 把消息交给 CommitLog 进行写消息和刷盘.

我们都知道的是 CommitLog 表示的是 RocketMQ 的消息文件。它提供了写入消息和读取消息的功能. 我们就从 CommitLog.asyncPutMessage 来看消息的写入过程。

asyncPutMessage 这个方法 实现了两个功能:

  • 通过 ByteBuffer 写入消息。
  • 提交刷盘请求。

RocketMQ系列-消息刷盘详解

上图源码中可以看到 CommitLog 调用了 MappedFile 把消息写入文件, 真实去写入者是 ByteBuffer.put() 方法把消息写到了物理内存中.

注意,这个ByteBuffer 非常的有意思.

当 刷盘方式是 同步刷盘的时候, 使用的是:this.mappedByteBuffer.slice() , 这个 mappedByteBuffer对象是由FileChannel.map()方法生成。当刷盘方式是 异步刷盘的时候, 则是使用 DirectByteBuffer 进行写入的。

这两者有什么区别呢? 本质上其实没有什么区别!!

FileChannel.map() 出来的 MappedByteBuffer类型是一个抽象类,本质上还是通过 DirectByteBuffer 进行构建出来的。

RocketMQ系列-消息刷盘详解

这样, RocketMQ 就把消息写入到了物理内存中.

什么时候把消息写到磁盘上的呢?

我们读写的数据都是从 pageCache 中读写的,并不会直接读写磁盘。

操作系统中提供了两种方式来实现 pageCache 和 磁盘的数据一致性。

  • Write Through(写穿):向用户层提供特定接口,应用程序可主动调用接口来保证文件一致性;
  • Write back(写回):系统中存在定期任务(表现形式为内核线程),周期性地同步文件系统中文件脏数据块,这是默认的 Linux 一致性方案.

所以,我们接下来看 RocketMQ 是如何确保消息落盘的。

消息刷盘

RocketMQ有三种刷盘方式. 两种刷盘的实现。

RocketMQ系列-消息刷盘详解

分别是 GroupCommitService(同步刷盘), FlushRealTimeService(异步刷盘), CommitRealTimeService(异步刷盘+缓冲区). 这三个类有一个共同的父类:

RocketMQ系列-消息刷盘详解

我们详细的去看一下同步刷盘的过程. 因为异步刷盘和同步刷盘的flush过程是一样的.

同步刷盘

在消息启动的时候, 如果配置了 使用同步刷盘方式的话, Broker 会启动 GroupCommitServiceCommitRealTimeService(这个是异步刷盘并且配置内存池的时候使用的刷盘方式, 不管配置什么方式的时候都会创建, 在启用isTransientStorePoolEnable 的时候才会 start() ).

GroupCommitService 在启动之后, 会每 10ms 执行一次刷盘操作。

我们从 CommitLog 添加完消息提交刷盘请求开始,详细分析整个过程.

如下图。

RocketMQ系列-消息刷盘详解

CommitLog 提交一个 刷盘请求, 这时根据配置 是否等待消息写入完成 来执行刷盘操作.

  • 如果不等待消息写入完成, 则唤醒 GroupCommitService, 直接返回写入成功, 这时 线程是否执行是由JVM决定的,所以并不确定线程立刻就执行了。
  • 如果等待消息写入完成,即第二步,则会放入一个 GroupCommitRequest 同步刷盘请求. 实际上是把 这个 Request 放到 GroupCommitServiceLinkedList 中, 为了线程安全和提高效率, 使用了两个 LinkedList, 分别是 writeread. 当执行刷盘操作的时候,会将 writeread 互换. 每次都会读取 read 队列中的 Request 进行刷盘。这一步是在 waitForRunning 方法中执行的,见如图第5-7步。接下来就是刷盘操作了, 调用 mappedFileQueue.flush(0) 操作完成刷盘动作. 然后, 返回 CompleteFuture 如图中第9步。当需要等待消息写入完成的时候, CommitLog 添加消息是在一直等待的。如下图.

RocketMQ系列-消息刷盘详解

那接下来,我就详细的来看下 mappedFileQueue 其实是 MappedFile 的刷盘过程.

MappedFileQueue.flush

/**
    * 刷盘
    *
    * @param flushLeastPages 最后一页
    * @return
    */

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    // flushWhere. 应该刷新数据end的位置。
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        // 使用 MappedFile 进行刷盘。返回 MappedFile 中 已被刷新到磁盘的数据的偏移量
        int offset = mappedFile.flush(flushLeastPages);
        // 在整个MappedFileQueue中的偏移量
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            // 刷新之前,最新一条消息的存储时间戳
            this.storeTimestamp = tmpTimeStamp;
        }
    }
    return result;
}

获取到当前需要写入的MappedFile. 然后通过MappedFile 写入”磁盘”。

/**
     * 刷盘。 把写入MappedFile 的数据,写入磁盘。
     *
     * @return The current flushed position
     */

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
                try {
                    // 只有在异步刷盘+缓存区方式的时候,才初始化 writeBuffter
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        // 不管什么刷盘方式,都会初始化mappedByteBuffer.
                        // 同步刷盘和异步刷盘方式都是使用 mappedByteBuffer 进行刷盘
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

异步刷盘

看完同步刷盘的流程之后,异步刷盘就简单很多了。异步刷盘仍然是使用 mappedFileQueue.flush() 进行刷盘的

实现逻辑如下图(代码有精简):

RocketMQ系列-消息刷盘详解

可以看到, 异步刷盘的线程在执行的时候,会通过 Thread.sleep()进行线程等待 或者 通过 CountDownLatch2 进行等待的。需要注意的是: CountDownLatch2RocketMQ 实现的一个类, 作用同CountDownLatch 一样。

而且在 消息进行刷盘的时候, 只需要将 FlushRealTimeService 线程 wakeup() 就好了。

RocketMQ系列-消息刷盘详解

此时的 flushCommitLogServiceFlushRealTimeService 的实例。

异步刷盘的时机:

  • 判断是否超过10s没有刷盘了,如果超过则强制刷盘。
  • 等待flush间隔,默认500ms
  • 通过MappedFile 刷盘
  • 超过500ms的刷盘日志
  • Broker正常挺值钱,把内存page中的数据刷盘。

和同步刷盘相比,异步刷盘提高了IO性能。

异步刷盘+缓冲区

我们先看一下执行的流程。

RocketMQ系列-消息刷盘详解

然后如果是CountDownLatch2结束,执行线程的话,就会执行下面的逻辑:

RocketMQ系列-消息刷盘详解

判断是否有新的数据提交, 如果有,则唤醒异步刷盘方式。没有则等待。

waitForRunning方法的逻辑是:

RocketMQ系列-消息刷盘详解

可以看到 是通过线程等待的方式,等待时间间隔完成。

开启缓冲区后的刷盘策略:

  • 判断是否超过200毫秒没提交,需要强制提交
  • 提交到MappedFile,此时还未刷盘
  • 然后唤醒刷盘线程
  • Broker正常停止前,提交内存page中的数据

RocketMQ 申请一块和CommitLog文件大小相同的堆外内存在做缓冲区,数据会先写入缓冲池,提交线程 commitRealTimeService 每隔 500ms 尝试提交到 FileChannel 中进行刷盘, 最后使用 FlushRealTimeService 来完成。使用了缓冲区的目的是 多条数据合并写入,提高IO性能.

总结

RocketMQ 使用 FileChannelMappedByteBuffer 完成了消息的落盘。提供了三种持久化策略:

  • 同步刷盘: 使用 GroupCommitService 线程, 通过 MappedByteBufferforce 方法将消息写入文件中, 写入完成后才会返回 ACK.
  • 异步刷盘: 消息写入 pageCache 之后,就立即返回 ACK. 使用 FlushRealTimeService 线程 通过 FileChannelforce 方法确保将消息写入文件.
  • 异步刷盘+开启缓冲区: RocketMQ 申请一块直接内存用作数据缓冲区,消息先写入缓冲区,然后由 CommitRealTimeService 线程定时将缓冲区数据写入 FileChannel ,再唤醒 FlushRealTimeServiceFileChannel 缓冲区数据强制刷新到磁盘。

回答一下文章开头的问题:RocketMQ 是如何提高 数据读写的性能的呢?RocketMQ 的大致做法是,将数据文件映射到OS的虚拟内存中(通过JDK NIOMappedByteBuffer),写消息的时候首先写入 PageCache ,并通过异步刷盘的方式将消息批量的做持久化(同时也支持同步刷盘);订阅消费消息时(对CommitLog操作是随机读取),由于PageCache的局部性热点原理且整体情况下还是从旧到新的有序读,因此大部分情况下消息还是可以直接从Page Cachecache hit)中读取,不会产生太多的缺页(Page Fault)中断而从磁盘读取。当然, PageCache 机制也不是完全无缺点的,当遇到OS进行脏页回写,内存回收,内存swap等情况时,就会引起较大的消息读写延迟.对于这些情况, RocketMQ 采用了多种优化技术,比如内存预分配,文件预热, mlock 系统调用等,来保证在最大可能地发挥 PageCache 机制优点的同时,尽可能地减少其缺点带来的消息读写延迟.

最后

期望和你一起遇见更好的自己

RocketMQ系列-消息刷盘详解


原文始发于微信公众号(方家小白):RocketMQ系列-消息刷盘详解

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/37629.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!