基于go实现lsm tree之level sorted merge流程

0 前言

今天我们终于迎来专题——基于 go 语言从零到一实现 lsm tree 的完结篇——level sorted merge 流程篇. 在此先回顾一下本系列的全部内容:

关于以上内容的源码均已上传到我的个人开源项目中,大家根据需要自取,如果觉得我的代码写得还凑合,可以留个 star,不胜感激.

开源项目 golsm 传送门 >> https://github.com/xiaoxuxiansheng/golsm

基于go实现lsm tree之level sorted merge流程

 

1 流程介绍

基于go实现lsm tree之level sorted merge流程

1.1 lsm compact 机制

首先,关于 lsm tree 的 level sorted merge 流程(简称 compact 流程),我们在此再针对这部分理论进行铺垫介绍:

  • • lsm tree 采取磁盘分层架构,将存储数据的 sst 文件分为 level0-levelk 共计 k+1 个层级

  • • level(i+1) 中 sst 文件的容量大小约为 level(i) 的 T 倍,T 为常量,通常取值为 10 左右

  • • 数据流向由浅入深,层层递进,即由 level(i) -> level(i+1)

  • • 内存有序表 memtable 的数据溢写生成 level0 的 sst 文件,因此 level0 层文件局部有序,文件间可能重复且无序

  • • 当某个 level 层数据容量达到阈值时,会发起 level(i) -> level(i+1) 的 compact 操作

  • • 通过数据流向可以看出,实时性较好的热数据位于浅层,实时性较差的冷数据位于深层. 倘若出现重复 kv 对,则最浅层一笔数据为有效数据,其余底层数据均为脏数据

  • • 数据从 level(i) 流向 level(i+1) 过程中,通过 compact 操作,数据去重和排序,保证汇入 level(i+1) 层后数据无重复且全局有序

  • • compact 操作可能产生连锁反应,即 level(i) -> level(i+1) 的 compact 流程完成后,可能导致 level(i+1) 数据容量达到阈值,进一步触发 level(i+1) -> level(i+2) 的 compact 流程

 

1.2 lsm compact 示例

下面通过一个示例,对 compact 流程进一步加以说明.

基于go实现lsm tree之level sorted merge流程

背景:某时刻,lsm tree 中 level1 层的数据容量达到阈值,接下来要发起 level(1) -> level(2) 的归并操作

  • • 从 level1 中选取一个本轮 comact 的数据范围 [0-30] (此处选取范围的规则比较灵活)

  • • 查找到 level2 层与 [0-30] 有重叠的 sst 文件——[0-10] 和 [11-32]

  • • 按照 [0-10]、[11-32]、[0-30] 的顺序有序归并其中的 kv 对数据(如出现重复 key,level1 文件更晚写入,因此可以覆盖 level2 的脏数据

  • • 将归并后的 kv 对数据写入 level2,根据 level2 文件容量阈值,可能被拆成多个文件,如 [0-16] 和 [17-32]

  • • 删除完成 compact 操作的旧文件 [0-10]、[11-32]、[0-30]

倘若因为这一轮 compact 操作,导致 level2 的数据容量又超出阈值,则进一步执行 level2 -> level3 的 compact 流程,以此类推.

 

2 核心模块

下面我们梳理具体的实现细节. 和 compact 流程有关的核心类包括 lsm node 和 lsm tree.

2.1 Node

基于go实现lsm tree之level sorted merge流程

 

Node 是 lsm tree 拓扑结构中对应于一个 sst 文件的节点. Node 中一方面记录了其在 lsm tree 中的拓扑信息,如层级以及序号,另一方面通过持有 sstReader、文件索引 index、块过滤器 filter 等成员属性,能够很好地支持 sst 文件的数据检索操作.

基于go实现lsm tree之level sorted merge流程

 

// lsm tree 中的一个节点. 对应一个 sstables
type Node struct {
    conf          *Config           // 配置文件
    file          string            // sstable 对应的文件名,不含目录路径
    level         int               // sstable 所在 level 层级
    seq           int32             // sstable 的 seq 序列号. 对应为文件名中的 level_seq.sst 中的 seq
    size          uint64            // sstable 的大小,单位 byte
    blockToFilter map[uint64][]byte // 各 block 对应的 filter bitmap
    index         []*Index          // 各 block 对应的索引
    startKey      []byte            // sstable 中最小的 key
    endKey        []byte            // sstable 中最大的 key
    sstReader     *SSTReader        // 读取 sst 文件的 reader 入口
}

 

2.2 Tree

基于go实现lsm tree之level sorted merge流程

 

Tree 对应的就是一棵 lsm tree 实例,其中维护了内存读写有序表 memtable 以及一系列内存只读有序表 rOnlyMemtable,并通过一个二维数组 nodes 实现了磁盘下多层级 sst 文件的拓扑结构.

基于go实现lsm tree之level sorted merge流程

 

// 一棵 lsm tree
type Tree struct {
    conf *Config


    // 读写数据时使用的锁
    dataLock sync.RWMutex


    // 每层 node 节点使用的读写锁
    levelLocks []sync.RWMutex


    // 读写 memtable
    memTable memtable.MemTable


    // 只读 memtable
    rOnlyMemTable []*memTableCompactItem


    // 预写日志写入口
    walWriter *wal.WALWriter


    // lsm树状数据结构
    nodes [][]*Node


    // memtable 达到阈值时,通过该 chan 传递信号,进行溢写工作
    memCompactC chan *memTableCompactItem


    // 某层 sst 文件大小达到阈值时,通过该 chan 传递信号,进行溢写工作
    levelCompactC chan int


    // lsm tree 停止时通过该 chan 传递信号
    stopc chan struct{}


    // memtable index,需要与 wal 文件一一对应
    memTableIndex int


    // 各层 sstable 文件 seq. sstable 文件命名为 level_seq.sst
    levelToSeq []atomic.Int32
}

 

2.3 compact 协程

在一棵 lsm tree 实例启动时,会异步启动一个 compact goroutine,持续负责为只读 memtable 完成溢写落盘操作,以及为各 level 层完成 compact 操作.

// 构建出一棵 lsm tree
func NewTree(conf *Config) (*Tree, error) {
    // 1 构造 lsm tree 实例
    t := Tree{
        // ...
        // compact 协程接收待溢写落盘到 level0 层的只读 memtable
        memCompactC:   make(chan *memTableCompactItem),
        // compact 协程接收某层需要执行 compact 操作的信号
        levelCompactC: make(chan int),
        // compact 协程退出信号
        stopc:         make(chan struct{}),
        // 各 level 层级 sst 文件的最大序号,单调递增
        levelToSeq:    make([]atomic.Int32, conf.MaxLevel),
        nodes:         make([][]*Node, conf.MaxLevel),
        // lsm tree 的 level 层级锁,保证在某个 level 层内修改操作 node 时并发安全
        levelLocks:    make([]sync.RWMutex, conf.MaxLevel),
    }


    // 2 读取 sst 文件,还原出整棵树
    // ...


    // 3 运行 lsm tree 压缩调整协程
    go t.compact()


    // 4 读取 wal 还原出 memtable
    // ...


    // 5 返回 lsm tree 实例
    return &t, nil
}

 

基于go实现lsm tree之level sorted merge流程

 

compact 协程通过 for 循环 + select 多路复用的模式运行,持续监听 memCompactC 和 levelCompactC,接收到信号后执行对应的 flushMemtable 和 compactLevel 流程.

// 运行 compact 协程.
func (t *Tree) compact() {
    for {
        select {
        // 接收到 lsm tree 终止信号,退出协程.
        case <-t.stopc:
            // log
            return
            // 接收到 read-only memtable,需要将其溢写到磁盘成为 level0 层 sstable 文件.
        case memCompactItem := <-t.memCompactC:
            t.compactMemTable(memCompactItem)
            // 接收到 level 层 compact 指令,需要执行 level~level+1 之间的 level sorted merge 流程.
        case level := <-t.levelCompactC:
            t.compactLevel(level)
        }
    }
}

 

3 memtable 落盘

3.1 主流程

基于go实现lsm tree之level sorted merge流程

compact 协程接收到只读 memtable 后,会进入 compactMemTable 方法:

  • • 将 memtable 通过 sstWriter 溢写落盘到 level0 层,生成 sst 文件

  • • 从新生成的 sstable 包装成 lsm node,插入到 lsm tree nodes 数组的 level0 层

  • • 从 lsm tree 的 rOnlyMemTable 数组中移除已经完成落盘的只读 memtable

  • • 移除已完成落盘的 memtable 对应的 wal 文件,因为这部分数据已经安全了

// 将只读 memtable 溢写落盘成为 level0 层 sstable 文件
func (t *Tree) compactMemTable(memCompactItem *memTableCompactItem) {
    // 处理 memtable 溢写工作:
    // 1 memtable 溢写到 0 层 sstable 中
    t.flushMemTable(memCompactItem.memTable)


    // 2 从 rOnly slice 中回收对应的 table
    t.dataLock.Lock()
    for i := 0; i < len(t.rOnlyMemTable); i++ {
        if t.rOnlyMemTable[i].memTable != memCompactItem.memTable {
            continue
        }
        t.rOnlyMemTable = t.rOnlyMemTable[i+1:]
    }
    t.dataLock.Unlock()


    // 3 删除相应的预写日志. 因为 memtable 落盘后数据已经安全,不存在丢失风险
    _ = os.Remove(memCompactItem.walFile)
}

 

3.2 溢写落盘

将 memtable 溢写落盘到 level0 层的详细方法为 flushMemTable.

需要注意的是,在完成 level0 层 sst 文件溢写后,需要判断 level0 层文件容量是否达到阈值,如果达到的话,需要驱动一轮 level0 -> level1 的 compact 流程.

// 将 memtable 的数据溢写落盘到 level0 层成为一个新的 sst 文件
func (t *Tree) flushMemTable(memTable memtable.MemTable) {
    // memtable 写到 level 0 层 sstable 中
    seq := t.levelToSeq[0].Load() + 1


    // 创建 sst writer
    sstWriter, _ := NewSSTWriter(t.sstFile(0, seq), t.conf)
    defer sstWriter.Close()


    // 遍历 memtable 写入数据到 sst writer
    for _, kv := range memTable.All() {
        sstWriter.Append(kv.Key, kv.Value)
    }


    // sstable 落盘
    size, blockToFilter, index := sstWriter.Finish()


    // 构造节点添加到 tree 的 node 中
    t.insertNode(0, seq, size, blockToFilter, index)
    // 尝试引发一轮 compact 操作
    t.tryTriggerCompact(0)
}

 

3.3 插入 lsm tree

基于go实现lsm tree之level sorted merge流程

通过 insertNode 方法,实现 sstable 对应 lsm node 实例的创建,并将其插入到 lsm tree nodes 数组的指定 level 层级中:

  • • 倘若插入到 level0 层,直接在当前层末尾追加即可(level0 层文件不保证全局去重和有序)

  • • 倘若插入到 level1 ~ levelk 层,则需要根据 sstable 数据 key 范围,按照大小顺序寻找到合适的位置插入(level1 ~ levelk 层文件保证全局去重和有序)

func (t *Tree) insertNode(level int, seq int32, size uint64, blockToFilter map[uint64][]byte, index []*Index) {
    file := t.sstFile(level, seq)
    sstReader, _ := NewSSTReader(file, t.conf)


    t.insertNodeWithReader(sstReader, level, seq, size, blockToFilter, index)
}

 

// 插入一个 node 到指定 level 层
func (t *Tree) insertNodeWithReader(sstReader *SSTReader, level int, seq int32, size uint64, blockToFilter map[uint64][]byte, index []*Index) {
    file := t.sstFile(level, seq)
    // 记录当前 level 层对应的 seq 号(单调递增)
    t.levelToSeq[level].Store(seq)


    // 创建一个 lsm node
    newNode := NewNode(t.conf, file, sstReader, level, seq, size, blockToFilter, index)
    // 对于 level0 而言,只需要 append 插入 node 即可
    if level == 0 {
        t.levelLocks[0].Lock()
        t.nodes[level] = append(t.nodes[level], newNode)
        t.levelLocks[0].Unlock()
        return
    }


    // 对于 level1~levelk 层,需要根据 node 中 key 的大小,遵循顺序插入
    for i := 0; i < len(t.nodes[level])-1; i++ {
        // 遵循从小到大的遍历顺序,找到首个最小 key 比 newNode 最大 key 还大的 node,将 newNode 插入在其之前
        if bytes.Compare(newNode.End(), t.nodes[level][i+1].Start()) < 0 {
            t.levelLocks[level].Lock()
            t.nodes[level] = append(t.nodes[level][:i+1], t.nodes[level][i:]...)
            t.nodes[level][i+1] = newNode
            t.levelLocks[level].Unlock()
            return
        }
    }


    // 遍历完 level 层所有节点都还没插入 newNode,说明 newNode 是该层 key 值最大的节点,则 append 到最后即可
    t.levelLocks[level].Lock()
    t.nodes[level] = append(t.nodes[level], newNode)
    t.levelLocks[level].Unlock()
}

 

3.4 驱动 compact

基于go实现lsm tree之level sorted merge流程

每当在 level 层中创建 sst 文件后,都需要校验当前 level 层是否有必要开启 compact 流程:

  • • 如果当前层已经是最深的一层,则不会执行 compact 流程

  • • 如果当前层 sst 文件总大小达到阈值,则往 levelCompactC 传递信号,驱动 compact 协程执行 compact 流程

func (t *Tree) tryTriggerCompact(level int) {
    // 最后一层不执行 compact 操作
    if level == len(t.nodes)-1 {
        return
    }


    var size uint64
    for _, node := range t.nodes[level] {
        size += node.size
    }


    if size <= t.conf.SSTSize*uint64(math.Pow10(level))*uint64(t.conf.SSTNumPerLevel) {
        return
    }


    go func() {
        t.levelCompactC <- level
    }()
}

 

4 level compact 流程

4.1 主流程

基于go实现lsm tree之level sorted merge流程

 

当 compact 协程接收到 compact 信号后,通过 compactLevel 方法驱动一轮 compact 流程:

  • • 首先获取到本轮 compact 所影响到的所有 lsm node(包括 level 层和 level + 1 层)

  • • 针对这些 lsm node 的 kv 对进行有序归并操作,出现重复 key 时,需要保留实时性更强的数据

  • • 通过 sstWriter,将 kv 对数据写入到 level + 1 层的 sst 文件中. 遵循 level + 1 层的文件容量阈值,必要的时候进行 sst 文件拆分

  • • 把新生成的 sstable 都封装成 lsm node,插入到 lsm tree 的 nodes 数组中

  • • 把完成 compact 操作的旧 sstable 对应 lsm node 从 lsm tree 的 nodes 数组中移除,并把这部分 sst 文件删除

  • • 检查 level + 1 层文件容量是否达到阈值,如有必要,驱动下一轮 compact 流程

// 针对 level 层进行排序归并操作
func (t *Tree) compactLevel(level int) {
    // 获取到 level 和 level + 1 层内需要进行本次归并的节点
    pickedNodes := t.pickCompactNodes(level)


    // 插入到 level + 1 层对应的目标 sstWriter
    seq := t.levelToSeq[level+1].Load() + 1
    sstWriter, _ := NewSSTWriter(t.sstFile(level+1, seq), t.conf)
    defer sstWriter.Close()


    // 获取 level + 1 层每个 sst 文件的大小阈值
    sstLimit := t.conf.SSTSize * uint64(math.Pow10(level+1))
    // 获取本次排序归并的节点涉及到的所有 kv 数据
    pickedKVs := t.pickedNodesToKVs(pickedNodes)
    // 遍历每笔需要归并的 kv 数据
    for i := 0; i < len(pickedKVs); i++ {
        // 倘若新生成的 level + 1 层 sst 文件大小已经超限
        if sstWriter.Size() > sstLimit {
            // 将 sst 文件溢写落盘
            size, blockToFilter, index := sstWriter.Finish()
            // 将 sst 文件对应 node 插入到 lsm tree 内存结构中
            t.insertNode(level+1, seq, size, blockToFilter, index)
            // 构造一个新的 level + 1 层 sstWriter
            seq = t.levelToSeq[level+1].Load() + 1
            sstWriter, _ = NewSSTWriter(t.sstFile(level+1, seq), t.conf)
            defer sstWriter.Close()
        }


        // 将 kv 数据追加到 sstWriter
        sstWriter.Append(pickedKVs[i].Key, pickedKVs[i].Value)
        // 倘若这是最后一笔 kv 数据,需要负责把 sstWriter 溢写落盘并把对应 node 插入到 lsm tree 内存结构中
        if i == len(pickedKVs)-1 {
            size, blockToFilter, index := sstWriter.Finish()
            t.insertNode(level+1, seq, size, blockToFilter, index)
        }
    }


    // 移除这部分被合并的节点
    t.removeNodes(level, pickedNodes)


    // 尝试触发下一层的 compact 操作
    t.tryTriggerCompact(level + 1)
}

 

4.2 有序归并

基于go实现lsm tree之level sorted merge流程

 

通过 pickCompactNodes 方法,能够获取到本轮 compact 所涉及到的所有 lsm node:

  • • 以 level 层前一半 node 的最小 key 作为 startKey,最大 key 作为 endKey

  • • 遍历 level + 1 和 level 层所有 node,所有和 [startKey, endKey] 有重叠部分的 node 都追加到 list 进行归并

  • • node 进入 list 遵循 level 从大到小、index 从小到大的顺序,保证 list 中越靠后的 node 数据实时性越高

// 获取本轮 compact 流程涉及到的所有节点,范围涵盖 level 和 level+1 层
func (t *Tree) pickCompactNodes(level int) []*Node {
    // 每次合并范围为当前层前一半节点
    startKey := t.nodes[level][0].Start()
    endKey := t.nodes[level][0].End()


    mid := len(t.nodes[level]) >> 1
    if bytes.Compare(t.nodes[level][mid].Start(), startKey) < 0 {
        startKey = t.nodes[level][mid].Start()
    }


    if bytes.Compare(t.nodes[level][mid].End(), endKey) > 0 {
        endKey = t.nodes[level][mid].End()
    }


    var pickedNodes []*Node
    // 将 level 层和 level + 1 层 和 [start,end] 范围有重叠的节点进行合并
    for i := level + 1; i >= level; i-- {
        for j := 0; j < len(t.nodes[i]); j++ {
            if bytes.Compare(endKey, t.nodes[i][j].Start()) < 0 || bytes.Compare(startKey, t.nodes[i][j].End()) > 0 {
                continue
            }


            // 所有范围有重叠的节点都追加到 list
            pickedNodes = append(pickedNodes, t.nodes[i][j])
        }
    }


    return pickedNodes
}

 

通过 pickedNodesToKVs 方法,进行一系列 node 中 kv 对数据的有序归并:

  • • 借助 memtable,实现 kv 数据的有序排列

  • • 出现重复 key 时,用 node list 中靠后的 kv 对覆盖靠前的 kv 对,保证保留下来的是实时性更高的数据

// 获取本轮 compact 流程涉及到的所有 kv 对. 这个过程中可能存在重复 k,保证只保留最新的 v
func (t *Tree) pickedNodesToKVs(pickedNodes []*Node) []*KV {
    // index 越小,数据越老. index 越大,数据越新
    // 所以使用大 index 的数据覆盖小 index 数据,以新覆久
    memtable := t.conf.MemTableConstructor()
    for _, node := range pickedNodes {
        kvs, _ := node.GetAll()
        for _, kv := range kvs {
            memtable.Put(kv.Key, kv.Value)
        }
    }


    // 借助 memtable 实现有序排列
    _kvs := memtable.All()
    kvs := make([]*KV, 0, len(_kvs))
    for _, kv := range _kvs {
        kvs = append(kvs, &KV{
            Key:   kv.Key,
            Value: kv.Value,
        })
    }


    return kvs
}

 

4.3 移除旧节点

基于go实现lsm tree之level sorted merge流程

最后是移除一系列 lsm node 的 removeNodes 方法:

  • • 从 lsm tree nodes 数组中移除对应的 node

  • • 调用 node.Destroy 方法,删除其对应的 sst 文件. (这个流程可以异步完成,以提高 compact 协程的性能)

// 移除所有完成 compact 流程的老节点
func (t *Tree) removeNodes(level int, nodes []*Node) {
    // 从 lsm tree 的 nodes 中移除老节点
outer:
    for k := 0; k < len(nodes); k++ {
        node := nodes[k]
        for i := level + 1; i >= level; i-- {
            for j := 0; j < len(t.nodes[i]); j++ {
                if node != t.nodes[i][j] {
                    continue
                }


                t.levelLocks[i].Lock()
                t.nodes[i] = append(t.nodes[i][:j], t.nodes[i][j+1:]...)
                t.levelLocks[i].Unlock()
                continue outer
            }
        }
    }


    go func() {
        // 销毁老节点,包括关闭 sst reader,并且删除节点对应 sst 磁盘文件
        for _, node := range nodes {
            node.Destroy()
        }
    }()
}

 

func (n *Node) Destroy() {
    n.sstReader.Close()
    _ = os.Remove(path.Join(n.conf.Dir, n.file))
}

 

5 小结

祝贺!至此,我们已完成【基于go实现lsm tree】 系列的全部内容:

  • • 基于go实现lsm tree 之主干框架(已完成)

  • • 基于go实现lsm tree之memtable结构(已完成)

  • • 基于go实现lsm tree之sstable结构(已完成)

  • • 基于go实现lsm tree之level sorted merge流程(已完成)

在此做个展望,未来一段时间内会和大家一起探讨一个新的话题——如何基于 Golang 从零到一实现 B+ Tree


原文始发于微信公众号(小徐先生的编程世界):基于go实现lsm tree之level sorted merge流程

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/203446.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!