| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.#include "db/version_set.h"#include "db/db_impl/db_impl.h"#include "db/log_writer.h"#include "logging/logging.h"#include "table/mock_table.h"#include "test_util/testharness.h"#include "test_util/testutil.h"#include "util/string_util.h"namespace ROCKSDB_NAMESPACE {class GenerateLevelFilesBriefTest : public testing::Test { public:  std::vector<FileMetaData*> files_;  LevelFilesBrief file_level_;  Arena arena_;  GenerateLevelFilesBriefTest() { }  ~GenerateLevelFilesBriefTest() override {    for (size_t i = 0; i < files_.size(); i++) {      delete files_[i];    }  }  void Add(const char* smallest, const char* largest,           SequenceNumber smallest_seq = 100,           SequenceNumber largest_seq = 100) {    FileMetaData* f = new FileMetaData(        files_.size() + 1, 0, 0,        InternalKey(smallest, smallest_seq, kTypeValue),        InternalKey(largest, largest_seq, kTypeValue), smallest_seq,        largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,        kUnknownOldestAncesterTime, kUnknownFileCreationTime,        kUnknownFileChecksum, kUnknownFileChecksumFuncName);    files_.push_back(f);  }  int Compare() {    int diff = 0;    for (size_t i = 0; i < files_.size(); i++) {      if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {        diff++;      }    }    return diff;  }};TEST_F(GenerateLevelFilesBriefTest, Empty) {  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);  ASSERT_EQ(0u, file_level_.num_files);  ASSERT_EQ(0, Compare());}TEST_F(GenerateLevelFilesBriefTest, Single) {  Add("p", "q");  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);  ASSERT_EQ(1u, file_level_.num_files);  ASSERT_EQ(0, Compare());}TEST_F(GenerateLevelFilesBriefTest, Multiple) {  Add("150", "200");  Add("200", "250");  Add("300", "350");  Add("400", "450");  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);  ASSERT_EQ(4u, file_level_.num_files);  ASSERT_EQ(0, Compare());}class CountingLogger : public Logger { public:  CountingLogger() : log_count(0) {}  using Logger::Logv;  void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }  int log_count;};Options GetOptionsWithNumLevels(int num_levels,                                std::shared_ptr<CountingLogger> logger) {  Options opt;  opt.num_levels = num_levels;  opt.info_log = logger;  return opt;}class VersionStorageInfoTest : public testing::Test { public:  const Comparator* ucmp_;  InternalKeyComparator icmp_;  std::shared_ptr<CountingLogger> logger_;  Options options_;  ImmutableCFOptions ioptions_;  MutableCFOptions mutable_cf_options_;  VersionStorageInfo vstorage_;  InternalKey GetInternalKey(const char* ukey,                             SequenceNumber smallest_seq = 100) {    return InternalKey(ukey, smallest_seq, kTypeValue);  }  VersionStorageInfoTest()      : ucmp_(BytewiseComparator()),        icmp_(ucmp_),        logger_(new CountingLogger()),        options_(GetOptionsWithNumLevels(6, logger_)),        ioptions_(options_),        mutable_cf_options_(options_),        vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {}  ~VersionStorageInfoTest() override {    for (int i = 0; i < vstorage_.num_levels(); i++) {      for (auto* f : vstorage_.LevelFiles(i)) {        if (--f->refs == 0) {          delete f;        }      }    }  }  void Add(int level, uint32_t file_number, const char* smallest,           const char* largest, uint64_t file_size = 0) {    assert(level < vstorage_.num_levels());    FileMetaData* f = new FileMetaData(        file_number, 0, file_size, GetInternalKey(smallest, 0),        GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,        /* marked_for_compact */ false, kInvalidBlobFileNumber,        kUnknownOldestAncesterTime, kUnknownFileCreationTime,        kUnknownFileChecksum, kUnknownFileChecksumFuncName);    f->compensated_file_size = file_size;    vstorage_.AddFile(level, f);  }  void Add(int level, uint32_t file_number, const InternalKey& smallest,           const InternalKey& largest, uint64_t file_size = 0) {    assert(level < vstorage_.num_levels());    FileMetaData* f = new FileMetaData(        file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,        /* largest_seq */ 0, /* marked_for_compact */ false,        kInvalidBlobFileNumber, kUnknownOldestAncesterTime,        kUnknownFileCreationTime, kUnknownFileChecksum,        kUnknownFileChecksumFuncName);    f->compensated_file_size = file_size;    vstorage_.AddFile(level, f);  }  std::string GetOverlappingFiles(int level, const InternalKey& begin,                                  const InternalKey& end) {    std::vector<FileMetaData*> inputs;    vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);    std::string result;    for (size_t i = 0; i < inputs.size(); ++i) {      if (i > 0) {        result += ",";      }      AppendNumberTo(&result, inputs[i]->fd.GetNumber());    }    return result;  }};TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {  ioptions_.level_compaction_dynamic_level_bytes = false;  mutable_cf_options_.max_bytes_for_level_base = 10;  mutable_cf_options_.max_bytes_for_level_multiplier = 5;  Add(4, 100U, "1", "2");  Add(5, 101U, "1", "2");  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U);  ASSERT_EQ(0, logger_->log_count);}TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) {  ioptions_.level_compaction_dynamic_level_bytes = true;  mutable_cf_options_.max_bytes_for_level_base = 1000;  mutable_cf_options_.max_bytes_for_level_multiplier = 5;  Add(5, 1U, "1", "2", 500U);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(0, logger_->log_count);  ASSERT_EQ(vstorage_.base_level(), 5);  Add(5, 2U, "3", "4", 550U);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(0, logger_->log_count);  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);  ASSERT_EQ(vstorage_.base_level(), 4);  Add(4, 3U, "3", "4", 550U);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(0, logger_->log_count);  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);  ASSERT_EQ(vstorage_.base_level(), 4);  Add(3, 4U, "3", "4", 250U);  Add(3, 5U, "5", "7", 300U);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(1, logger_->log_count);  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U);  ASSERT_EQ(vstorage_.base_level(), 3);  Add(1, 6U, "3", "4", 5U);  Add(1, 7U, "8", "9", 5U);  logger_->log_count = 0;  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(1, logger_->log_count);  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U);  ASSERT_EQ(vstorage_.base_level(), 1);}TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {  ioptions_.level_compaction_dynamic_level_bytes = true;  mutable_cf_options_.max_bytes_for_level_base = 100;  mutable_cf_options_.max_bytes_for_level_multiplier = 2;  Add(0, 1U, "1", "2", 50U);  Add(1, 2U, "1", "2", 50U);  Add(2, 3U, "1", "2", 500U);  Add(3, 4U, "1", "2", 500U);  Add(4, 5U, "1", "2", 1700U);  Add(5, 6U, "1", "2", 500U);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U);  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U);  ASSERT_EQ(vstorage_.base_level(), 1);  ASSERT_EQ(0, logger_->log_count);}TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {  uint64_t kOneGB = 1000U * 1000U * 1000U;  ioptions_.level_compaction_dynamic_level_bytes = true;  mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;  mutable_cf_options_.max_bytes_for_level_multiplier = 10;  Add(0, 1U, "1", "2", 50U);  Add(3, 4U, "1", "2", 32U * kOneGB);  Add(4, 5U, "1", "2", 500U * kOneGB);  Add(5, 6U, "1", "2", 3000U * kOneGB);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB);  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB);  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB);  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB);  ASSERT_EQ(vstorage_.base_level(), 2);  ASSERT_EQ(0, logger_->log_count);}TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {  ioptions_.level_compaction_dynamic_level_bytes = true;  mutable_cf_options_.max_bytes_for_level_base = 40000;  mutable_cf_options_.max_bytes_for_level_multiplier = 5;  mutable_cf_options_.level0_file_num_compaction_trigger = 2;  Add(0, 1U, "1", "2", 10000U);  Add(0, 2U, "1", "2", 10000U);  Add(0, 3U, "1", "2", 10000U);  Add(5, 4U, "1", "2", 1286250U);  Add(4, 5U, "1", "2", 200000U);  Add(3, 6U, "1", "2", 40000U);  Add(2, 7U, "1", "2", 8000U);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(0, logger_->log_count);  ASSERT_EQ(2, vstorage_.base_level());  // level multiplier should be 3.5  ASSERT_EQ(vstorage_.level_multiplier(), 5.0);  // Level size should be around 30,000, 105,000, 367,500  ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));  ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));  ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));}TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {  ioptions_.level_compaction_dynamic_level_bytes = true;  mutable_cf_options_.max_bytes_for_level_base = 10000;  mutable_cf_options_.max_bytes_for_level_multiplier = 5;  mutable_cf_options_.level0_file_num_compaction_trigger = 2;  Add(0, 11U, "1", "2", 10000U);  Add(0, 12U, "1", "2", 10000U);  Add(0, 13U, "1", "2", 10000U);  Add(5, 4U, "1", "2", 1286250U);  Add(4, 5U, "1", "2", 200000U);  Add(3, 6U, "1", "2", 40000U);  Add(2, 7U, "1", "2", 8000U);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(0, logger_->log_count);  ASSERT_EQ(2, vstorage_.base_level());  // level multiplier should be 3.5  ASSERT_LT(vstorage_.level_multiplier(), 3.6);  ASSERT_GT(vstorage_.level_multiplier(), 3.4);  // Level size should be around 30,000, 105,000, 367,500  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);}TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {  ioptions_.level_compaction_dynamic_level_bytes = true;  mutable_cf_options_.max_bytes_for_level_base = 10000;  mutable_cf_options_.max_bytes_for_level_multiplier = 5;  mutable_cf_options_.level0_file_num_compaction_trigger = 2;  Add(0, 11U, "1", "2", 5000U);  Add(0, 12U, "1", "2", 5000U);  Add(0, 13U, "1", "2", 5000U);  Add(0, 14U, "1", "2", 5000U);  Add(0, 15U, "1", "2", 5000U);  Add(0, 16U, "1", "2", 5000U);  Add(5, 4U, "1", "2", 1286250U);  Add(4, 5U, "1", "2", 200000U);  Add(3, 6U, "1", "2", 40000U);  Add(2, 7U, "1", "2", 8000U);  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);  ASSERT_EQ(0, logger_->log_count);  ASSERT_EQ(2, vstorage_.base_level());  // level multiplier should be 3.5  ASSERT_LT(vstorage_.level_multiplier(), 3.6);  ASSERT_GT(vstorage_.level_multiplier(), 3.4);  // Level size should be around 30,000, 105,000, 367,500  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);}TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {  // Test whether the overlaps are detected as expected  Add(1, 1U, "4", "7", 1U);  // Perfect overlap with last level  Add(2, 2U, "3", "5", 1U);  // Partial overlap with last level  Add(2, 3U, "6", "8", 1U);  // Partial overlap with last level  Add(3, 4U, "1", "9", 1U);  // Contains range of last level  Add(4, 5U, "4", "5", 1U);  // Inside range of last level  Add(4, 5U, "6", "7", 1U);  // Inside range of last level  Add(5, 6U, "4", "7", 10U);  ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());}TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {  Add(0, 1U, "9", "9", 1U);  // Level 0 is not ordered  Add(0, 1U, "5", "6", 1U);  // Ignored because of [5,6] in l1  Add(1, 1U, "1", "2", 1U);  // Ignored because of [2,3] in l2  Add(1, 2U, "3", "4", 1U);  // Ignored because of [2,3] in l2  Add(1, 3U, "5", "6", 1U);  Add(2, 4U, "2", "3", 1U);  Add(3, 5U, "7", "8", 1U);  ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());}TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {  // Two files that overlap at the range deletion tombstone sentinel.  Add(1, 1U, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1);  Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1);  // Two files that overlap at the same user key.  Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1);  Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1);  // Two files that do not overlap.  Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1);  Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1);  vstorage_.UpdateNumNonEmptyLevels();  vstorage_.GenerateLevelFilesBrief();  ASSERT_EQ("1,2", GetOverlappingFiles(      1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue}));  ASSERT_EQ("1", GetOverlappingFiles(      1, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}));  ASSERT_EQ("2", GetOverlappingFiles(      1, {"b", kMaxSequenceNumber, kTypeValue}, {"c", 0, kTypeValue}));  ASSERT_EQ("3,4", GetOverlappingFiles(      1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue}));  ASSERT_EQ("3", GetOverlappingFiles(      1, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeRangeDeletion}));  ASSERT_EQ("3,4", GetOverlappingFiles(      1, {"e", kMaxSequenceNumber, kTypeValue}, {"f", 0, kTypeValue}));  ASSERT_EQ("3,4", GetOverlappingFiles(      1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}));  ASSERT_EQ("5", GetOverlappingFiles(      1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}));  ASSERT_EQ("6", GetOverlappingFiles(      1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));}class FindLevelFileTest : public testing::Test { public:  LevelFilesBrief file_level_;  bool disjoint_sorted_files_;  Arena arena_;  FindLevelFileTest() : disjoint_sorted_files_(true) { }  ~FindLevelFileTest() override {}  void LevelFileInit(size_t num = 0) {    char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));    file_level_.files = new (mem)FdWithKeyRange[num];    file_level_.num_files = 0;  }  void Add(const char* smallest, const char* largest,           SequenceNumber smallest_seq = 100,           SequenceNumber largest_seq = 100) {    InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);    InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);    Slice smallest_slice = smallest_key.Encode();    Slice largest_slice = largest_key.Encode();    char* mem = arena_.AllocateAligned(        smallest_slice.size() + largest_slice.size());    memcpy(mem, smallest_slice.data(), smallest_slice.size());    memcpy(mem + smallest_slice.size(), largest_slice.data(),        largest_slice.size());    // add to file_level_    size_t num = file_level_.num_files;    auto& file = file_level_.files[num];    file.fd = FileDescriptor(num + 1, 0, 0);    file.smallest_key = Slice(mem, smallest_slice.size());    file.largest_key = Slice(mem + smallest_slice.size(),        largest_slice.size());    file_level_.num_files++;  }  int Find(const char* key) {    InternalKey target(key, 100, kTypeValue);    InternalKeyComparator cmp(BytewiseComparator());    return FindFile(cmp, file_level_, target.Encode());  }  bool Overlaps(const char* smallest, const char* largest) {    InternalKeyComparator cmp(BytewiseComparator());    Slice s(smallest != nullptr ? smallest : "");    Slice l(largest != nullptr ? largest : "");    return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,                                 (smallest != nullptr ? &s : nullptr),                                 (largest != nullptr ? &l : nullptr));  }};TEST_F(FindLevelFileTest, LevelEmpty) {  LevelFileInit(0);  ASSERT_EQ(0, Find("foo"));  ASSERT_TRUE(! Overlaps("a", "z"));  ASSERT_TRUE(! Overlaps(nullptr, "z"));  ASSERT_TRUE(! Overlaps("a", nullptr));  ASSERT_TRUE(! Overlaps(nullptr, nullptr));}TEST_F(FindLevelFileTest, LevelSingle) {  LevelFileInit(1);  Add("p", "q");  ASSERT_EQ(0, Find("a"));  ASSERT_EQ(0, Find("p"));  ASSERT_EQ(0, Find("p1"));  ASSERT_EQ(0, Find("q"));  ASSERT_EQ(1, Find("q1"));  ASSERT_EQ(1, Find("z"));  ASSERT_TRUE(! Overlaps("a", "b"));  ASSERT_TRUE(! Overlaps("z1", "z2"));  ASSERT_TRUE(Overlaps("a", "p"));  ASSERT_TRUE(Overlaps("a", "q"));  ASSERT_TRUE(Overlaps("a", "z"));  ASSERT_TRUE(Overlaps("p", "p1"));  ASSERT_TRUE(Overlaps("p", "q"));  ASSERT_TRUE(Overlaps("p", "z"));  ASSERT_TRUE(Overlaps("p1", "p2"));  ASSERT_TRUE(Overlaps("p1", "z"));  ASSERT_TRUE(Overlaps("q", "q"));  ASSERT_TRUE(Overlaps("q", "q1"));  ASSERT_TRUE(! Overlaps(nullptr, "j"));  ASSERT_TRUE(! Overlaps("r", nullptr));  ASSERT_TRUE(Overlaps(nullptr, "p"));  ASSERT_TRUE(Overlaps(nullptr, "p1"));  ASSERT_TRUE(Overlaps("q", nullptr));  ASSERT_TRUE(Overlaps(nullptr, nullptr));}TEST_F(FindLevelFileTest, LevelMultiple) {  LevelFileInit(4);  Add("150", "200");  Add("200", "250");  Add("300", "350");  Add("400", "450");  ASSERT_EQ(0, Find("100"));  ASSERT_EQ(0, Find("150"));  ASSERT_EQ(0, Find("151"));  ASSERT_EQ(0, Find("199"));  ASSERT_EQ(0, Find("200"));  ASSERT_EQ(1, Find("201"));  ASSERT_EQ(1, Find("249"));  ASSERT_EQ(1, Find("250"));  ASSERT_EQ(2, Find("251"));  ASSERT_EQ(2, Find("299"));  ASSERT_EQ(2, Find("300"));  ASSERT_EQ(2, Find("349"));  ASSERT_EQ(2, Find("350"));  ASSERT_EQ(3, Find("351"));  ASSERT_EQ(3, Find("400"));  ASSERT_EQ(3, Find("450"));  ASSERT_EQ(4, Find("451"));  ASSERT_TRUE(! Overlaps("100", "149"));  ASSERT_TRUE(! Overlaps("251", "299"));  ASSERT_TRUE(! Overlaps("451", "500"));  ASSERT_TRUE(! Overlaps("351", "399"));  ASSERT_TRUE(Overlaps("100", "150"));  ASSERT_TRUE(Overlaps("100", "200"));  ASSERT_TRUE(Overlaps("100", "300"));  ASSERT_TRUE(Overlaps("100", "400"));  ASSERT_TRUE(Overlaps("100", "500"));  ASSERT_TRUE(Overlaps("375", "400"));  ASSERT_TRUE(Overlaps("450", "450"));  ASSERT_TRUE(Overlaps("450", "500"));}TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {  LevelFileInit(4);  Add("150", "200");  Add("200", "250");  Add("300", "350");  Add("400", "450");  ASSERT_TRUE(! Overlaps(nullptr, "149"));  ASSERT_TRUE(! Overlaps("451", nullptr));  ASSERT_TRUE(Overlaps(nullptr, nullptr));  ASSERT_TRUE(Overlaps(nullptr, "150"));  ASSERT_TRUE(Overlaps(nullptr, "199"));  ASSERT_TRUE(Overlaps(nullptr, "200"));  ASSERT_TRUE(Overlaps(nullptr, "201"));  ASSERT_TRUE(Overlaps(nullptr, "400"));  ASSERT_TRUE(Overlaps(nullptr, "800"));  ASSERT_TRUE(Overlaps("100", nullptr));  ASSERT_TRUE(Overlaps("200", nullptr));  ASSERT_TRUE(Overlaps("449", nullptr));  ASSERT_TRUE(Overlaps("450", nullptr));}TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {  LevelFileInit(1);  Add("200", "200", 5000, 3000);  ASSERT_TRUE(! Overlaps("199", "199"));  ASSERT_TRUE(! Overlaps("201", "300"));  ASSERT_TRUE(Overlaps("200", "200"));  ASSERT_TRUE(Overlaps("190", "200"));  ASSERT_TRUE(Overlaps("200", "210"));}TEST_F(FindLevelFileTest, LevelOverlappingFiles) {  LevelFileInit(2);  Add("150", "600");  Add("400", "500");  disjoint_sorted_files_ = false;  ASSERT_TRUE(! Overlaps("100", "149"));  ASSERT_TRUE(! Overlaps("601", "700"));  ASSERT_TRUE(Overlaps("100", "150"));  ASSERT_TRUE(Overlaps("100", "200"));  ASSERT_TRUE(Overlaps("100", "300"));  ASSERT_TRUE(Overlaps("100", "400"));  ASSERT_TRUE(Overlaps("100", "500"));  ASSERT_TRUE(Overlaps("375", "400"));  ASSERT_TRUE(Overlaps("450", "450"));  ASSERT_TRUE(Overlaps("450", "500"));  ASSERT_TRUE(Overlaps("450", "700"));  ASSERT_TRUE(Overlaps("600", "700"));}class VersionSetTestBase { public:  const static std::string kColumnFamilyName1;  const static std::string kColumnFamilyName2;  const static std::string kColumnFamilyName3;  int num_initial_edits_;  VersionSetTestBase()      : env_(Env::Default()),        fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),        dbname_(test::PerThreadDBPath("version_set_test")),        db_options_(),        mutable_cf_options_(cf_options_),        table_cache_(NewLRUCache(50000, 16)),        write_buffer_manager_(db_options_.db_write_buffer_size),        shutting_down_(false),        mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {    EXPECT_OK(env_->CreateDirIfMissing(dbname_));    db_options_.env = env_;    db_options_.fs = fs_;    versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,                                   table_cache_.get(), &write_buffer_manager_,                                   &write_controller_,                                   /*block_cache_tracer=*/nullptr)),        reactive_versions_ = std::make_shared<ReactiveVersionSet>(            dbname_, &db_options_, env_options_, table_cache_.get(),            &write_buffer_manager_, &write_controller_);    db_options_.db_paths.emplace_back(dbname_,                                      std::numeric_limits<uint64_t>::max());  }  void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,                       SequenceNumber* last_seqno,                       std::unique_ptr<log::Writer>* log_writer) {    assert(column_families != nullptr);    assert(last_seqno != nullptr);    assert(log_writer != nullptr);    VersionEdit new_db;    if (db_options_.write_dbid_to_manifest) {      DBImpl* impl = new DBImpl(DBOptions(), dbname_);      std::string db_id;      impl->GetDbIdentityFromIdentityFile(&db_id);      new_db.SetDBId(db_id);    }    new_db.SetLogNumber(0);    new_db.SetNextFile(2);    new_db.SetLastSequence(0);    const std::vector<std::string> cf_names = {        kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,        kColumnFamilyName3};    const int kInitialNumOfCfs = static_cast<int>(cf_names.size());    autovector<VersionEdit> new_cfs;    uint64_t last_seq = 1;    uint32_t cf_id = 1;    for (int i = 1; i != kInitialNumOfCfs; ++i) {      VersionEdit new_cf;      new_cf.AddColumnFamily(cf_names[i]);      new_cf.SetColumnFamily(cf_id++);      new_cf.SetLogNumber(0);      new_cf.SetNextFile(2);      new_cf.SetLastSequence(last_seq++);      new_cfs.emplace_back(new_cf);    }    *last_seqno = last_seq;    num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);    const std::string manifest = DescriptorFileName(dbname_, 1);    std::unique_ptr<WritableFile> file;    Status s = env_->NewWritableFile(        manifest, &file, env_->OptimizeForManifestWrite(env_options_));    ASSERT_OK(s);    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(        NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));    {      log_writer->reset(new log::Writer(std::move(file_writer), 0, false));      std::string record;      new_db.EncodeTo(&record);      s = (*log_writer)->AddRecord(record);      for (const auto& e : new_cfs) {        record.clear();        e.EncodeTo(&record);        s = (*log_writer)->AddRecord(record);        ASSERT_OK(s);      }    }    ASSERT_OK(s);    cf_options_.table_factory = mock_table_factory_;    for (const auto& cf_name : cf_names) {      column_families->emplace_back(cf_name, cf_options_);    }  }  // Create DB with 3 column families.  void NewDB() {    std::vector<ColumnFamilyDescriptor> column_families;    SequenceNumber last_seqno;    std::unique_ptr<log::Writer> log_writer;    SetIdentityFile(env_, dbname_);    PrepareManifest(&column_families, &last_seqno, &log_writer);    log_writer.reset();    // Make "CURRENT" file point to the new manifest file.    Status s = SetCurrentFile(env_, dbname_, 1, nullptr);    ASSERT_OK(s);    EXPECT_OK(versions_->Recover(column_families, false));    EXPECT_EQ(column_families.size(),              versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  }  Env* env_;  std::shared_ptr<FileSystem> fs_;  const std::string dbname_;  EnvOptions env_options_;  ImmutableDBOptions db_options_;  ColumnFamilyOptions cf_options_;  MutableCFOptions mutable_cf_options_;  std::shared_ptr<Cache> table_cache_;  WriteController write_controller_;  WriteBufferManager write_buffer_manager_;  std::shared_ptr<VersionSet> versions_;  std::shared_ptr<ReactiveVersionSet> reactive_versions_;  InstrumentedMutex mutex_;  std::atomic<bool> shutting_down_;  std::shared_ptr<mock::MockTableFactory> mock_table_factory_;};const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";class VersionSetTest : public VersionSetTestBase, public testing::Test { public:  VersionSetTest() : VersionSetTestBase() {}};TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {  NewDB();  const int kGroupSize = 5;  autovector<VersionEdit> edits;  for (int i = 0; i != kGroupSize; ++i) {    edits.emplace_back(VersionEdit());  }  autovector<ColumnFamilyData*> cfds;  autovector<const MutableCFOptions*> all_mutable_cf_options;  autovector<autovector<VersionEdit*>> edit_lists;  for (int i = 0; i != kGroupSize; ++i) {    cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());    all_mutable_cf_options.emplace_back(&mutable_cf_options_);    autovector<VersionEdit*> edit_list;    edit_list.emplace_back(&edits[i]);    edit_lists.emplace_back(edit_list);  }  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();  int count = 0;  SyncPoint::GetInstance()->SetCallBack(      "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {        uint32_t* cf_id = reinterpret_cast<uint32_t*>(arg);        EXPECT_EQ(0u, *cf_id);        ++count;      });  SyncPoint::GetInstance()->EnableProcessing();  mutex_.Lock();  Status s =      versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_);  mutex_.Unlock();  EXPECT_OK(s);  EXPECT_EQ(kGroupSize - 1, count);}class VersionSetAtomicGroupTest : public VersionSetTestBase,                                  public testing::Test { public:  VersionSetAtomicGroupTest() : VersionSetTestBase() {}  void SetUp() override {    PrepareManifest(&column_families_, &last_seqno_, &log_writer_);    SetupTestSyncPoints();  }  void SetupValidAtomicGroup(int atomic_group_size) {    edits_.resize(atomic_group_size);    int remaining = atomic_group_size;    for (size_t i = 0; i != edits_.size(); ++i) {      edits_[i].SetLogNumber(0);      edits_[i].SetNextFile(2);      edits_[i].MarkAtomicGroup(--remaining);      edits_[i].SetLastSequence(last_seqno_++);    }    ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));  }  void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {    edits_.resize(atomic_group_size);    int remaining = atomic_group_size;    for (size_t i = 0; i != edits_.size(); ++i) {      edits_[i].SetLogNumber(0);      edits_[i].SetNextFile(2);      edits_[i].MarkAtomicGroup(--remaining);      edits_[i].SetLastSequence(last_seqno_++);    }    ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));  }  void SetupCorruptedAtomicGroup(int atomic_group_size) {    edits_.resize(atomic_group_size);    int remaining = atomic_group_size;    for (size_t i = 0; i != edits_.size(); ++i) {      edits_[i].SetLogNumber(0);      edits_[i].SetNextFile(2);      if (i != ((size_t)atomic_group_size / 2)) {        edits_[i].MarkAtomicGroup(--remaining);      }      edits_[i].SetLastSequence(last_seqno_++);    }    ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));  }  void SetupIncorrectAtomicGroup(int atomic_group_size) {    edits_.resize(atomic_group_size);    int remaining = atomic_group_size;    for (size_t i = 0; i != edits_.size(); ++i) {      edits_[i].SetLogNumber(0);      edits_[i].SetNextFile(2);      if (i != 1) {        edits_[i].MarkAtomicGroup(--remaining);      } else {        edits_[i].MarkAtomicGroup(remaining--);      }      edits_[i].SetLastSequence(last_seqno_++);    }    ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));  }  void SetupTestSyncPoints() {    SyncPoint::GetInstance()->DisableProcessing();    SyncPoint::GetInstance()->ClearAllCallBacks();    SyncPoint::GetInstance()->SetCallBack(        "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {          VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);          EXPECT_EQ(edits_.front().DebugString(),                    e->DebugString());  // compare based on value          first_in_atomic_group_ = true;        });    SyncPoint::GetInstance()->SetCallBack(        "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {          VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);          EXPECT_EQ(edits_.back().DebugString(),                    e->DebugString());  // compare based on value          EXPECT_TRUE(first_in_atomic_group_);          last_in_atomic_group_ = true;        });    SyncPoint::GetInstance()->SetCallBack(        "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {          num_recovered_edits_ = *reinterpret_cast<int*>(arg);        });    SyncPoint::GetInstance()->SetCallBack(        "ReactiveVersionSet::ReadAndApply:AppliedEdits",        [&](void* arg) { num_applied_edits_ = *reinterpret_cast<int*>(arg); });    SyncPoint::GetInstance()->SetCallBack(        "AtomicGroupReadBuffer::AddEdit:AtomicGroup",        [&](void* /* arg */) { ++num_edits_in_atomic_group_; });    SyncPoint::GetInstance()->SetCallBack(        "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",        [&](void* arg) {          corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);        });    SyncPoint::GetInstance()->SetCallBack(        "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",        [&](void* arg) {          edit_with_incorrect_group_size_ =              *reinterpret_cast<VersionEdit*>(arg);        });    SyncPoint::GetInstance()->EnableProcessing();  }  void AddNewEditsToLog(int num_edits) {    for (int i = 0; i < num_edits; i++) {      std::string record;      edits_[i].EncodeTo(&record);      ASSERT_OK(log_writer_->AddRecord(record));    }  }  void TearDown() override {    SyncPoint::GetInstance()->DisableProcessing();    SyncPoint::GetInstance()->ClearAllCallBacks();    log_writer_.reset();  } protected:  std::vector<ColumnFamilyDescriptor> column_families_;  SequenceNumber last_seqno_;  std::vector<VersionEdit> edits_;  bool first_in_atomic_group_ = false;  bool last_in_atomic_group_ = false;  int num_edits_in_atomic_group_ = 0;  int num_recovered_edits_ = 0;  int num_applied_edits_ = 0;  VersionEdit corrupted_edit_;  VersionEdit edit_with_incorrect_group_size_;  std::unique_ptr<log::Writer> log_writer_;};TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {  const int kAtomicGroupSize = 3;  SetupValidAtomicGroup(kAtomicGroupSize);  AddNewEditsToLog(kAtomicGroupSize);  EXPECT_OK(versions_->Recover(column_families_, false));  EXPECT_EQ(column_families_.size(),            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  EXPECT_TRUE(first_in_atomic_group_);  EXPECT_TRUE(last_in_atomic_group_);  EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);  EXPECT_EQ(0, num_applied_edits_);}TEST_F(VersionSetAtomicGroupTest,       HandleValidAtomicGroupWithReactiveVersionSetRecover) {  const int kAtomicGroupSize = 3;  SetupValidAtomicGroup(kAtomicGroupSize);  AddNewEditsToLog(kAtomicGroupSize);  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;  std::unique_ptr<log::Reader::Reporter> manifest_reporter;  std::unique_ptr<Status> manifest_reader_status;  EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,                                        &manifest_reporter,                                        &manifest_reader_status));  EXPECT_EQ(column_families_.size(),            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  EXPECT_TRUE(first_in_atomic_group_);  EXPECT_TRUE(last_in_atomic_group_);  // The recover should clean up the replay buffer.  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);  EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);  EXPECT_EQ(0, num_applied_edits_);}TEST_F(VersionSetAtomicGroupTest,       HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {  const int kAtomicGroupSize = 3;  SetupValidAtomicGroup(kAtomicGroupSize);  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;  std::unique_ptr<log::Reader::Reporter> manifest_reporter;  std::unique_ptr<Status> manifest_reader_status;  EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,                                        &manifest_reporter,                                        &manifest_reader_status));  AddNewEditsToLog(kAtomicGroupSize);  InstrumentedMutex mu;  std::unordered_set<ColumnFamilyData*> cfds_changed;  mu.Lock();  EXPECT_OK(      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));  mu.Unlock();  EXPECT_TRUE(first_in_atomic_group_);  EXPECT_TRUE(last_in_atomic_group_);  // The recover should clean up the replay buffer.  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);  EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);}TEST_F(VersionSetAtomicGroupTest,       HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {  const int kAtomicGroupSize = 4;  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);  AddNewEditsToLog(kNumberOfPersistedVersionEdits);  EXPECT_OK(versions_->Recover(column_families_, false));  EXPECT_EQ(column_families_.size(),            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  EXPECT_TRUE(first_in_atomic_group_);  EXPECT_FALSE(last_in_atomic_group_);  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);  EXPECT_EQ(0, num_applied_edits_);}TEST_F(VersionSetAtomicGroupTest,       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {  const int kAtomicGroupSize = 4;  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);  AddNewEditsToLog(kNumberOfPersistedVersionEdits);  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;  std::unique_ptr<log::Reader::Reporter> manifest_reporter;  std::unique_ptr<Status> manifest_reader_status;  EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,                                        &manifest_reporter,                                        &manifest_reader_status));  EXPECT_EQ(column_families_.size(),            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  EXPECT_TRUE(first_in_atomic_group_);  EXPECT_FALSE(last_in_atomic_group_);  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);  // Reactive version set should store the edits in the replay buffer.  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==              kNumberOfPersistedVersionEdits);  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);  // Write the last record. The reactive version set should now apply all  // edits.  std::string last_record;  edits_[kAtomicGroupSize - 1].EncodeTo(&last_record);  EXPECT_OK(log_writer_->AddRecord(last_record));  InstrumentedMutex mu;  std::unordered_set<ColumnFamilyData*> cfds_changed;  mu.Lock();  EXPECT_OK(      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));  mu.Unlock();  // Reactive version set should be empty now.  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);  EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);}TEST_F(VersionSetAtomicGroupTest,       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {  const int kAtomicGroupSize = 4;  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;  std::unique_ptr<log::Reader::Reporter> manifest_reporter;  std::unique_ptr<Status> manifest_reader_status;  // No edits in an atomic group.  EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,                                        &manifest_reporter,                                        &manifest_reader_status));  EXPECT_EQ(column_families_.size(),            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  // Write a few edits in an atomic group.  AddNewEditsToLog(kNumberOfPersistedVersionEdits);  InstrumentedMutex mu;  std::unordered_set<ColumnFamilyData*> cfds_changed;  mu.Lock();  EXPECT_OK(      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));  mu.Unlock();  EXPECT_TRUE(first_in_atomic_group_);  EXPECT_FALSE(last_in_atomic_group_);  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);  // Reactive version set should store the edits in the replay buffer.  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==              kNumberOfPersistedVersionEdits);  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);  EXPECT_EQ(0, num_applied_edits_);}TEST_F(VersionSetAtomicGroupTest,       HandleCorruptedAtomicGroupWithVersionSetRecover) {  const int kAtomicGroupSize = 4;  SetupCorruptedAtomicGroup(kAtomicGroupSize);  AddNewEditsToLog(kAtomicGroupSize);  EXPECT_NOK(versions_->Recover(column_families_, false));  EXPECT_EQ(column_families_.size(),            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),            corrupted_edit_.DebugString());}TEST_F(VersionSetAtomicGroupTest,       HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {  const int kAtomicGroupSize = 4;  SetupCorruptedAtomicGroup(kAtomicGroupSize);  AddNewEditsToLog(kAtomicGroupSize);  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;  std::unique_ptr<log::Reader::Reporter> manifest_reporter;  std::unique_ptr<Status> manifest_reader_status;  EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,                                         &manifest_reporter,                                         &manifest_reader_status));  EXPECT_EQ(column_families_.size(),            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),            corrupted_edit_.DebugString());}TEST_F(VersionSetAtomicGroupTest,       HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {  const int kAtomicGroupSize = 4;  SetupCorruptedAtomicGroup(kAtomicGroupSize);  InstrumentedMutex mu;  std::unordered_set<ColumnFamilyData*> cfds_changed;  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;  std::unique_ptr<log::Reader::Reporter> manifest_reporter;  std::unique_ptr<Status> manifest_reader_status;  EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,                                        &manifest_reporter,                                        &manifest_reader_status));  // Write the corrupted edits.  AddNewEditsToLog(kAtomicGroupSize);  mu.Lock();  EXPECT_OK(      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));  mu.Unlock();  EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),            corrupted_edit_.DebugString());}TEST_F(VersionSetAtomicGroupTest,       HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {  const int kAtomicGroupSize = 4;  SetupIncorrectAtomicGroup(kAtomicGroupSize);  AddNewEditsToLog(kAtomicGroupSize);  EXPECT_NOK(versions_->Recover(column_families_, false));  EXPECT_EQ(column_families_.size(),            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  EXPECT_EQ(edits_[1].DebugString(),            edit_with_incorrect_group_size_.DebugString());}TEST_F(VersionSetAtomicGroupTest,       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {  const int kAtomicGroupSize = 4;  SetupIncorrectAtomicGroup(kAtomicGroupSize);  AddNewEditsToLog(kAtomicGroupSize);  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;  std::unique_ptr<log::Reader::Reporter> manifest_reporter;  std::unique_ptr<Status> manifest_reader_status;  EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,                                         &manifest_reporter,                                         &manifest_reader_status));  EXPECT_EQ(column_families_.size(),            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  EXPECT_EQ(edits_[1].DebugString(),            edit_with_incorrect_group_size_.DebugString());}TEST_F(VersionSetAtomicGroupTest,       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {  const int kAtomicGroupSize = 4;  SetupIncorrectAtomicGroup(kAtomicGroupSize);  InstrumentedMutex mu;  std::unordered_set<ColumnFamilyData*> cfds_changed;  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;  std::unique_ptr<log::Reader::Reporter> manifest_reporter;  std::unique_ptr<Status> manifest_reader_status;  EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,                                        &manifest_reporter,                                        &manifest_reader_status));  AddNewEditsToLog(kAtomicGroupSize);  mu.Lock();  EXPECT_OK(      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));  mu.Unlock();  EXPECT_EQ(edits_[1].DebugString(),            edit_with_incorrect_group_size_.DebugString());}class VersionSetTestDropOneCF : public VersionSetTestBase,                                public testing::TestWithParam<std::string> { public:  VersionSetTestDropOneCF() : VersionSetTestBase() {}};// This test simulates the following execution sequence// Time  thread1                  bg_flush_thr//  |                             Prepare version edits (e1,e2,e3) for atomic//  |                             flush cf1, cf2, cf3//  |    Enqueue e to drop cfi//  |    to manifest_writers_//  |                             Enqueue (e1,e2,e3) to manifest_writers_//  |//  |    Apply e,//  |    cfi.IsDropped() is true//  |                             Apply (e1,e2,e3),//  |                             since cfi.IsDropped() == true, we need to//  |                             drop ei and write the rest to MANIFEST.//  V////  Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and//  last column family in an atomic group.TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {  std::vector<ColumnFamilyDescriptor> column_families;  SequenceNumber last_seqno;  std::unique_ptr<log::Writer> log_writer;  PrepareManifest(&column_families, &last_seqno, &log_writer);  Status s = SetCurrentFile(env_, dbname_, 1, nullptr);  ASSERT_OK(s);  EXPECT_OK(versions_->Recover(column_families, false /* read_only */));  EXPECT_EQ(column_families.size(),            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());  const int kAtomicGroupSize = 3;  const std::vector<std::string> non_default_cf_names = {      kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};  // Drop one column family  VersionEdit drop_cf_edit;  drop_cf_edit.DropColumnFamily();  const std::string cf_to_drop_name(GetParam());  auto cfd_to_drop =      versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);  ASSERT_NE(nullptr, cfd_to_drop);  // Increase its refcount because cfd_to_drop is used later, and we need to  // prevent it from being deleted.  cfd_to_drop->Ref();  drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());  mutex_.Lock();  s = versions_->LogAndApply(cfd_to_drop,                             *cfd_to_drop->GetLatestMutableCFOptions(),                             &drop_cf_edit, &mutex_);  mutex_.Unlock();  ASSERT_OK(s);  std::vector<VersionEdit> edits(kAtomicGroupSize);  uint32_t remaining = kAtomicGroupSize;  size_t i = 0;  autovector<ColumnFamilyData*> cfds;  autovector<const MutableCFOptions*> mutable_cf_options_list;  autovector<autovector<VersionEdit*>> edit_lists;  for (const auto& cf_name : non_default_cf_names) {    auto cfd = (cf_name != cf_to_drop_name)                   ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)                   : cfd_to_drop;    ASSERT_NE(nullptr, cfd);    cfds.push_back(cfd);    mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());    edits[i].SetColumnFamily(cfd->GetID());    edits[i].SetLogNumber(0);    edits[i].SetNextFile(2);    edits[i].MarkAtomicGroup(--remaining);    edits[i].SetLastSequence(last_seqno++);    autovector<VersionEdit*> tmp_edits;    tmp_edits.push_back(&edits[i]);    edit_lists.emplace_back(tmp_edits);    ++i;  }  int called = 0;  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();  SyncPoint::GetInstance()->SetCallBack(      "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {        std::vector<VersionEdit*>* tmp_edits =            reinterpret_cast<std::vector<VersionEdit*>*>(arg);        EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());        for (const auto e : *tmp_edits) {          bool found = false;          for (const auto& e2 : edits) {            if (&e2 == e) {              found = true;              break;            }          }          ASSERT_TRUE(found);        }        ++called;      });  SyncPoint::GetInstance()->EnableProcessing();  mutex_.Lock();  s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,                             &mutex_);  mutex_.Unlock();  ASSERT_OK(s);  ASSERT_EQ(1, called);  if (cfd_to_drop->Unref()) {    delete cfd_to_drop;    cfd_to_drop = nullptr;  }}INSTANTIATE_TEST_CASE_P(    AtomicGroup, VersionSetTestDropOneCF,    testing::Values(VersionSetTestBase::kColumnFamilyName1,                    VersionSetTestBase::kColumnFamilyName2,                    VersionSetTestBase::kColumnFamilyName3));}  // namespace ROCKSDB_NAMESPACEint main(int argc, char** argv) {  ::testing::InitGoogleTest(&argc, argv);  return RUN_ALL_TESTS();}
 |