• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

用 Rust 从 0 到 1 实现最简化的 KV 存储引擎

武飞扬头像
Knight丶
帮助1

本文将从 下层的数据编码 到 上层的 kv 数据读写接口实现 完整介绍如何实现一个最简化的 kv 存储引擎,适合 Bitcask 存储模型和 Rust 语言的入门。本文的完整代码已开源在:GitHub - Morgan279/miniDB: A mini kv database demo that using simplified bitcask storage model with rust implementation.

功能和架构设计

学新通

如上图所示,本文的存储引擎支持 Get, Set, Remove 这三种最基础的命令。架构上可以分为 KV Store 和 Storage 两个部分,其中 KV Store 负责与客户端交互,接收外部命令和返回操作结果;Storage 负责持久化经过编码之后的数据。图里面的 Entry 是指数据的二进制编码,即每一份数据都会先封装成 Entry 然后编码成二进制之后再存入 Storage(实际的物理存储中,Entry 之间不会有缝隙)。

存储流程概述

本文的存储流程将采用经过简化后的 Bistcask 存储模型。Bitcask 是一种 LSM 类别下的简单的存储模型,其主要思想是通过顺序写非原地更新(通过 append 新数据来更新或删除旧数据)节省磁盘随机寻道的开销,从而提高写吞吐量,但是这个特性也会造成读放大,适合写多读少的场景。

其基本的读写流程是:

  1. 从磁盘中读取数据文件在内存中生成索引(该步骤只在初始化时执行一次)
  2. 接收到读写命令之后通过在内存中的索引获取数据在磁盘中的位置
  3. 根据在第 2 步中拿到的具体位置信息在磁盘中进行数据读写,成功后更新内存中的索引

如下图所示:

学新通

由于数据是非原地更新,所以当接收到更新或删除命令时,只需要在文件末尾追加新的数据而不用管旧的数据,因为每次做数据读写都要先经过内存中的索引且索引指向的都是最新且有效的数据,所以该模型可以保证读写的正确性

通过观察上图的存储模型,在实现层面上有两个问题需要解决:

  1. 存储的 key-value 数据的长度可以是任意的,这就导致每个 Entry 的长度不是固定的(如图中的浅蓝色 Entry 和 深蓝色 Entry)且相邻 Entry 之间没有缝隙(即 Entry 之间无分隔符,视图上看起来都是无差别的二进制数据),而我们通过索引只能知道数据在磁盘中的起始位置,如何在 Entry 长度不确定的情况下正确解码出数据(数据编码与解码问题)。
  2. 资源是有限的,不可能无止境地追加新数据而不管旧数据,需要定期地对无效的旧数据做清理,回收对应的资源(无效数据回收问题)。

下面将分别对这两个问题的解决方案进行介绍。

数据编码与解码

数据正确解码的前提是明确知道 Entry 的在磁盘中的起始位置和终止位置,然后根据设计好的编码模型进行反序列解码即可。Entry 的起始位置我们可以从索引中很容易地获取到,重点在于如何知道 Entry 的终止位置。最简单的实现方案是在索引中额外存储 Entry 序列化之后的长度字段,起始位置加上长度即可得到终止位置,但是相对于磁盘,内存的存储资源是更为宝贵的,所以本文将基于磁盘通过 定长 变长 的编码方案解决这个问题。

虽然我们并不能限制用户输入的 key、value 数据的长度,但是我们可以用固定长度的字节来描述输入的 key 和 value 的长度信息,比如我们可以用固定的 4 字节(即一个无符号整型占的空间)来描述长度在 [0,2^32] 的 key 的长度, 已经可以满足常规的需求,而 value 也是同样的道理。

这里的 4 字节就是“定长”,key 或者 value 序列化之后的二进制数据就是 “变长”。我们把“定长”部分放在 Entry 的头部,后面跟“变长”部分,这样当我们在索引中拿到 Entry 的起始位置时,由于定长部分的长度是一个常量,所以我们可以直接相加得到 Entry 头部的终止位置,这样我们就可以解码出 Entry 头部的信息,即 key 和 value 的长度的信息,最后我们根据这两个长度信息计算得到整个 Entry 的终止位置,从而正确地解码出数据。

值得注意的是:数据解码的主要开销在于 I/O 操作,所以相对于直接在内存中存储 Entry 长度信息的方案,本方案在节约了内存空间的同时,并不会造成性能上的损耗。

本方案具体使用的对 Entry 的编码格式如下图所示:

学新通

这里编码中的 kind 字段是为了标识追加的新数据是 “新增、更新” 还是 “删除”,因为在服务重启时需要读取数据文件在内存中建立索引,假如只有新增和更新操作,因为我们只是从头开始顺序读取数据文件,而最新的数据总是是后面,即对于相同的 key 来说,我们顺序读取时,新数据的位置会不断覆盖索引中旧数据的位置,所以此时没有标识也不会影响正确性;

但是引入删除操作之后,如果被删除的这个 key 是唯一的,即这个 key 不会被新的有效的数据覆盖,如果不做特殊标识的话,那么就会导致用户读取到已经被删除的数据,从而造成正确性问题。所以,这里需要一个字段来标识操作类型,这样就可以在初始化索引时通过该标识跳过带有删除标识的 Entry(同时这个标识也会在无效数据回收过程中用到)。

无效数据回收

无效数据的回收在 Bitcask 模型中被称为 merge 操作或者 compact 操作,该操作将会清除磁盘中无效的数据,释放对应的资源。这里介绍最简单的一种实现方式:顺序扫描并解码磁盘中数据文件中的所有 Entry,判断这个 Entry 是不是有效的,然后把所有有效的 Entry 写入到一个新文件,然后用这个新文件替换掉原来的旧数据文件即可,这样就自然而然地把无效数据清理掉了。而判断 Entry 是否有效的方法也很简单,首先如果这个 Entry 带有删除标识,那么一定就是无效的,其次将这个 Entry 的起始位置与索引中的对应 key 的起始位置对比,如果相等就是有效的(因为索引中存储的总是最新数据的起始位置),否则就是无效的。

上面的问题都解决后,一个最简化的 KV 存储引擎就完成了。

下面将从编码实现层面进行进一步介绍。

Rust 实现

首先我们先对 Entry 进行定义:

  1.  
    pub struct Entry {
  2.  
    key_len: usize,
  3.  
     
  4.  
    value_len: usize,
  5.  
     
  6.  
    key: String,
  7.  
     
  8.  
    value: String,
  9.  
     
  10.  
    kind: CmdKind,
  11.  
    }
  12.  
     
  13.  
     
  14.  
    pub enum CmdKind {
  15.  
    PUT = 1,
  16.  
    DEL = 2,
  17.  
    }
学新通

这里的定义与上文图中描述的一致。

然后是 Entry 的编码与解码,首先我们定义出“定长部分”:

  1.  
    const USIZE_LEN: usize = std::mem::size_of::<usize>();
  2.  
    // key_len 和 val_len 都占 USIZE_LEN 的长度,操作类型标识字段(PUT、DEL)占 1 字节的长度
  3.  
    const ENTRY_HEAD_LEN: usize = USIZE_LEN * 2 1;

“定长”部分由 4 字节或 8 字节 (usize 在 32 位机器上为 4 字节, 64位机器上为 8 字节)的 key 长度字段,value 长度字段,以及 1 字节的标识字段组成,这样我们可以通过计算得到整个 Entry 的长度信息:

  1.  
    pub fn size(&self) -> usize {
  2.  
    ENTRY_HEAD_LEN self.key_len self.value_len
  3.  
    }

在编码时,我们对各个字段逐一进行序列化即可:

  1.  
    pub fn encode(&self) -> Vec<u8> {
  2.  
    let mut buf = vec![0; self.size()];
  3.  
    // encode key len
  4.  
    buf[0..USIZE_LEN].copy_from_slice(&self.key_len.to_be_bytes());
  5.  
     
  6.  
    // encode value length
  7.  
    buf[USIZE_LEN..USIZE_LEN * 2].copy_from_slice(&self.value_len.to_be_bytes());
  8.  
     
  9.  
    // encode kind
  10.  
    buf[USIZE_LEN * 2..ENTRY_HEAD_LEN]
  11.  
    .copy_from_slice(bincode::serialize(&self.kind).unwrap().as_slice());
  12.  
     
  13.  
    // encode key
  14.  
    buf[ENTRY_HEAD_LEN..ENTRY_HEAD_LEN self.key_len].copy_from_slice(self.key.as_bytes());
  15.  
     
  16.  
    // encode value
  17.  
    buf[ENTRY_HEAD_LEN self.key_len..].copy_from_slice(self.value.as_bytes());
  18.  
     
  19.  
    buf
  20.  
    }
学新通

在解码时我们先解码出 Entry 的头部信息:

  1.  
    pub fn decode(b: &[u8; ENTRY_HEAD_LEN]) -> Result<Entry> {
  2.  
    let key_len = usize::from_be_bytes(b[0..USIZE_LEN].try_into()?);
  3.  
    let value_len = usize::from_be_bytes(b[USIZE_LEN..USIZE_LEN * 2].try_into()?);
  4.  
    let kind: CmdKind = bincode::deserialize(&b[USIZE_LEN * 2..ENTRY_HEAD_LEN])?;
  5.  
    Ok(Entry {
  6.  
    key_len,
  7.  
    value_len,
  8.  
    kind,
  9.  
    key: String::new(),
  10.  
    value: String::new(),
  11.  
    })
  12.  
    }

然后解码出整个 Entry 的信息:

  1.  
    let mut e = Entry::decode(&buf)?;
  2.  
     
  3.  
    let mut key_buf = vec![0; e.key_len];
  4.  
    self.reader.read_exact(key_buf.as_mut_slice())?;
  5.  
    e.key = String::from_utf8(key_buf)?;
  6.  
     
  7.  
    let mut val_buf = vec![0; e.value_len];
  8.  
    self.reader.read_exact(val_buf.as_mut_slice())?;
  9.  
    e.value = String::from_utf8(val_buf)?;

然后是无效数据回收的实现,首先筛选出有效的 Entry:

  1.  
    let mut offset = 0;
  2.  
    let mut valid_entry = Vec::new();
  3.  
    loop {
  4.  
    match self.read_at(offset) {
  5.  
    Ok(e) => {
  6.  
    let size = e.size() as u64;
  7.  
    if let Some(valid_pos) = self.index.get(&e.key) {
  8.  
    // 滤出带有删除标记的数据且只保留最新数据
  9.  
    if e.kind == CmdKind::PUT && *valid_pos == offset {
  10.  
    valid_entry.push(e);
  11.  
    }
  12.  
    }
  13.  
    offset = size;
  14.  
    }
  15.  
    Err(KvsError::EOF) => {
  16.  
    // 全部解码完毕
  17.  
    break;
  18.  
    }
  19.  
    Err(e) => {
  20.  
    return Err(e);
  21.  
    }
  22.  
    }
  23.  
    }
学新通

然后将有效的 Entry 写入新数据文件然后直接替换掉旧数据文件:

  1.  
    if !valid_entry.is_empty() {
  2.  
    let mut data_path_ancestors = self.data_path_buf.ancestors();
  3.  
    data_path_ancestors.next();
  4.  
    let merge_path_buf = data_path_ancestors
  5.  
    .next()
  6.  
    .ok_or(KvsError::InvalidDataPath)?
  7.  
    .join(STORAGE_FILE_PREFIX.to_string() ".merge");
  8.  
    let merge_file = File::create(merge_path_buf.as_path())?;
  9.  
    // 在数据文件的目录下新建回收文件
  10.  
    let mut write_buf = BufWriterWithPos::new(merge_file)?;
  11.  
     
  12.  
    // 向回收文件中写入所有有效的数据
  13.  
    for e in &valid_entry {
  14.  
    let key = e.key.clone();
  15.  
    self.index.insert(key, write_buf.pos);
  16.  
    write_buf.write(&e.encode())?;
  17.  
    }
  18.  
     
  19.  
    // 用回收文件替换原来的数据文件
  20.  
    self.writer = write_buf;
  21.  
    self.reader = BufReaderWithPos::new(File::open(merge_path_buf.as_path())?)?;
  22.  
    std::fs::remove_file(self.data_path_buf.as_path())?;
  23.  
    std::fs::rename(merge_path_buf.as_path(), self.data_path_buf.as_path())?;
  24.  
    }
学新通

值得注意的是,本文基于“最简化”的考虑,无效数据回收的触发方式是执行 set 操作之后如果无效的数据大小超过一定的阈值,则同步阻塞地进行回收操作:

  1.  
    fn put(&mut self, key: String, val: String) -> Result<()> {
  2.  
    let e = Entry::new(key, val, CmdKind::PUT);
  3.  
    self.write(e)?;
  4.  
     
  5.  
    // 无效的数据超过一定的阈值时则执行回收操作
  6.  
    if self.pending_compact >= COMPACTION_THRESHOLD {
  7.  
    self.merge()?;
  8.  
    }
  9.  
     
  10.  
    Ok(())
  11.  
    }

这里是一个非常大的可以进行性能优化的地方,比如创建另一个线程非阻塞地定期执行无效数据回收操作,但是需要额外处理多线程操作同一个数据源而带来的“数据竞争”问题。此外还可以将整个单线程模型优化为多线程模型,为了减小临界区的竞争还可以将数据存储为多个小文件,从而减小锁的粒度等等很多性能优化手段,这篇博文旨在于从功能性方面介绍如何设计和实现一种最简单的 KV 存储模型,性能优化方面不做过多讨论。

以上关键逻辑实现之后,可以利用 trait 抽象出 Storage 的共有行为然后组合上面的核心逻辑对其进行实现即可:

  1.  
    pub trait Storage {
  2.  
    fn get(&mut self, key: String) -> Result<Option<String>>;
  3.  
     
  4.  
    fn put(&mut self, key: String, val: String) -> Result<()>;
  5.  
     
  6.  
    fn remove(&mut self, key: String) -> Result<()>;
  7.  
    }
  8.  
     
  9.  
     
  10.  
    impl Storage for SimplifiedBitcask {
  11.  
    fn get(&mut self, key: String) -> Result<Option<String>> {
  12.  
    match self.read(&key) {
  13.  
    Ok(e) => Ok(Some(e.value)),
  14.  
    Err(KvsError::KeyNotFound) => Ok(None),
  15.  
    Err(e) => Err(e),
  16.  
    }
  17.  
    }
  18.  
     
  19.  
    fn put(&mut self, key: String, val: String) -> Result<()> {
  20.  
    let e = Entry::new(key, val, CmdKind::PUT);
  21.  
    self.write(e)?;
  22.  
    if self.pending_compact >= COMPACTION_THRESHOLD {
  23.  
    self.merge()?;
  24.  
    }
  25.  
    Ok(())
  26.  
    }
  27.  
     
  28.  
    fn remove(&mut self, key: String) -> Result<()> {
  29.  
    if self.index.contains_key(&key) {
  30.  
    let e = Entry::new(key.clone(), String::new(), CmdKind::DEL);
  31.  
    self.write(e)?;
  32.  
    self.index.remove(&key);
  33.  
    return Ok(());
  34.  
    }
  35.  
     
  36.  
    Err(KvsError::KeyNotFound)
  37.  
    }
  38.  
    }
学新通

最后再简单地实现 KV Store 作为对外交互的接口:

  1.  
    pub struct KvStore {
  2.  
    storage: Box<dyn Storage>,
  3.  
    }
  4.  
     
  5.  
     
  6.  
    impl KvStore {
  7.  
    pub fn open(path: &Path) -> Result<KvStore> {
  8.  
    let storage = SimplifiedBitcask::open(path.to_path_buf())?;
  9.  
    Ok(KvStore {
  10.  
    storage: Box::new(storage),
  11.  
    })
  12.  
    }
  13.  
     
  14.  
    pub fn get(&mut self, key: String) -> Result<Option<String>> {
  15.  
    self.storage.get(key)
  16.  
    }
  17.  
     
  18.  
    pub fn set(&mut self, key: String, val: String) -> Result<()> {
  19.  
    self.storage.put(key, val)
  20.  
    }
  21.  
     
  22.  
    pub fn remove(&mut self, key: String) -> Result<()> {
  23.  
    self.storage.remove(key)
  24.  
    }
  25.  
    }
学新通

本文在理论层面,介绍的是最简单的一种存储模型;在实现层面,关键在于 I/O 操作方面的编码实现,其次在于一些异常情况的处理。本文的完整代码已开源在 GitHub - Morgan279/miniDB: A mini kv database demo that using simplified bitcask storage model with rust implementation.

欢迎大家提出修改建议或者在此基础上进行进一步完善。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaeejk
系列文章
更多 icon
同类精品
更多 icon
继续加载