log 在 LevelDB 中发挥着重要的作用,它主要有两个使用场景

  • wal,在写 key/value 之前,先向 wal 中写入,用于 crash 后恢复内存数据
  • manifest,记录 lsm 的一些元信息,包括层级信息等,用于重启后恢复 db 的一些重要信息

LevelDB 将对 Log 的读写操作分别封装为了 Writer, Reader 两个类,分别在 log_writer.cclog_reader.cc 中。这两个类并不知道数据长什么样子(可能是单 key 写入,也可能是 WriteBatch批量写入,也可能存储的 VersionEdit),只是单纯地将数据当做字节数组来处理。对于上层调用者来说,他们不需要知道数据底层是怎么存储的(比如一条数据可能跨 block 读取),只需要调用 log 的方法每次写入或者读取一条数据即可。

Log 结构

log 是由一个一个 block 组成的,根据一次写入的数据在 block 的位置,可以将数据分为三种类型:

  • 数据完整地放在一个 block 中,可以标记为 kFullType
  • 数据跨越多个 block。为了完整地读取出一条记录,LevelDB 将这种数据又划分为了多种类型
    • 数据的第一部分标记为 kFirstType
    • 数据的最后一部分标记为 kLastType
    • 数据的中间部分标记为 kMiddleType

对此,LevelDB 提供了一个枚举类型

1
2
3
4
5
6
7
8
9
enum RecordType {  
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};

Writer

Writer 对 WritableFile 做了封装,而 WritableFile 是一个抽象类,表示一个顺序写入的文件。与之类似,Reader 是对 SequentialFile 的封装,表示一个顺序读取的文件。这两个类都定义在include/leveldb/env.h 中,表示实现细节与平台相关,使用者无需关注具体的实现细节

Writer 对外只提供了几个构造函数和一个 AddRecord() 方法,用户可以通过这个方法向 log 中添加数据。

classDiagram direction LR Writer o-- WritableFile <<Abstract>> WritableFile class Writer { - dest_ : WritableFile* - block_offset_ : int - type_crc_[kMaxRecordType + 1] : uint32_t - EmitPhysicalRecord(RecordType type, char* ptr, size_t length) Status + Writer(WritableFile* dest) + Writer(WritableFile* dest, uint64_t dest_length) + AddRecord(const Slice& slice) Status }
  • dest_: 内部的文件的抽象,只提供了写入相关的操作
  • block_offset:当前写入 block 的偏移量
  • type_crc_:体现计算好的 crc,添加数据时,只需要在此基础上拓展

在写入 block 前,先会判断当前 block 剩余的空间是否足够放下数据的 header,如果连 header(固定字节) 都放不下,就会在末尾填充 0,切换到下一个 block 写入。同时根据剩余空间是否能放下整条数据,它会将数据的 type 字段标记为相应的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();

// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
if (leftover > 0) { // 剩余部分填充 0
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}

// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;

RecordType type;
// 所需容量大于等于block 剩余容量时(当前 block 放不下)才为 false
const bool end = (left == fragment_length);
if (begin && end) { // record 可以完整地放在一个 block 中
type = kFullType;
} else if (begin) { // 跨 block 的第一部分
type = kFirstType;
} else if (end) { // 跨 block 的最后一部分
type = kLastType;
} else { // 跨 block 的中间部分
type = kMiddleType;
}

s = EmitPhysicalRecord(type, ptr, fragment_length); // 写入 block 数据
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}

数据的真正写入是通过 EmitPhysicalRecord 方法进行的。写入时,会对数据和数据的类型计算 crc32 校验码,这样在进行恢复时可以通过比对 crc 校验码就可以知道数据是否有损坏。为了读取数据并校验,写入时也会将数据对应的类型和长度写入。因此,一条写入 block 的记录的格式如下所示

1
2
3
//   +-------------------------+
// | crc | len | type | data |
// +-------------------------+

数据的写入操作是就是单纯的将字节数据写入到文件中,所以这里就不展示代码了

Reader

Writer 类似,Reader 对外提供了 ReadRecord() 方法,用于读取一条记录。由于一条记录可能分布在多个 block 上,所以读取时,可能会读取多个 block,并将数据拼接到一起返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
uint64_t prospective_record_offset = 0;

Slice fragment;
while (true) { // 循环读取,因为数据可能跨 block 存储
const unsigned int record_type = ReadPhysicalRecord(&fragment);

// ReadPhysicalRecord may have only had an empty trailer remaining in its
// internal buffer. Calculate the offset of the next physical record now
// that it has returned, properly accounting for its header size.
uint64_t physical_record_offset =
end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

if (resyncing_) {
if (record_type == kMiddleType) {
continue;
} else if (record_type == kLastType) {
resyncing_ = false;
continue;
} else {
resyncing_ = false;
}
}

switch (record_type) {
case kFullType: // 说明数据完整地存储在当前 block 中,只需要读取一次就可以返回了
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType: // 数据的第一部分,将数据 append 到 scratch 即可
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType: // 数据的中间部分,同样是 append
scratch->append(fragment.data(), fragment.size());
break;
case kLastType: // 数据的最后一部分,拼接为完整地一条数据后就可以返回了
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
break;
case kEof:
if (in_fragmented_record) {
scratch->clear();
}
return false;
case kBadRecord: // 数据错误,忽略
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear(); // 清空,因为该数据出错了
}
break;
default: { // 未知的数据类型
scratch->clear();
break;
}
}
}
return false;
}

数据的真正读取是通过 ReadPhysicalRecord 放发实现的,这个方法会根据数据的存储格式,从文件中读取一条记录,然后重新计算 crc32 校验码,并与之前计算的结果进行比对。如果结果不相同,就是一个 BadRecord,Reader 会忽略这条数据,并将之前拼接的与该记录相关的数据情况

Log 的应用

Log 有三种应用场景:

  • 写入 MemTable 先写 wal
  • 修改系统元信息前先写 manifest
  • 系统启动时从日志文件中恢复

写 WAL

每次向 MemTable 写数据前都需要写 WAL 日志,这是为了在系统 crash 后仍能够恢复内存中的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
mutex_.Unlock();
// 先写 log
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
// 如果开启了同步选项,将文件刷盘
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
// 插入 memTable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}

写 MANIFEST

写 MANIFEST 的时机有很多,不过基本都时在 Compaction 的时候,包括 Minor Compaction 和 Majority Compaction,这里展示BackgroundCompaction中与 MANIFEST 相关的代码的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 优先压缩 memTable
if (imm_ != nullptr) {
CompactMemTable(); // 该方法中也会写 MANIFEST
return;
}
if (c == nullptr) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level 可以直接移到下一层(只有一个文件的情况)
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();
}

主要方法在 edit 相关的语句和 LogAndApply 当中。首先,会将需要删除和添加的文件记录到 VersionEdit 中,然后需要将 VersionEdit 通过 LogAndApply 写入到 manifest 中。

Recovery

当 DB 重启的时候,需要根据 log 文件恢复关闭前的状态(包括 wal, manifest)。当 log 中的文件恢复完后,会删除没有用的文件,避免下次重启时造成数据库前后状态不一致的后果(主要是删除已经恢复的 wal log 文件和已经没用的 sst 文件)。

log 的恢复是在打开 DB 的使用通过调用 DBImpl::Recover 的时候完成的,而 VersionSet 的恢复是通过 VersionSet::Recover来完成的,wal 的恢复是通过 DBImpl::RecoverLogFile 来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
mutex_.AssertHeld();

env_->CreateDir(dbname_);
assert(db_lock_ == nullptr);
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);

// 从 manifest 中恢复
s = versions_->Recover(save_manifest);
if (!s.ok()) {
return s;
}
SequenceNumber max_sequence(0);

// 从 log 文件中恢复
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected); // 获取系统中目前存活的文件,即当前时间点有用的文件
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) { // expected 不为空,说明有部分文件丢失
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}

// Recover in the order in which the logs were generated
// 因为在恢复过程中可能会 crash,所以 wal log 可能会有多个
// 对 (wal)log 文件进行排序,因为序号越小,说明越早生成,就越应该先恢复
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}

// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}

if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}

return Status::OK();
}

wal log 的恢复是通过 RecoverLogFile 完成的,其过程很简单,创建了一个 Reader,并不断读取记录,写入到 WriteBatch 中,并向 MemTable 中插入数据,如果 MemTable 的容量超过限制,会写入到 L0 层。需要注意的是,这个过程是不需要写入 log 的。

对于 manifest 文件的恢复,从 Reader 中读取记录后,需要解码为 VersionEdit 的形式,重新 Apply