Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
Status s;
meta->file_size = 0;
meta->num_entries = 0;
meta->num_tombstones = 0;
iter->SeekToFirst();

std::string fname = TableFileName(dbname, meta->number);
Expand All @@ -31,8 +33,13 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key());
Slice key;
ParsedInternalKey ikey;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
meta->num_entries++;
if (ParseInternalKey(key, &ikey) && ikey.type == kTypeDeletion) {
meta->num_tombstones++;
}
builder->Add(key, iter->value());
}
if (!key.empty()) {
Expand Down
24 changes: 19 additions & 5 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ struct DBImpl::CompactionState {
// Per-file bookkeeping for one table file produced by a compaction.
// The entry and tombstone counters are accumulated while keys are added to
// the output and are later passed to VersionEdit::AddFile when the
// compaction results are installed.
struct Output {
// File number assigned to this output table.
uint64_t number;
// Size of the output file; set to 0 when the output is first opened.
uint64_t file_size;
uint64_t num_tombstones; // Number of kTypeDeletion (tombstone) entries written to this output file
uint64_t num_entries; // Total number of entries written to this output file
// Key range covered by this output, as internal keys.
InternalKey smallest, largest;
};

Expand Down Expand Up @@ -535,8 +537,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
edit->AddFile(level, meta.number, meta.file_size, meta.num_tombstones,
meta.num_entries, meta.smallest, meta.largest);
}

CompactionStats stats;
Expand Down Expand Up @@ -740,8 +742,8 @@ void DBImpl::BackgroundCompaction() {
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->num_tombstones,
f->num_entries, f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
Expand Down Expand Up @@ -813,6 +815,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
pending_outputs_.insert(file_number);
CompactionState::Output out;
out.number = file_number;
out.file_size = 0;
out.num_tombstones = 0;
out.num_entries = 0;
out.smallest.Clear();
out.largest.Clear();
compact->outputs.push_back(out);
Expand Down Expand Up @@ -890,6 +895,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.num_tombstones, out.num_entries,
out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
Expand Down Expand Up @@ -949,7 +955,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {

// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
const bool parsed_ok = ParseInternalKey(key, &ikey);
if (!parsed_ok) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
Expand Down Expand Up @@ -1004,6 +1011,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);

// Update tombstone statistics for output file
compact->current_output()->num_entries++;
if (parsed_ok && ikey.type == kTypeDeletion) {
compact->current_output()->num_tombstones++;
}

compact->builder->Add(key, input->value());

// Close output file if it is big enough
Expand Down
10 changes: 8 additions & 2 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ class Repairer {
bool empty = true;
ParsedInternalKey parsed;
t.max_sequence = 0;
t.meta.num_entries = 0;
t.meta.num_tombstones = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
Expand All @@ -268,6 +270,10 @@ class Repairer {
}

counter++;
t.meta.num_entries++;
if (parsed.type == kTypeDeletion) {
t.meta.num_tombstones++;
}
if (empty) {
empty = false;
t.meta.smallest.DecodeFrom(key);
Expand Down Expand Up @@ -368,8 +374,8 @@ class Repairer {
for (size_t i = 0; i < tables_.size(); i++) {
// TODO(opt): separate out into multiple levels
const TableInfo& t = tables_[i];
edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.smallest,
t.meta.largest);
edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.num_tombstones,
t.meta.num_entries, t.meta.smallest, t.meta.largest);
}

// std::fprintf(stderr,
Expand Down
45 changes: 38 additions & 7 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ enum Tag {
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9
kPrevLogNumber = 9,
kNewFileWithStats = 10 // New file with tombstone statistics
};

void VersionEdit::Clear() {
Expand Down Expand Up @@ -75,12 +76,23 @@ void VersionEdit::EncodeTo(std::string* dst) const {

for (size_t i = 0; i < new_files_.size(); i++) {
const FileMetaData& f = new_files_[i].second;
PutVarint32(dst, kNewFile);
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
if (f.num_entries > 0 || f.num_tombstones > 0) {
PutVarint32(dst, kNewFileWithStats);
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size);
PutVarint64(dst, f.num_tombstones);
PutVarint64(dst, f.num_entries);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
} else {
PutVarint32(dst, kNewFile);
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
}
}
}

Expand Down Expand Up @@ -176,6 +188,8 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break;

case kNewFile:
f.num_tombstones = 0;
f.num_entries = 0;
if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) &&
GetVarint64(&input, &f.file_size) &&
GetInternalKey(&input, &f.smallest) &&
Expand All @@ -186,6 +200,19 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kNewFileWithStats:
if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) &&
GetVarint64(&input, &f.file_size) &&
GetVarint64(&input, &f.num_tombstones) &&
GetVarint64(&input, &f.num_entries) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest)) {
new_files_.push_back(std::make_pair(level, f));
} else {
msg = "new-file with stats entry";
}
break;

default:
msg = "unknown tag";
break;
Expand Down Expand Up @@ -246,6 +273,10 @@ std::string VersionEdit::DebugString() const {
AppendNumberTo(&r, f.number);
r.append(" ");
AppendNumberTo(&r, f.file_size);
r.append(" tombstones:");
AppendNumberTo(&r, f.num_tombstones);
r.append(" entries:");
AppendNumberTo(&r, f.num_entries);
r.append(" ");
r.append(f.smallest.DebugString());
r.append(" .. ");
Expand Down
32 changes: 28 additions & 4 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,25 @@ namespace leveldb {
class VersionSet;

struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
FileMetaData()
: refs(0),
allowed_seeks(1 << 30),
file_size(0),
num_tombstones(0),
num_entries(0) {}

int refs;
int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
uint64_t file_size; // File size in bytes
uint64_t num_tombstones; // Number of tombstone entries (kTypeDeletion)
uint64_t num_entries; // Total number of entries in the file
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table

double TombstoneDensity() const {
return (num_entries > 0) ? static_cast<double>(num_tombstones) / num_entries : 0.0;
}
};

class VersionEdit {
Expand Down Expand Up @@ -70,6 +81,19 @@ class VersionEdit {
new_files_.push_back(std::make_pair(level, f));
}

void AddFile(int level, uint64_t file, uint64_t file_size,
uint64_t num_tombstones, uint64_t num_entries,
const InternalKey& smallest, const InternalKey& largest) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.num_tombstones = num_tombstones;
f.num_entries = num_entries;
f.smallest = smallest;
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}

// Delete the specified "file" from the specified "level".
void RemoveFile(int level, uint64_t file) {
deleted_files_.insert(std::make_pair(level, file));
Expand Down
84 changes: 79 additions & 5 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,20 @@ class VersionSet::Builder {
Version* base_;
LevelState levels_[config::kNumLevels];

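// Incremental tracking for tombstone-based compaction priority.
// These hold the best candidate seen so far: the file with the highest
// tombstone density above the 50% threshold among the files added through
// this builder at levels > 0 and the candidate inherited from the base
// version. Other files already present in the base version are not rescanned.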
FileMetaData* best_tombstone_file_;
int best_tombstone_level_;
double best_tombstone_density_;

public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) {
Builder(VersionSet* vset, Version* base)
: vset_(vset),
base_(base),
best_tombstone_file_(nullptr),
best_tombstone_level_(-1),
best_tombstone_density_(0.5) { // Threshold: 50%
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
Expand Down Expand Up @@ -663,13 +674,51 @@ class VersionSet::Builder {
f->allowed_seeks = static_cast<int>((f->file_size / 16384U));
if (f->allowed_seeks < 100) f->allowed_seeks = 100;

// Incremental check: if this new file has high tombstone density (> 50%),
// and it's in level > 0, track it as a candidate for priority compaction.
// This is O(1) per new file.
if (level > 0) {
double density = f->TombstoneDensity();
if (density > best_tombstone_density_) {
best_tombstone_density_ = density;
best_tombstone_file_ = f;
best_tombstone_level_ = level;
}
}

levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}

// Save the current state in *v.
void SaveTo(Version* v) {
// Initialize tombstone compaction fields for new version
v->tombstone_file_to_compact_ = nullptr;
v->tombstone_file_to_compact_level_ = -1;

// Step 1: Check if base_ has a tombstone file that should be inherited.
// This is O(1) check: just verify if the file was deleted.
if (base_->tombstone_file_to_compact_ != nullptr) {
FileMetaData* base_candidate = base_->tombstone_file_to_compact_;
int base_level = base_->tombstone_file_to_compact_level_;
double base_density = base_candidate->TombstoneDensity();

// Check if this candidate file was deleted in this version edit
bool is_deleted = (levels_[base_level].deleted_files.count(base_candidate->number) > 0);

if (!is_deleted) {
// Base candidate survives; compare with our best from new files
if (base_density > best_tombstone_density_) {
// Base candidate is better
best_tombstone_density_ = base_density;
best_tombstone_file_ = base_candidate;
best_tombstone_level_ = base_level;
}
}
}

// Step 2: Now perform the normal file merging
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
Expand Down Expand Up @@ -712,6 +761,18 @@ class VersionSet::Builder {
}
#endif
}

// Step 3: Set the tombstone compaction file in the new version
// Only set if density > 50% threshold and file was not deleted
if (best_tombstone_file_ != nullptr && best_tombstone_density_ > 0.5) {
// Double-check that our best candidate was not deleted in this version edit
bool is_deleted = (levels_[best_tombstone_level_].deleted_files.count(
best_tombstone_file_->number) > 0);
if (!is_deleted) {
v->tombstone_file_to_compact_ = best_tombstone_file_;
v->tombstone_file_to_compact_level_ = best_tombstone_level_;
}
}
}

void MaybeAddFile(Version* v, int level, FileMetaData* f) {
Expand Down Expand Up @@ -1087,7 +1148,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
edit.AddFile(level, f->number, f->file_size, f->num_tombstones,
f->num_entries, f->smallest, f->largest);
}
}

Expand Down Expand Up @@ -1253,11 +1315,23 @@ Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;

// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
// Priority order:
// 1. Tombstone compaction (density > 50%) - highest priority
// 2. Size compaction (based on file size/number limits)
// 3. Seek compaction (based on seek stats)
const bool tombstone_compaction = (current_->tombstone_file_to_compact_ != nullptr);
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) {

if (tombstone_compaction) {
// Prioritize compaction for files with high tombstone density (> 50%)
level = current_->tombstone_file_to_compact_level_;
assert(level >= 0);
assert(level > 0); // Tombstone compaction only for levels > 0
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->tombstone_file_to_compact_);
} else if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
Expand Down
Loading