// Start of the routine that emits `slice` as a log record, fragmenting it
// when necessary.  The part visible here only handles the block trailer:
// when fewer than kHeaderSize bytes remain in the current block, the
// remainder is zero-filled and writing restarts at offset 0.
//
// NOTE(review): the do-loop and the function body are never closed before the
// next definition begins — the rest of the function appears to have been
// elided.  Confirm against the full source before building this file.
Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Pad the remainder of the block with zeros.
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
boolReader::ReadRecord(Slice* record, std::string* scratch){ bool in_fragmented_record = false; // Record offset of the logical record that we're reading // 0 is a dummy value to make compilers happy uint64_t prospective_record_offset = 0;
// ReadPhysicalRecord may have only had an empty trailer remaining in its // internal buffer. Calculate the offset of the next physical record now // that it has returned, properly accounting for its header size. uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
{
  // The mutex is released for the duration of the log write and the memtable
  // insert, and re-acquired before the error state is recorded.
  mutex_.Unlock();

  // Write the log first.
  status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
  bool sync_error = false;
  if (status.ok() && options.sync) {
    // If the sync option is enabled, flush the log file to disk.
    status = logfile_->Sync();
    if (!status.ok()) {
      sync_error = true;
    }
  }

  if (status.ok()) {
    // Insert into the memtable.
    status = WriteBatchInternal::InsertInto(write_batch, mem_);
  }

  mutex_.Lock();
  if (sync_error) {
    // The state of the log file is indeterminate: the log record we
    // just added may or may not show up when the DB is re-opened.
    // So we force the DB into a mode where all future writes fail.
    RecordBackgroundError(status);
  }
}
// Recovers database state: takes the DB lock file, recovers the version set
// from the manifest, then replays every relevant log file in ascending
// file-number order and raises the last sequence number if the logs
// contained newer entries.
//
// Must be called with mutex_ held (asserted below).  Returns the first
// non-OK status encountered, or Corruption if a live file is missing from
// the directory listing.
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  // NOTE(review): the result of CreateDir is ignored — presumably because the
  // directory may already exist; confirm this is intentional.
  env_->CreateDir(dbname_);
  assert(db_lock_ == nullptr);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    // Propagate a lock failure.  Without this check the status would be
    // overwritten by the next assignment and recovery would proceed without
    // holding the lock.
    return s;
  }

  // Recover from the manifest.
  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from the log files.
  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }

  // Collect the files that are currently live, i.e. still in use at this
  // point in time.  Every one that is found in the directory is erased from
  // the set, so whatever is left afterwards is missing.
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (const std::string& filename : filenames) {
    if (ParseFileName(filename, &number, &type)) {
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log))) {
        logs.push_back(number);
      }
    }
  }
  if (!expected.empty()) {
    // `expected` is non-empty, so some live files are missing.  Report how
    // many, and name the first one as an example.
    const std::string msg =
        std::to_string(expected.size()) + " missing files; e.g.";
    return Status::Corruption(msg, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated.
  // A crash may happen during recovery, so there can be more than one
  // (WAL) log file.  Sort them: a smaller number means the file was
  // generated earlier and therefore has to be replayed first.
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    // The second argument marks the final (newest) log in the list.
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  // Only ever move the last sequence number forward.
  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }