// Completes the input set of compaction "c", whose level-N inputs
// (c->inputs_[0]) have already been chosen.
//
// Steps performed:
//   1. Fill c->inputs_[1] with the level N+1 files that overlap the key range
//      of c->inputs_[0].
//   2. If possible, grow c->inputs_[0] with more level-N files, but only when
//      doing so does not change the number of level N+1 files picked up and
//      the combined size stays below kExpandedCompactionByteSizeLimit.
//   3. Fill c->grandparents_ with the level N+2 files that overlap the whole
//      compaction range (skipped when level N+2 does not exist).
//   4. Record the largest key of the level-N inputs as the compaction pointer
//      for this level, both in compact_pointer_ and in c->edit_.
void VersionSet::SetupOtherInputs(Compaction* c) {
  const int level = c->level();
  InternalKey smallest, largest;
  GetRange(c->inputs_[0], &smallest, &largest);

  // Find all files in level N+1 that overlap the level-N input range.
  current_->GetOverlappingInputs(level + 1, &smallest, &largest,
                                 &c->inputs_[1]);

  // Get the key range covered by both input sets.
  InternalKey all_start, all_limit;
  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

  // See whether more files from level N can be included without changing the
  // number of level N+1 files that take part in the compaction.
  if (!c->inputs_[1].empty()) {
    std::vector<FileMetaData*> expanded0;
    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
    const int64_t expanded0_size = TotalFileSize(expanded0);
    if (expanded0.size() > c->inputs_[0].size() &&
        inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
      InternalKey new_start, new_limit;
      GetRange(expanded0, &new_start, &new_limit);
      std::vector<FileMetaData*> expanded1;
      current_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
                                     &expanded1);
      // Only accept the expansion if it pulls in no additional level N+1
      // files.
      if (expanded1.size() == c->inputs_[1].size()) {
        // inputs1_size is logged for both the old and the new level N+1 set;
        // the expansion is only accepted when the file count is unchanged.
        Log(options_->info_log,
            "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
            level,
            int(c->inputs_[0].size()),
            int(c->inputs_[1].size()),
            long(inputs0_size), long(inputs1_size),
            int(expanded0.size()),
            int(expanded1.size()),
            long(expanded0_size), long(inputs1_size));
        smallest = new_start;
        largest = new_limit;
        c->inputs_[0] = expanded0;
        c->inputs_[1] = expanded1;
        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
      }
    }
  }

  // Compute the set of grandparent files that overlap this compaction
  // (parent == level+1; grandparent == level+2)
  if (level + 2 < config::kNumLevels) {
    current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
                                   &c->grandparents_);
  }

  // Debug logging, compiled in but disabled; change the condition to enable.
  if (false) {
    Log(options_->info_log, "Compacting %d '%s' .. '%s'",
        level,
        smallest.DebugString().c_str(),
        largest.DebugString().c_str());
  }

  // Set the new compact pointer for this level. It is updated here right
  // away, and also recorded in the compaction's VersionEdit.
  compact_pointer_[level] = largest.Encode().ToString();
  c->edit_.SetCompactPointer(level, largest);
}
// Do the compaction task:
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros &#61; env_->NowMicros();
int64_t imm_micros &#61; 0; // Micros spent doing imm_ compactions
Log(options_.info_log, "Compacting %d&#64;%d &#43; %d&#64;%d files",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() &#43; 1);
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder &#61;&#61; NULL);
assert(compact->outfile &#61;&#61; NULL);
if (snapshots_.empty()) {
compact->smallest_snapshot &#61; versions_->LastSequence();
} else {
compact->smallest_snapshot &#61; snapshots_.oldest()->number_;
}
// Release mutex while we&#39;re actually doing the compaction work
mutex_.Unlock();
//生成iterator:遍历要compaction的数据
Iterator* input &#61; versions_->MakeInputIterator(compact->compaction);
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key &#61; false;
SequenceNumber last_sequence_for_key &#61; kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// 如果有memtable要compaction:优先去做
if (has_imm_.NoBarrier_Load() !&#61; NULL) {
const uint64_t imm_start &#61; env_->NowMicros();
mutex_.Lock();
if (imm_ !&#61; NULL) {
CompactMemTable();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros &#43;&#61; (env_->NowMicros() - imm_start);
}
Slice key &#61; input->key();
//检查是不是中途输出compaction的结果&#xff0c;避免compaction结果和level N&#43;2 files有过多的重叠
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder !&#61; NULL) {
status &#61; FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
// Handle key/value, add to state, etc.
bool drop &#61; false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key &#61; false;
last_sequence_for_key &#61; kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key,
Slice(current_user_key)) !&#61; 0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key &#61; true;
last_sequence_for_key &#61; kMaxSequenceNumber;
}
if (last_sequence_for_key <&#61; compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop &#61; true; // (A)
} else if (ikey.type &#61;&#61; kTypeDeletion &&
ikey.sequence <&#61; compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop &#61; true;
}
last_sequence_for_key &#61; ikey.sequence;
}
if (!drop) {
// Open output file if necessary
if (compact->builder &#61;&#61; NULL) {
status &#61; OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() &#61;&#61; 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());
// 达到sst文件大小&#xff0c;重新写文件
if (compact->builder->FileSize() >&#61;
compact->compaction->MaxOutputFileSize()) {
status &#61; FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}
input->Next();
}
if (status.ok() && shutting_down_.Acquire_Load()) {
status &#61; Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder !&#61; NULL) {
status &#61; FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
status &#61; input->status();
}
delete input;
input &#61; NULL;
//更新compaction的一些统计数据
CompactionStats stats;
stats.micros &#61; env_->NowMicros() - start_micros - imm_micros;
for (int which &#61; 0; which < 2; which&#43;&#43;) {
for (int i &#61; 0; i < compact->compaction->num_input_files(which); i&#43;&#43;) {
stats.bytes_read &#43;&#61; compact->compaction->input(which, i)->file_size;
}
}
for (size_t i &#61; 0; i < compact->outputs.size(); i&#43;&#43;) {
stats.bytes_written &#43;&#61; compact->outputs[i].file_size;
}
mutex_.Lock();
stats_[compact->compaction->level() &#43; 1].Add(stats);
if (status.ok()) {
status &#61; InstallCompactionResults(compact);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}