LevelDB源码分析之十二:block

一.Block的存储格式

Block的种类很多,包括Data Block、Meta Block等,每个Block由三部分组成,如下图所示:


1.block data
block data是具体的KV对存储区域。虽然Block有好几种,但是block data都是有序的KV对,因此写入、读取block data的接口都是统一的。
2.type
type指明使用的是哪种压缩方式,当前支持none和snappy压缩。
3.crc32
数据校验位
LevelDB对block data的管理是读写分离的,读取后的遍历查询操作由Block类实现,block data的构建则由BlockBuilder类实现。
二.block data的结构
假设每一个KV对是一条记录(Record),则记录的格式如下。
——共享前缀长度                     shared_bytes:       varint32
——前缀之后的字符串长度     unshared_bytes:  varint32
——值的长度                             value_length:        varint32
——前缀之后的字符串             key_delta:             char[unshared_bytes]
——值                                         value:                     char[value_length]
对于重启点而言,shared_bytes = 0,重启点存储完整的key。
block data的结尾段格式是:
——restarts:             uint32[num_restarts]
——num_restarts:  uint32 
尾段存储的是重启点相关信息,包括重启点的位置和个数。元素restarts[i]存储的是block data第i个重启点距离block data首地址的偏移。很明显第一条记录,总是第一个重启点,也就是restarts[0] = 0。num_restarts是重启点的个数。

block data的结构图如下所示:


总体来看block data可分为KV存储区和重启点信息存储区两部分。

三.block data的构建
block data的构建是通过BlockBuilder实现的,BlockBuilder类的头文件如下所示。

class BlockBuilder {
 public:
  explicit BlockBuilder(const Options* options);

  // 重置BlockBuilder
  void Reset();

  // Add的调用应该在Reset之后,在Finish之前。
  // Add只添加KV对(一条记录),重启点信息部分由Finish添加。
  // 每次调用Add时,key应该越来越大。
  void Add(const Slice& key, const Slice& value);

  // 组建block data完成,返回block data
  Slice Finish();

  // 估算当前block data的大小
  size_t CurrentSizeEstimate() const;

  // 是否已经开始组建block data
  bool empty() const {
    return buffer_.empty();
  }

 private:
  const Options*        options_;     
  std::string           buffer_;      // 用于存放block data
  std::vector<uint32_t> restarts_;    // 用于存放重启点的位置信息
  int                   counter_;     // 从上个重启点遍历到下个重启点时的计数
  bool                  finished_;    // 是否调用了Finish
  std::string           last_key_;    // 记录最后Add的key

  // No copying allowed
  BlockBuilder(const BlockBuilder&);
  void operator=(const BlockBuilder&);
};
下面重点介绍BlockBuilder类中的方法。
1.构造函数
BlockBuilder::BlockBuilder(const Options* options)
    : options_(options),
      restarts_(),
      counter_(0),
      finished_(false) {
  assert(options->block_restart_interval >= 1);
  restarts_.push_back(0);   
}
options->block_restart_interval表示当前重启点(其实也是一条记录)和上个重启点之间间隔了多少条记录。
restarts_.push_back(0),表示第一个重启点距离block data起始位置的偏移为0,也就是说第一条记录就是重启点。
2.Add函数
void BlockBuilder::Add(const Slice& key, const Slice& value) {
  Slice last_key_piece(last_key_);
  assert(!finished_);
  assert(counter_ <= options_->block_restart_interval);
  assert(buffer_.empty() // No values yet?
         || options_->comparator->Compare(key, last_key_piece) > 0);
  size_t shared = 0;
  if (counter_ < options_->block_restart_interval) {
    // See how much sharing to do with previous string
    const size_t min_length = std::min(last_key_piece.size(), key.size());
    while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
      shared++;
    }
  } else {
    // 如果counter_=options_->block_restart_interval,说明这条记录就是重启点。
	// 将这条记录距离block data首地址的偏移添加到restarts_中,并使counter_ = 0,
    restarts_.push_back(buffer_.size());
    counter_ = 0;
  }
  const size_t non_shared = key.size() - shared;

  // 开始组建一条记录
  PutVarint32(&buffer_, shared);
  PutVarint32(&buffer_, non_shared);
  PutVarint32(&buffer_, value.size());

  // Add string delta to buffer_ followed by value
  buffer_.append(key.data() + shared, non_shared);
  buffer_.append(value.data(), value.size());

  // 此时last_key_还等于上一个key。resize用于取出last_key_与当前key中相同的前缀
  last_key_.resize(shared);
  // last_key_添加当前key中与上一个key的不同部分,此时last_key_与当前key是相等的。
  last_key_.append(key.data() + shared, non_shared);
  // 上面两句其实等效于last_key_=key.ToString(),但是像上面那样写可以使内存copy最小化
  assert(Slice(last_key_) == key);
  counter_++;
}
在求相同前缀的长度时,为何要调用std::min来计算上一个key(last_key_piece)和当前key的长度呢?因为当前key虽然比上一个key大(通过Compare得出),但是不一定就比上一个key长。比如mouse和morning,由于u大于r,mouse是大于moring的。关于comparator可以参考: LevelDB源码分析之二:comparator
需要注意的是,为了节约存储空间,每条记录的前三个字段是被压缩存储的(通过PutVarint32实现),关于压缩,详见: LevelDB源码分析之一:coding
3.Finish函数
Slice BlockBuilder::Finish() {
  // 添加重启点信息部分
  for (size_t i = 0; i < restarts_.size(); i++) {
    PutFixed32(&buffer_, restarts_[i]);
  }
  PutFixed32(&buffer_, restarts_.size());
  finished_ = true;
  return Slice(buffer_);
}
Finish只是在记录存储区后边添加了重启点信息,重启点信息没有进行压缩,关于PutFixed32也可以参考: LevelDB源码分析之一:coding
假设添加的5个KV对分别是("the bus","1"),("the car","11"),("the color","111"),("the mouse","1111"),("the tree","11111"),那么当options_->block_restart_interval=3时,block data的示意图如下所示。


四.block data的解析

block data的解析是通过Block类实现的。

inline uint32_t Block::NumRestarts() const {
  // size_为何要大于等于2*sizeof(uint32_t),因为如果只调用BlockBuilder中
  // 的Finish函数,那么block data至少包含一个uint32_t类型的重启点位置信息和
  // 一个uint32_t类型的重启点个数信息
  assert(size_ >= 2*sizeof(uint32_t));
  // block data的最后一个uint32_t类型字段表示重启点个数。
  return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}

Block::Block(const char* data, size_t size)
    : data_(data),
      size_(size) {
  if (size_ < sizeof(uint32_t)) {
    size_ = 0; // 出错时,使size_=0
  } else {
	// 总大小减去重启点信息的大小,重启点信息包括重启点位置数组和重启点个数,
	// 他们都是uint32_t类型的
    restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
	// 这里的条件判断是防止NumRestarts()返回值为负数
    if (restart_offset_ > size_ - sizeof(uint32_t)) {
      size_ = 0;
    }
  }
}

Block::~Block() {
  delete[] data_;
}

// Helper routine: decode the next block entry starting at "p",
// storing the number of shared key bytes, non_shared key bytes,
// and the length of the value in "*shared", "*non_shared", and
// "*value_length", respectively.  Will not derefence past "limit".
//

// 解析从block data的p位置开始的数据,将解析得到的shared、non_shared和value length
// 分别放到"*shared"、"*non_shared"和"*value_length"
// 从源码看出,p应该是一条记录的起始位置
// 如果解析错误,返回NULL。否则,返回指向一条记录的key_delta字段的指针
static inline const char* DecodeEntry(const char* p, const char* limit,
                                      uint32_t* shared,
                                      uint32_t* non_shared,
                                      uint32_t* value_length) {
  if (limit - p < 3) return NULL;
  *shared = reinterpret_cast<const unsigned char*>(p)[0];
  *non_shared = reinterpret_cast<const unsigned char*>(p)[1];
  *value_length = reinterpret_cast<const unsigned char*>(p)[2];
  // 如果最高位都是0, 那么按照压缩规则,每个值只占一个字节,且小于128
  // 这里相当于做了一个优化,如果三个值之和都小于128,那肯定是每个值只占一个字节 
  if ((*shared | *non_shared | *value_length) < 128) {
    p += 3;
  } else {
    if ((p = GetVarint32Ptr(p, limit, shared)) == NULL) return NULL;
    if ((p = GetVarint32Ptr(p, limit, non_shared)) == NULL) return NULL;
    if ((p = GetVarint32Ptr(p, limit, value_length)) == NULL) return NULL;
  }
  // 如果limit中剩下的空间小于*non_shared、*value_length之和,说明limit中
  // 已经容纳不下记录中的key_delta和value字段了。
  if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) {
    return NULL;
  }
  return p;
}

class Block::Iter : public Iterator {
 private:
  const Comparator* const comparator_;
  const char* const data_;      // block data
  uint32_t const restarts_;     // 重启点信息在block data中的偏移  
  uint32_t const num_restarts_; // 重启点个数

  // current_是当前记录在bock data中的偏移,如果current_>=restarts_,说明出错啦。
  uint32_t current_;
  uint32_t restart_index_; // 重启点的索引
  std::string key_;
  Slice value_;
  Status status_;

  inline int Compare(const Slice& a, const Slice& b) const {
    return comparator_->Compare(a, b);
  }

  // 因为value_是一条记录的最后一个字段,所以这里返回的是下一条记录的偏移量,也就是current_
  // 但是如果在该函数之前调用了SeekToRestartPoint,此时的value_.data()=data_,value.size=0
  // 这样的话即使是block data的第一条记录,也可以用使用该函数,此时返回的偏移量为0
  inline uint32_t NextEntryOffset() const {
    return (value_.data() + value_.size()) - data_;
  }
  // 获取第index个重启点的偏移
  uint32_t GetRestartPoint(uint32_t index) {
    assert(index < num_restarts_);
    return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
  }
  // 该函数只是设置了几个有限的状态,其它值将在函数ParseNextKey()中设置。
  // 需要注意的是,这里的value_并不是记录中的value字段,而只是一个指向记录起始位置的0长度指针,
  // 这样后面的ParseNextKey函数将会解析出重启点的value字段,并赋值到value_中。
  void SeekToRestartPoint(uint32_t index) {
    key_.clear();
    restart_index_ = index;
    // current_ will be fixed by ParseNextKey();
	// current_的值会在ParseNextKey()方法中不断被修改
    // ParseNextKey() starts at the end of value_, so set value_ accordingly
    uint32_t offset = GetRestartPoint(index);
    value_ = Slice(data_ + offset, 0);
  }

 public:
  Iter(const Comparator* comparator,
       const char* data,
       uint32_t restarts,
       uint32_t num_restarts)
      : comparator_(comparator),
        data_(data),
        restarts_(restarts),
        num_restarts_(num_restarts),
        current_(restarts_),
        restart_index_(num_restarts_) {
    assert(num_restarts_ > 0);
  }

  virtual bool Valid() const { return current_ < restarts_; }
  virtual Status status() const { return status_; }
  virtual Slice key() const {
    assert(Valid());
    return key_;
  }
  virtual Slice value() const {
    assert(Valid());
    return value_;
  }
  // 向后解析比较简单,就是解析当前记录的下一个记录
  virtual void Next() {
    assert(Valid());
    ParseNextKey();
  }
  // 向前解析复杂一些,步骤如下
  // 1.先向前查找当前记录之前的重启点
  // 2.当循环到了第一个重启点,其偏移量(0)依然与当前记录的偏移量相等
  //   说明当前记录就是第一条记录,此时初始化current_和restart_index_,并返回
  // 3.调用SeekToRestartPoint定位到符合要求的启动点
  // 4.向后循环解析,直到解析了原记录之前的一条记录,结束
  virtual void Prev() {
    assert(Valid());

    const uint32_t original = current_;
    while (GetRestartPoint(restart_index_) >= original) {
      if (restart_index_ == 0) {
        // No more entries
        current_ = restarts_;
        restart_index_ = num_restarts_;
        return;
      }
      restart_index_--;
    }
	
    SeekToRestartPoint(restart_index_);
    do {
      // Loop until end of current entry hits the start of original entry
    } while (ParseNextKey() && NextEntryOffset() < original);
  }
  // 从左到右(从前到后)查找第一条key大于target的记录 
  // 1.二分查找,找到key < target的最后一个重启点
  // 2.定位到该重启点,其索引由left指定,这是前面二分查找到的结果。如前面所分析的,
  //   value_指向重启点的地址,而size_指定为0,这样ParseNextKey函数将会解析出重启点key和value。
  // 3.自重启点线性向下查找,直到遇到key>=target的记录或者直到最后一条记录,也不满足key>=target,返回
  virtual void Seek(const Slice& target) {
    // Binary search in restart array to find the first restart point
    // with a key >= target
    uint32_t left = 0;
    uint32_t right = num_restarts_ - 1;
    while (left < right) {
      uint32_t mid = (left + right + 1) / 2;
      uint32_t region_offset = GetRestartPoint(mid);
      uint32_t shared, non_shared, value_length;
      const char* key_ptr = DecodeEntry(data_ + region_offset,
                                        data_ + restarts_,
                                        &shared, &non_shared, &value_length);
	  // 需要注意的是重启点保存的key是完整的,所以它的shared字段等于0
      if (key_ptr == NULL || (shared != 0)) {
        CorruptionError();
        return;
      }
      Slice mid_key(key_ptr, non_shared);
      if (Compare(mid_key, target) < 0) {
        // Key at "mid" is smaller than "target".  Therefore all
        // blocks before "mid" are uninteresting.
        left = mid;
      } else {
        // Key at "mid" is >= "target".  Therefore all blocks at or
        // after "mid" are uninteresting.
        right = mid - 1;
      }
    }

    // Linear search (within restart block) for first key >= target
    SeekToRestartPoint(left);
    while (true) {
      if (!ParseNextKey()) {
        return;
      }
      if (Compare(key_, target) >= 0) {
        return;
      }
    }
  }
  // 解析block data的第一条记录
  virtual void SeekToFirst() {
	//先定位到第一个重启点
    SeekToRestartPoint(0);
    ParseNextKey();
  }
  // 解析block data的最后一条记录
  virtual void SeekToLast() {
    // 先定位到最后一个重启点
    SeekToRestartPoint(num_restarts_ - 1);
    while (ParseNextKey() && NextEntryOffset() < restarts_) {
      // Keep skipping
    }
  }

 private:
  void CorruptionError() {
    current_ = restarts_;
    restart_index_ = num_restarts_;
    status_ = Status::Corruption("bad entry in block");
    key_.clear();
    value_.clear();
  }

  bool ParseNextKey() {
    current_ = NextEntryOffset();
    const char* p = data_ + current_;// 指向当前记录
    const char* limit = data_ + restarts_;  // limit限制了记录存储区的范围
    if (p >= limit) {
      // 如果出错,恢复到默认值,并返回false
      current_ = restarts_;
      restart_index_ = num_restarts_;
      return false;
    }

    // Decode next entry
    uint32_t shared, non_shared, value_length;
    p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
    if (p == NULL || key_.size() < shared) {
      CorruptionError();
      return false;
    } else {
	  // 解析出记录中的key和value
      key_.resize(shared);
      key_.append(p, non_shared);
      value_ = Slice(p + non_shared, value_length);
	  // 如果你当前记录的偏移已经比下一个重启点的偏移还有大了
	  // 那么关键点索引restart_index_加1,且后面记录的解析都是
	  // 以这个重启点为参照的。
	  // 因为restart_index_=0的重启点就是block data的第一条记录
	  // 所以下一个重启点的索引是restart_index_ + 1
      while (restart_index_ + 1 < num_restarts_ &&
             GetRestartPoint(restart_index_ + 1) < current_) {
        ++restart_index_;
      }
      return true;
    }
  }
};

Iterator* Block::NewIterator(const Comparator* cmp) {
  if (size_ < 2*sizeof(uint32_t)) {
    return NewErrorIterator(Status::Corruption("bad block contents"));
  }
  const uint32_t num_restarts = NumRestarts();
  if (num_restarts == 0) {
    return NewEmptyIterator();
  } else {
    return new Iter(cmp, data_, restart_offset_, num_restarts);
  }
}

初始化迭代器时,为什么是把current设置为restarts,把restart_index_设置为num_restarts_?
创建一个Block::Itr之后,它是处于invalid状态的,即不能Prev也不能Next;只能先Seek/SeekToxxx之后,才能调用next/prev。想想和std的iterator行为很像吧,比如你声明一个vector::iterator,必须先赋值才能使用。


参考链接:http://www.voidcn.com/article/p-aibgjkgh-yn.html

本站公众号
   欢迎关注本站公众号,获取更多程序园信息
开发小院