| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/version_set.h"
- #include "db/db_impl/db_impl.h"
- #include "db/log_writer.h"
- #include "logging/logging.h"
- #include "table/mock_table.h"
- #include "test_util/testharness.h"
- #include "test_util/testutil.h"
- #include "util/string_util.h"
- namespace ROCKSDB_NAMESPACE {
- class GenerateLevelFilesBriefTest : public testing::Test {
- public:
- std::vector<FileMetaData*> files_;
- LevelFilesBrief file_level_;
- Arena arena_;
- GenerateLevelFilesBriefTest() { }
- ~GenerateLevelFilesBriefTest() override {
- for (size_t i = 0; i < files_.size(); i++) {
- delete files_[i];
- }
- }
- void Add(const char* smallest, const char* largest,
- SequenceNumber smallest_seq = 100,
- SequenceNumber largest_seq = 100) {
- FileMetaData* f = new FileMetaData(
- files_.size() + 1, 0, 0,
- InternalKey(smallest, smallest_seq, kTypeValue),
- InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
- largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
- kUnknownOldestAncesterTime, kUnknownFileCreationTime,
- kUnknownFileChecksum, kUnknownFileChecksumFuncName);
- files_.push_back(f);
- }
- int Compare() {
- int diff = 0;
- for (size_t i = 0; i < files_.size(); i++) {
- if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {
- diff++;
- }
- }
- return diff;
- }
- };
- TEST_F(GenerateLevelFilesBriefTest, Empty) {
- DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
- ASSERT_EQ(0u, file_level_.num_files);
- ASSERT_EQ(0, Compare());
- }
- TEST_F(GenerateLevelFilesBriefTest, Single) {
- Add("p", "q");
- DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
- ASSERT_EQ(1u, file_level_.num_files);
- ASSERT_EQ(0, Compare());
- }
- TEST_F(GenerateLevelFilesBriefTest, Multiple) {
- Add("150", "200");
- Add("200", "250");
- Add("300", "350");
- Add("400", "450");
- DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
- ASSERT_EQ(4u, file_level_.num_files);
- ASSERT_EQ(0, Compare());
- }
- class CountingLogger : public Logger {
- public:
- CountingLogger() : log_count(0) {}
- using Logger::Logv;
- void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }
- int log_count;
- };
- Options GetOptionsWithNumLevels(int num_levels,
- std::shared_ptr<CountingLogger> logger) {
- Options opt;
- opt.num_levels = num_levels;
- opt.info_log = logger;
- return opt;
- }
- class VersionStorageInfoTest : public testing::Test {
- public:
- const Comparator* ucmp_;
- InternalKeyComparator icmp_;
- std::shared_ptr<CountingLogger> logger_;
- Options options_;
- ImmutableCFOptions ioptions_;
- MutableCFOptions mutable_cf_options_;
- VersionStorageInfo vstorage_;
- InternalKey GetInternalKey(const char* ukey,
- SequenceNumber smallest_seq = 100) {
- return InternalKey(ukey, smallest_seq, kTypeValue);
- }
- VersionStorageInfoTest()
- : ucmp_(BytewiseComparator()),
- icmp_(ucmp_),
- logger_(new CountingLogger()),
- options_(GetOptionsWithNumLevels(6, logger_)),
- ioptions_(options_),
- mutable_cf_options_(options_),
- vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {}
- ~VersionStorageInfoTest() override {
- for (int i = 0; i < vstorage_.num_levels(); i++) {
- for (auto* f : vstorage_.LevelFiles(i)) {
- if (--f->refs == 0) {
- delete f;
- }
- }
- }
- }
- void Add(int level, uint32_t file_number, const char* smallest,
- const char* largest, uint64_t file_size = 0) {
- assert(level < vstorage_.num_levels());
- FileMetaData* f = new FileMetaData(
- file_number, 0, file_size, GetInternalKey(smallest, 0),
- GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
- /* marked_for_compact */ false, kInvalidBlobFileNumber,
- kUnknownOldestAncesterTime, kUnknownFileCreationTime,
- kUnknownFileChecksum, kUnknownFileChecksumFuncName);
- f->compensated_file_size = file_size;
- vstorage_.AddFile(level, f);
- }
- void Add(int level, uint32_t file_number, const InternalKey& smallest,
- const InternalKey& largest, uint64_t file_size = 0) {
- assert(level < vstorage_.num_levels());
- FileMetaData* f = new FileMetaData(
- file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
- /* largest_seq */ 0, /* marked_for_compact */ false,
- kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
- kUnknownFileCreationTime, kUnknownFileChecksum,
- kUnknownFileChecksumFuncName);
- f->compensated_file_size = file_size;
- vstorage_.AddFile(level, f);
- }
- std::string GetOverlappingFiles(int level, const InternalKey& begin,
- const InternalKey& end) {
- std::vector<FileMetaData*> inputs;
- vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);
- std::string result;
- for (size_t i = 0; i < inputs.size(); ++i) {
- if (i > 0) {
- result += ",";
- }
- AppendNumberTo(&result, inputs[i]->fd.GetNumber());
- }
- return result;
- }
- };
- TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
- ioptions_.level_compaction_dynamic_level_bytes = false;
- mutable_cf_options_.max_bytes_for_level_base = 10;
- mutable_cf_options_.max_bytes_for_level_multiplier = 5;
- Add(4, 100U, "1", "2");
- Add(5, 101U, "1", "2");
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U);
- ASSERT_EQ(0, logger_->log_count);
- }
- TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) {
- ioptions_.level_compaction_dynamic_level_bytes = true;
- mutable_cf_options_.max_bytes_for_level_base = 1000;
- mutable_cf_options_.max_bytes_for_level_multiplier = 5;
- Add(5, 1U, "1", "2", 500U);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(0, logger_->log_count);
- ASSERT_EQ(vstorage_.base_level(), 5);
- Add(5, 2U, "3", "4", 550U);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(0, logger_->log_count);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
- ASSERT_EQ(vstorage_.base_level(), 4);
- Add(4, 3U, "3", "4", 550U);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(0, logger_->log_count);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
- ASSERT_EQ(vstorage_.base_level(), 4);
- Add(3, 4U, "3", "4", 250U);
- Add(3, 5U, "5", "7", 300U);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(1, logger_->log_count);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U);
- ASSERT_EQ(vstorage_.base_level(), 3);
- Add(1, 6U, "3", "4", 5U);
- Add(1, 7U, "8", "9", 5U);
- logger_->log_count = 0;
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(1, logger_->log_count);
- ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
- ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U);
- ASSERT_EQ(vstorage_.base_level(), 1);
- }
- TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
- ioptions_.level_compaction_dynamic_level_bytes = true;
- mutable_cf_options_.max_bytes_for_level_base = 100;
- mutable_cf_options_.max_bytes_for_level_multiplier = 2;
- Add(0, 1U, "1", "2", 50U);
- Add(1, 2U, "1", "2", 50U);
- Add(2, 3U, "1", "2", 500U);
- Add(3, 4U, "1", "2", 500U);
- Add(4, 5U, "1", "2", 1700U);
- Add(5, 6U, "1", "2", 500U);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U);
- ASSERT_EQ(vstorage_.base_level(), 1);
- ASSERT_EQ(0, logger_->log_count);
- }
- TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {
- uint64_t kOneGB = 1000U * 1000U * 1000U;
- ioptions_.level_compaction_dynamic_level_bytes = true;
- mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;
- mutable_cf_options_.max_bytes_for_level_multiplier = 10;
- Add(0, 1U, "1", "2", 50U);
- Add(3, 4U, "1", "2", 32U * kOneGB);
- Add(4, 5U, "1", "2", 500U * kOneGB);
- Add(5, 6U, "1", "2", 3000U * kOneGB);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB);
- ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB);
- ASSERT_EQ(vstorage_.base_level(), 2);
- ASSERT_EQ(0, logger_->log_count);
- }
- TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {
- ioptions_.level_compaction_dynamic_level_bytes = true;
- mutable_cf_options_.max_bytes_for_level_base = 40000;
- mutable_cf_options_.max_bytes_for_level_multiplier = 5;
- mutable_cf_options_.level0_file_num_compaction_trigger = 2;
- Add(0, 1U, "1", "2", 10000U);
- Add(0, 2U, "1", "2", 10000U);
- Add(0, 3U, "1", "2", 10000U);
- Add(5, 4U, "1", "2", 1286250U);
- Add(4, 5U, "1", "2", 200000U);
- Add(3, 6U, "1", "2", 40000U);
- Add(2, 7U, "1", "2", 8000U);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(0, logger_->log_count);
- ASSERT_EQ(2, vstorage_.base_level());
- // level multiplier should be 3.5
- ASSERT_EQ(vstorage_.level_multiplier(), 5.0);
- // Level size should be around 30,000, 105,000, 367,500
- ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));
- ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
- ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
- }
- TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {
- ioptions_.level_compaction_dynamic_level_bytes = true;
- mutable_cf_options_.max_bytes_for_level_base = 10000;
- mutable_cf_options_.max_bytes_for_level_multiplier = 5;
- mutable_cf_options_.level0_file_num_compaction_trigger = 2;
- Add(0, 11U, "1", "2", 10000U);
- Add(0, 12U, "1", "2", 10000U);
- Add(0, 13U, "1", "2", 10000U);
- Add(5, 4U, "1", "2", 1286250U);
- Add(4, 5U, "1", "2", 200000U);
- Add(3, 6U, "1", "2", 40000U);
- Add(2, 7U, "1", "2", 8000U);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(0, logger_->log_count);
- ASSERT_EQ(2, vstorage_.base_level());
- // level multiplier should be 3.5
- ASSERT_LT(vstorage_.level_multiplier(), 3.6);
- ASSERT_GT(vstorage_.level_multiplier(), 3.4);
- // Level size should be around 30,000, 105,000, 367,500
- ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
- ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
- ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
- ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
- ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
- }
- TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
- ioptions_.level_compaction_dynamic_level_bytes = true;
- mutable_cf_options_.max_bytes_for_level_base = 10000;
- mutable_cf_options_.max_bytes_for_level_multiplier = 5;
- mutable_cf_options_.level0_file_num_compaction_trigger = 2;
- Add(0, 11U, "1", "2", 5000U);
- Add(0, 12U, "1", "2", 5000U);
- Add(0, 13U, "1", "2", 5000U);
- Add(0, 14U, "1", "2", 5000U);
- Add(0, 15U, "1", "2", 5000U);
- Add(0, 16U, "1", "2", 5000U);
- Add(5, 4U, "1", "2", 1286250U);
- Add(4, 5U, "1", "2", 200000U);
- Add(3, 6U, "1", "2", 40000U);
- Add(2, 7U, "1", "2", 8000U);
- vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
- ASSERT_EQ(0, logger_->log_count);
- ASSERT_EQ(2, vstorage_.base_level());
- // level multiplier should be 3.5
- ASSERT_LT(vstorage_.level_multiplier(), 3.6);
- ASSERT_GT(vstorage_.level_multiplier(), 3.4);
- // Level size should be around 30,000, 105,000, 367,500
- ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
- ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
- ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
- ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
- ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
- }
- TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
- // Test whether the overlaps are detected as expected
- Add(1, 1U, "4", "7", 1U); // Perfect overlap with last level
- Add(2, 2U, "3", "5", 1U); // Partial overlap with last level
- Add(2, 3U, "6", "8", 1U); // Partial overlap with last level
- Add(3, 4U, "1", "9", 1U); // Contains range of last level
- Add(4, 5U, "4", "5", 1U); // Inside range of last level
- Add(4, 5U, "6", "7", 1U); // Inside range of last level
- Add(5, 6U, "4", "7", 10U);
- ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());
- }
- TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
- Add(0, 1U, "9", "9", 1U); // Level 0 is not ordered
- Add(0, 1U, "5", "6", 1U); // Ignored because of [5,6] in l1
- Add(1, 1U, "1", "2", 1U); // Ignored because of [2,3] in l2
- Add(1, 2U, "3", "4", 1U); // Ignored because of [2,3] in l2
- Add(1, 3U, "5", "6", 1U);
- Add(2, 4U, "2", "3", 1U);
- Add(3, 5U, "7", "8", 1U);
- ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());
- }
- TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
- // Two files that overlap at the range deletion tombstone sentinel.
- Add(1, 1U, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1);
- Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1);
- // Two files that overlap at the same user key.
- Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1);
- Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1);
- // Two files that do not overlap.
- Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1);
- Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1);
- vstorage_.UpdateNumNonEmptyLevels();
- vstorage_.GenerateLevelFilesBrief();
- ASSERT_EQ("1,2", GetOverlappingFiles(
- 1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue}));
- ASSERT_EQ("1", GetOverlappingFiles(
- 1, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}));
- ASSERT_EQ("2", GetOverlappingFiles(
- 1, {"b", kMaxSequenceNumber, kTypeValue}, {"c", 0, kTypeValue}));
- ASSERT_EQ("3,4", GetOverlappingFiles(
- 1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue}));
- ASSERT_EQ("3", GetOverlappingFiles(
- 1, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeRangeDeletion}));
- ASSERT_EQ("3,4", GetOverlappingFiles(
- 1, {"e", kMaxSequenceNumber, kTypeValue}, {"f", 0, kTypeValue}));
- ASSERT_EQ("3,4", GetOverlappingFiles(
- 1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}));
- ASSERT_EQ("5", GetOverlappingFiles(
- 1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}));
- ASSERT_EQ("6", GetOverlappingFiles(
- 1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));
- }
- class FindLevelFileTest : public testing::Test {
- public:
- LevelFilesBrief file_level_;
- bool disjoint_sorted_files_;
- Arena arena_;
- FindLevelFileTest() : disjoint_sorted_files_(true) { }
- ~FindLevelFileTest() override {}
- void LevelFileInit(size_t num = 0) {
- char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));
- file_level_.files = new (mem)FdWithKeyRange[num];
- file_level_.num_files = 0;
- }
- void Add(const char* smallest, const char* largest,
- SequenceNumber smallest_seq = 100,
- SequenceNumber largest_seq = 100) {
- InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);
- InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);
- Slice smallest_slice = smallest_key.Encode();
- Slice largest_slice = largest_key.Encode();
- char* mem = arena_.AllocateAligned(
- smallest_slice.size() + largest_slice.size());
- memcpy(mem, smallest_slice.data(), smallest_slice.size());
- memcpy(mem + smallest_slice.size(), largest_slice.data(),
- largest_slice.size());
- // add to file_level_
- size_t num = file_level_.num_files;
- auto& file = file_level_.files[num];
- file.fd = FileDescriptor(num + 1, 0, 0);
- file.smallest_key = Slice(mem, smallest_slice.size());
- file.largest_key = Slice(mem + smallest_slice.size(),
- largest_slice.size());
- file_level_.num_files++;
- }
- int Find(const char* key) {
- InternalKey target(key, 100, kTypeValue);
- InternalKeyComparator cmp(BytewiseComparator());
- return FindFile(cmp, file_level_, target.Encode());
- }
- bool Overlaps(const char* smallest, const char* largest) {
- InternalKeyComparator cmp(BytewiseComparator());
- Slice s(smallest != nullptr ? smallest : "");
- Slice l(largest != nullptr ? largest : "");
- return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,
- (smallest != nullptr ? &s : nullptr),
- (largest != nullptr ? &l : nullptr));
- }
- };
- TEST_F(FindLevelFileTest, LevelEmpty) {
- LevelFileInit(0);
- ASSERT_EQ(0, Find("foo"));
- ASSERT_TRUE(! Overlaps("a", "z"));
- ASSERT_TRUE(! Overlaps(nullptr, "z"));
- ASSERT_TRUE(! Overlaps("a", nullptr));
- ASSERT_TRUE(! Overlaps(nullptr, nullptr));
- }
- TEST_F(FindLevelFileTest, LevelSingle) {
- LevelFileInit(1);
- Add("p", "q");
- ASSERT_EQ(0, Find("a"));
- ASSERT_EQ(0, Find("p"));
- ASSERT_EQ(0, Find("p1"));
- ASSERT_EQ(0, Find("q"));
- ASSERT_EQ(1, Find("q1"));
- ASSERT_EQ(1, Find("z"));
- ASSERT_TRUE(! Overlaps("a", "b"));
- ASSERT_TRUE(! Overlaps("z1", "z2"));
- ASSERT_TRUE(Overlaps("a", "p"));
- ASSERT_TRUE(Overlaps("a", "q"));
- ASSERT_TRUE(Overlaps("a", "z"));
- ASSERT_TRUE(Overlaps("p", "p1"));
- ASSERT_TRUE(Overlaps("p", "q"));
- ASSERT_TRUE(Overlaps("p", "z"));
- ASSERT_TRUE(Overlaps("p1", "p2"));
- ASSERT_TRUE(Overlaps("p1", "z"));
- ASSERT_TRUE(Overlaps("q", "q"));
- ASSERT_TRUE(Overlaps("q", "q1"));
- ASSERT_TRUE(! Overlaps(nullptr, "j"));
- ASSERT_TRUE(! Overlaps("r", nullptr));
- ASSERT_TRUE(Overlaps(nullptr, "p"));
- ASSERT_TRUE(Overlaps(nullptr, "p1"));
- ASSERT_TRUE(Overlaps("q", nullptr));
- ASSERT_TRUE(Overlaps(nullptr, nullptr));
- }
- TEST_F(FindLevelFileTest, LevelMultiple) {
- LevelFileInit(4);
- Add("150", "200");
- Add("200", "250");
- Add("300", "350");
- Add("400", "450");
- ASSERT_EQ(0, Find("100"));
- ASSERT_EQ(0, Find("150"));
- ASSERT_EQ(0, Find("151"));
- ASSERT_EQ(0, Find("199"));
- ASSERT_EQ(0, Find("200"));
- ASSERT_EQ(1, Find("201"));
- ASSERT_EQ(1, Find("249"));
- ASSERT_EQ(1, Find("250"));
- ASSERT_EQ(2, Find("251"));
- ASSERT_EQ(2, Find("299"));
- ASSERT_EQ(2, Find("300"));
- ASSERT_EQ(2, Find("349"));
- ASSERT_EQ(2, Find("350"));
- ASSERT_EQ(3, Find("351"));
- ASSERT_EQ(3, Find("400"));
- ASSERT_EQ(3, Find("450"));
- ASSERT_EQ(4, Find("451"));
- ASSERT_TRUE(! Overlaps("100", "149"));
- ASSERT_TRUE(! Overlaps("251", "299"));
- ASSERT_TRUE(! Overlaps("451", "500"));
- ASSERT_TRUE(! Overlaps("351", "399"));
- ASSERT_TRUE(Overlaps("100", "150"));
- ASSERT_TRUE(Overlaps("100", "200"));
- ASSERT_TRUE(Overlaps("100", "300"));
- ASSERT_TRUE(Overlaps("100", "400"));
- ASSERT_TRUE(Overlaps("100", "500"));
- ASSERT_TRUE(Overlaps("375", "400"));
- ASSERT_TRUE(Overlaps("450", "450"));
- ASSERT_TRUE(Overlaps("450", "500"));
- }
- TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {
- LevelFileInit(4);
- Add("150", "200");
- Add("200", "250");
- Add("300", "350");
- Add("400", "450");
- ASSERT_TRUE(! Overlaps(nullptr, "149"));
- ASSERT_TRUE(! Overlaps("451", nullptr));
- ASSERT_TRUE(Overlaps(nullptr, nullptr));
- ASSERT_TRUE(Overlaps(nullptr, "150"));
- ASSERT_TRUE(Overlaps(nullptr, "199"));
- ASSERT_TRUE(Overlaps(nullptr, "200"));
- ASSERT_TRUE(Overlaps(nullptr, "201"));
- ASSERT_TRUE(Overlaps(nullptr, "400"));
- ASSERT_TRUE(Overlaps(nullptr, "800"));
- ASSERT_TRUE(Overlaps("100", nullptr));
- ASSERT_TRUE(Overlaps("200", nullptr));
- ASSERT_TRUE(Overlaps("449", nullptr));
- ASSERT_TRUE(Overlaps("450", nullptr));
- }
- TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {
- LevelFileInit(1);
- Add("200", "200", 5000, 3000);
- ASSERT_TRUE(! Overlaps("199", "199"));
- ASSERT_TRUE(! Overlaps("201", "300"));
- ASSERT_TRUE(Overlaps("200", "200"));
- ASSERT_TRUE(Overlaps("190", "200"));
- ASSERT_TRUE(Overlaps("200", "210"));
- }
- TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
- LevelFileInit(2);
- Add("150", "600");
- Add("400", "500");
- disjoint_sorted_files_ = false;
- ASSERT_TRUE(! Overlaps("100", "149"));
- ASSERT_TRUE(! Overlaps("601", "700"));
- ASSERT_TRUE(Overlaps("100", "150"));
- ASSERT_TRUE(Overlaps("100", "200"));
- ASSERT_TRUE(Overlaps("100", "300"));
- ASSERT_TRUE(Overlaps("100", "400"));
- ASSERT_TRUE(Overlaps("100", "500"));
- ASSERT_TRUE(Overlaps("375", "400"));
- ASSERT_TRUE(Overlaps("450", "450"));
- ASSERT_TRUE(Overlaps("450", "500"));
- ASSERT_TRUE(Overlaps("450", "700"));
- ASSERT_TRUE(Overlaps("600", "700"));
- }
- class VersionSetTestBase {
- public:
- const static std::string kColumnFamilyName1;
- const static std::string kColumnFamilyName2;
- const static std::string kColumnFamilyName3;
- int num_initial_edits_;
- VersionSetTestBase()
- : env_(Env::Default()),
- fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
- dbname_(test::PerThreadDBPath("version_set_test")),
- db_options_(),
- mutable_cf_options_(cf_options_),
- table_cache_(NewLRUCache(50000, 16)),
- write_buffer_manager_(db_options_.db_write_buffer_size),
- shutting_down_(false),
- mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
- EXPECT_OK(env_->CreateDirIfMissing(dbname_));
- db_options_.env = env_;
- db_options_.fs = fs_;
- versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
- table_cache_.get(), &write_buffer_manager_,
- &write_controller_,
- /*block_cache_tracer=*/nullptr)),
- reactive_versions_ = std::make_shared<ReactiveVersionSet>(
- dbname_, &db_options_, env_options_, table_cache_.get(),
- &write_buffer_manager_, &write_controller_);
- db_options_.db_paths.emplace_back(dbname_,
- std::numeric_limits<uint64_t>::max());
- }
- void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
- SequenceNumber* last_seqno,
- std::unique_ptr<log::Writer>* log_writer) {
- assert(column_families != nullptr);
- assert(last_seqno != nullptr);
- assert(log_writer != nullptr);
- VersionEdit new_db;
- if (db_options_.write_dbid_to_manifest) {
- DBImpl* impl = new DBImpl(DBOptions(), dbname_);
- std::string db_id;
- impl->GetDbIdentityFromIdentityFile(&db_id);
- new_db.SetDBId(db_id);
- }
- new_db.SetLogNumber(0);
- new_db.SetNextFile(2);
- new_db.SetLastSequence(0);
- const std::vector<std::string> cf_names = {
- kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
- kColumnFamilyName3};
- const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
- autovector<VersionEdit> new_cfs;
- uint64_t last_seq = 1;
- uint32_t cf_id = 1;
- for (int i = 1; i != kInitialNumOfCfs; ++i) {
- VersionEdit new_cf;
- new_cf.AddColumnFamily(cf_names[i]);
- new_cf.SetColumnFamily(cf_id++);
- new_cf.SetLogNumber(0);
- new_cf.SetNextFile(2);
- new_cf.SetLastSequence(last_seq++);
- new_cfs.emplace_back(new_cf);
- }
- *last_seqno = last_seq;
- num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
- const std::string manifest = DescriptorFileName(dbname_, 1);
- std::unique_ptr<WritableFile> file;
- Status s = env_->NewWritableFile(
- manifest, &file, env_->OptimizeForManifestWrite(env_options_));
- ASSERT_OK(s);
- std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
- {
- log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
- std::string record;
- new_db.EncodeTo(&record);
- s = (*log_writer)->AddRecord(record);
- for (const auto& e : new_cfs) {
- record.clear();
- e.EncodeTo(&record);
- s = (*log_writer)->AddRecord(record);
- ASSERT_OK(s);
- }
- }
- ASSERT_OK(s);
- cf_options_.table_factory = mock_table_factory_;
- for (const auto& cf_name : cf_names) {
- column_families->emplace_back(cf_name, cf_options_);
- }
- }
- // Create DB with 3 column families.
- void NewDB() {
- std::vector<ColumnFamilyDescriptor> column_families;
- SequenceNumber last_seqno;
- std::unique_ptr<log::Writer> log_writer;
- SetIdentityFile(env_, dbname_);
- PrepareManifest(&column_families, &last_seqno, &log_writer);
- log_writer.reset();
- // Make "CURRENT" file point to the new manifest file.
- Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
- ASSERT_OK(s);
- EXPECT_OK(versions_->Recover(column_families, false));
- EXPECT_EQ(column_families.size(),
- versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- }
- Env* env_;
- std::shared_ptr<FileSystem> fs_;
- const std::string dbname_;
- EnvOptions env_options_;
- ImmutableDBOptions db_options_;
- ColumnFamilyOptions cf_options_;
- MutableCFOptions mutable_cf_options_;
- std::shared_ptr<Cache> table_cache_;
- WriteController write_controller_;
- WriteBufferManager write_buffer_manager_;
- std::shared_ptr<VersionSet> versions_;
- std::shared_ptr<ReactiveVersionSet> reactive_versions_;
- InstrumentedMutex mutex_;
- std::atomic<bool> shutting_down_;
- std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
- };
- const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
- const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
- const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
- class VersionSetTest : public VersionSetTestBase, public testing::Test {
- public:
- VersionSetTest() : VersionSetTestBase() {}
- };
- TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
- NewDB();
- const int kGroupSize = 5;
- autovector<VersionEdit> edits;
- for (int i = 0; i != kGroupSize; ++i) {
- edits.emplace_back(VersionEdit());
- }
- autovector<ColumnFamilyData*> cfds;
- autovector<const MutableCFOptions*> all_mutable_cf_options;
- autovector<autovector<VersionEdit*>> edit_lists;
- for (int i = 0; i != kGroupSize; ++i) {
- cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
- all_mutable_cf_options.emplace_back(&mutable_cf_options_);
- autovector<VersionEdit*> edit_list;
- edit_list.emplace_back(&edits[i]);
- edit_lists.emplace_back(edit_list);
- }
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- int count = 0;
- SyncPoint::GetInstance()->SetCallBack(
- "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
- uint32_t* cf_id = reinterpret_cast<uint32_t*>(arg);
- EXPECT_EQ(0u, *cf_id);
- ++count;
- });
- SyncPoint::GetInstance()->EnableProcessing();
- mutex_.Lock();
- Status s =
- versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_);
- mutex_.Unlock();
- EXPECT_OK(s);
- EXPECT_EQ(kGroupSize - 1, count);
- }
- class VersionSetAtomicGroupTest : public VersionSetTestBase,
- public testing::Test {
- public:
- VersionSetAtomicGroupTest() : VersionSetTestBase() {}
- void SetUp() override {
- PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
- SetupTestSyncPoints();
- }
- void SetupValidAtomicGroup(int atomic_group_size) {
- edits_.resize(atomic_group_size);
- int remaining = atomic_group_size;
- for (size_t i = 0; i != edits_.size(); ++i) {
- edits_[i].SetLogNumber(0);
- edits_[i].SetNextFile(2);
- edits_[i].MarkAtomicGroup(--remaining);
- edits_[i].SetLastSequence(last_seqno_++);
- }
- ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
- }
- void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
- edits_.resize(atomic_group_size);
- int remaining = atomic_group_size;
- for (size_t i = 0; i != edits_.size(); ++i) {
- edits_[i].SetLogNumber(0);
- edits_[i].SetNextFile(2);
- edits_[i].MarkAtomicGroup(--remaining);
- edits_[i].SetLastSequence(last_seqno_++);
- }
- ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
- }
- void SetupCorruptedAtomicGroup(int atomic_group_size) {
- edits_.resize(atomic_group_size);
- int remaining = atomic_group_size;
- for (size_t i = 0; i != edits_.size(); ++i) {
- edits_[i].SetLogNumber(0);
- edits_[i].SetNextFile(2);
- if (i != ((size_t)atomic_group_size / 2)) {
- edits_[i].MarkAtomicGroup(--remaining);
- }
- edits_[i].SetLastSequence(last_seqno_++);
- }
- ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
- }
- void SetupIncorrectAtomicGroup(int atomic_group_size) {
- edits_.resize(atomic_group_size);
- int remaining = atomic_group_size;
- for (size_t i = 0; i != edits_.size(); ++i) {
- edits_[i].SetLogNumber(0);
- edits_[i].SetNextFile(2);
- if (i != 1) {
- edits_[i].MarkAtomicGroup(--remaining);
- } else {
- edits_[i].MarkAtomicGroup(remaining--);
- }
- edits_[i].SetLastSequence(last_seqno_++);
- }
- ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
- }
- void SetupTestSyncPoints() {
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->SetCallBack(
- "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
- VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
- EXPECT_EQ(edits_.front().DebugString(),
- e->DebugString()); // compare based on value
- first_in_atomic_group_ = true;
- });
- SyncPoint::GetInstance()->SetCallBack(
- "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
- VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
- EXPECT_EQ(edits_.back().DebugString(),
- e->DebugString()); // compare based on value
- EXPECT_TRUE(first_in_atomic_group_);
- last_in_atomic_group_ = true;
- });
- SyncPoint::GetInstance()->SetCallBack(
- "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {
- num_recovered_edits_ = *reinterpret_cast<int*>(arg);
- });
- SyncPoint::GetInstance()->SetCallBack(
- "ReactiveVersionSet::ReadAndApply:AppliedEdits",
- [&](void* arg) { num_applied_edits_ = *reinterpret_cast<int*>(arg); });
- SyncPoint::GetInstance()->SetCallBack(
- "AtomicGroupReadBuffer::AddEdit:AtomicGroup",
- [&](void* /* arg */) { ++num_edits_in_atomic_group_; });
- SyncPoint::GetInstance()->SetCallBack(
- "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
- [&](void* arg) {
- corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);
- });
- SyncPoint::GetInstance()->SetCallBack(
- "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
- [&](void* arg) {
- edit_with_incorrect_group_size_ =
- *reinterpret_cast<VersionEdit*>(arg);
- });
- SyncPoint::GetInstance()->EnableProcessing();
- }
- void AddNewEditsToLog(int num_edits) {
- for (int i = 0; i < num_edits; i++) {
- std::string record;
- edits_[i].EncodeTo(&record);
- ASSERT_OK(log_writer_->AddRecord(record));
- }
- }
- void TearDown() override {
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- log_writer_.reset();
- }
- protected:
- std::vector<ColumnFamilyDescriptor> column_families_;
- SequenceNumber last_seqno_;
- std::vector<VersionEdit> edits_;
- bool first_in_atomic_group_ = false;
- bool last_in_atomic_group_ = false;
- int num_edits_in_atomic_group_ = 0;
- int num_recovered_edits_ = 0;
- int num_applied_edits_ = 0;
- VersionEdit corrupted_edit_;
- VersionEdit edit_with_incorrect_group_size_;
- std::unique_ptr<log::Writer> log_writer_;
- };
- TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
- const int kAtomicGroupSize = 3;
- SetupValidAtomicGroup(kAtomicGroupSize);
- AddNewEditsToLog(kAtomicGroupSize);
- EXPECT_OK(versions_->Recover(column_families_, false));
- EXPECT_EQ(column_families_.size(),
- versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- EXPECT_TRUE(first_in_atomic_group_);
- EXPECT_TRUE(last_in_atomic_group_);
- EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
- EXPECT_EQ(0, num_applied_edits_);
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleValidAtomicGroupWithReactiveVersionSetRecover) {
- const int kAtomicGroupSize = 3;
- SetupValidAtomicGroup(kAtomicGroupSize);
- AddNewEditsToLog(kAtomicGroupSize);
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter;
- std::unique_ptr<Status> manifest_reader_status;
- EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
- &manifest_reporter,
- &manifest_reader_status));
- EXPECT_EQ(column_families_.size(),
- reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- EXPECT_TRUE(first_in_atomic_group_);
- EXPECT_TRUE(last_in_atomic_group_);
- // The recover should clean up the replay buffer.
- EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
- EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
- EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
- EXPECT_EQ(0, num_applied_edits_);
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
- const int kAtomicGroupSize = 3;
- SetupValidAtomicGroup(kAtomicGroupSize);
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter;
- std::unique_ptr<Status> manifest_reader_status;
- EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
- &manifest_reporter,
- &manifest_reader_status));
- AddNewEditsToLog(kAtomicGroupSize);
- InstrumentedMutex mu;
- std::unordered_set<ColumnFamilyData*> cfds_changed;
- mu.Lock();
- EXPECT_OK(
- reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
- mu.Unlock();
- EXPECT_TRUE(first_in_atomic_group_);
- EXPECT_TRUE(last_in_atomic_group_);
- // The recover should clean up the replay buffer.
- EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
- EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
- EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
- EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
- const int kAtomicGroupSize = 4;
- const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
- SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
- AddNewEditsToLog(kNumberOfPersistedVersionEdits);
- EXPECT_OK(versions_->Recover(column_families_, false));
- EXPECT_EQ(column_families_.size(),
- versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- EXPECT_TRUE(first_in_atomic_group_);
- EXPECT_FALSE(last_in_atomic_group_);
- EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
- EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
- EXPECT_EQ(0, num_applied_edits_);
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
- const int kAtomicGroupSize = 4;
- const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
- SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
- AddNewEditsToLog(kNumberOfPersistedVersionEdits);
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter;
- std::unique_ptr<Status> manifest_reader_status;
- EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
- &manifest_reporter,
- &manifest_reader_status));
- EXPECT_EQ(column_families_.size(),
- reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- EXPECT_TRUE(first_in_atomic_group_);
- EXPECT_FALSE(last_in_atomic_group_);
- EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
- // Reactive version set should store the edits in the replay buffer.
- EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
- kNumberOfPersistedVersionEdits);
- EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
- // Write the last record. The reactive version set should now apply all
- // edits.
- std::string last_record;
- edits_[kAtomicGroupSize - 1].EncodeTo(&last_record);
- EXPECT_OK(log_writer_->AddRecord(last_record));
- InstrumentedMutex mu;
- std::unordered_set<ColumnFamilyData*> cfds_changed;
- mu.Lock();
- EXPECT_OK(
- reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
- mu.Unlock();
- // Reactive version set should be empty now.
- EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
- EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
- EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
- EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
- const int kAtomicGroupSize = 4;
- const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
- SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter;
- std::unique_ptr<Status> manifest_reader_status;
- // No edits in an atomic group.
- EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
- &manifest_reporter,
- &manifest_reader_status));
- EXPECT_EQ(column_families_.size(),
- reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- // Write a few edits in an atomic group.
- AddNewEditsToLog(kNumberOfPersistedVersionEdits);
- InstrumentedMutex mu;
- std::unordered_set<ColumnFamilyData*> cfds_changed;
- mu.Lock();
- EXPECT_OK(
- reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
- mu.Unlock();
- EXPECT_TRUE(first_in_atomic_group_);
- EXPECT_FALSE(last_in_atomic_group_);
- EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
- // Reactive version set should store the edits in the replay buffer.
- EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
- kNumberOfPersistedVersionEdits);
- EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
- EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
- EXPECT_EQ(0, num_applied_edits_);
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleCorruptedAtomicGroupWithVersionSetRecover) {
- const int kAtomicGroupSize = 4;
- SetupCorruptedAtomicGroup(kAtomicGroupSize);
- AddNewEditsToLog(kAtomicGroupSize);
- EXPECT_NOK(versions_->Recover(column_families_, false));
- EXPECT_EQ(column_families_.size(),
- versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
- corrupted_edit_.DebugString());
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
- const int kAtomicGroupSize = 4;
- SetupCorruptedAtomicGroup(kAtomicGroupSize);
- AddNewEditsToLog(kAtomicGroupSize);
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter;
- std::unique_ptr<Status> manifest_reader_status;
- EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,
- &manifest_reporter,
- &manifest_reader_status));
- EXPECT_EQ(column_families_.size(),
- reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
- corrupted_edit_.DebugString());
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
- const int kAtomicGroupSize = 4;
- SetupCorruptedAtomicGroup(kAtomicGroupSize);
- InstrumentedMutex mu;
- std::unordered_set<ColumnFamilyData*> cfds_changed;
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter;
- std::unique_ptr<Status> manifest_reader_status;
- EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
- &manifest_reporter,
- &manifest_reader_status));
- // Write the corrupted edits.
- AddNewEditsToLog(kAtomicGroupSize);
- mu.Lock();
- EXPECT_OK(
- reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
- mu.Unlock();
- EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
- corrupted_edit_.DebugString());
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
- const int kAtomicGroupSize = 4;
- SetupIncorrectAtomicGroup(kAtomicGroupSize);
- AddNewEditsToLog(kAtomicGroupSize);
- EXPECT_NOK(versions_->Recover(column_families_, false));
- EXPECT_EQ(column_families_.size(),
- versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- EXPECT_EQ(edits_[1].DebugString(),
- edit_with_incorrect_group_size_.DebugString());
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
- const int kAtomicGroupSize = 4;
- SetupIncorrectAtomicGroup(kAtomicGroupSize);
- AddNewEditsToLog(kAtomicGroupSize);
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter;
- std::unique_ptr<Status> manifest_reader_status;
- EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,
- &manifest_reporter,
- &manifest_reader_status));
- EXPECT_EQ(column_families_.size(),
- reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- EXPECT_EQ(edits_[1].DebugString(),
- edit_with_incorrect_group_size_.DebugString());
- }
- TEST_F(VersionSetAtomicGroupTest,
- HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
- const int kAtomicGroupSize = 4;
- SetupIncorrectAtomicGroup(kAtomicGroupSize);
- InstrumentedMutex mu;
- std::unordered_set<ColumnFamilyData*> cfds_changed;
- std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
- std::unique_ptr<log::Reader::Reporter> manifest_reporter;
- std::unique_ptr<Status> manifest_reader_status;
- EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
- &manifest_reporter,
- &manifest_reader_status));
- AddNewEditsToLog(kAtomicGroupSize);
- mu.Lock();
- EXPECT_OK(
- reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
- mu.Unlock();
- EXPECT_EQ(edits_[1].DebugString(),
- edit_with_incorrect_group_size_.DebugString());
- }
- class VersionSetTestDropOneCF : public VersionSetTestBase,
- public testing::TestWithParam<std::string> {
- public:
- VersionSetTestDropOneCF() : VersionSetTestBase() {}
- };
- // This test simulates the following execution sequence
- // Time thread1 bg_flush_thr
- // | Prepare version edits (e1,e2,e3) for atomic
- // | flush cf1, cf2, cf3
- // | Enqueue e to drop cfi
- // | to manifest_writers_
- // | Enqueue (e1,e2,e3) to manifest_writers_
- // |
- // | Apply e,
- // | cfi.IsDropped() is true
- // | Apply (e1,e2,e3),
- // | since cfi.IsDropped() == true, we need to
- // | drop ei and write the rest to MANIFEST.
- // V
- //
- // Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
- // last column family in an atomic group.
- TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
- std::vector<ColumnFamilyDescriptor> column_families;
- SequenceNumber last_seqno;
- std::unique_ptr<log::Writer> log_writer;
- PrepareManifest(&column_families, &last_seqno, &log_writer);
- Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
- ASSERT_OK(s);
- EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
- EXPECT_EQ(column_families.size(),
- versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
- const int kAtomicGroupSize = 3;
- const std::vector<std::string> non_default_cf_names = {
- kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
- // Drop one column family
- VersionEdit drop_cf_edit;
- drop_cf_edit.DropColumnFamily();
- const std::string cf_to_drop_name(GetParam());
- auto cfd_to_drop =
- versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
- ASSERT_NE(nullptr, cfd_to_drop);
- // Increase its refcount because cfd_to_drop is used later, and we need to
- // prevent it from being deleted.
- cfd_to_drop->Ref();
- drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
- mutex_.Lock();
- s = versions_->LogAndApply(cfd_to_drop,
- *cfd_to_drop->GetLatestMutableCFOptions(),
- &drop_cf_edit, &mutex_);
- mutex_.Unlock();
- ASSERT_OK(s);
- std::vector<VersionEdit> edits(kAtomicGroupSize);
- uint32_t remaining = kAtomicGroupSize;
- size_t i = 0;
- autovector<ColumnFamilyData*> cfds;
- autovector<const MutableCFOptions*> mutable_cf_options_list;
- autovector<autovector<VersionEdit*>> edit_lists;
- for (const auto& cf_name : non_default_cf_names) {
- auto cfd = (cf_name != cf_to_drop_name)
- ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
- : cfd_to_drop;
- ASSERT_NE(nullptr, cfd);
- cfds.push_back(cfd);
- mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
- edits[i].SetColumnFamily(cfd->GetID());
- edits[i].SetLogNumber(0);
- edits[i].SetNextFile(2);
- edits[i].MarkAtomicGroup(--remaining);
- edits[i].SetLastSequence(last_seqno++);
- autovector<VersionEdit*> tmp_edits;
- tmp_edits.push_back(&edits[i]);
- edit_lists.emplace_back(tmp_edits);
- ++i;
- }
- int called = 0;
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->SetCallBack(
- "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
- std::vector<VersionEdit*>* tmp_edits =
- reinterpret_cast<std::vector<VersionEdit*>*>(arg);
- EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
- for (const auto e : *tmp_edits) {
- bool found = false;
- for (const auto& e2 : edits) {
- if (&e2 == e) {
- found = true;
- break;
- }
- }
- ASSERT_TRUE(found);
- }
- ++called;
- });
- SyncPoint::GetInstance()->EnableProcessing();
- mutex_.Lock();
- s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
- &mutex_);
- mutex_.Unlock();
- ASSERT_OK(s);
- ASSERT_EQ(1, called);
- if (cfd_to_drop->Unref()) {
- delete cfd_to_drop;
- cfd_to_drop = nullptr;
- }
- }
- INSTANTIATE_TEST_CASE_P(
- AtomicGroup, VersionSetTestDropOneCF,
- testing::Values(VersionSetTestBase::kColumnFamilyName1,
- VersionSetTestBase::kColumnFamilyName2,
- VersionSetTestBase::kColumnFamilyName3));
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|