| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/memtable_list.h"
- #include <algorithm>
- #include <string>
- #include <vector>
- #include "db/merge_context.h"
- #include "db/version_set.h"
- #include "db/write_controller.h"
- #include "rocksdb/db.h"
- #include "rocksdb/status.h"
- #include "rocksdb/write_buffer_manager.h"
- #include "test_util/testharness.h"
- #include "test_util/testutil.h"
- #include "util/string_util.h"
- namespace ROCKSDB_NAMESPACE {
- class MemTableListTest : public testing::Test {
- public:
- std::string dbname;
- DB* db;
- Options options;
- std::vector<ColumnFamilyHandle*> handles;
- std::atomic<uint64_t> file_number;
- MemTableListTest() : db(nullptr), file_number(1) {
- dbname = test::PerThreadDBPath("memtable_list_test");
- options.create_if_missing = true;
- DestroyDB(dbname, options);
- }
- // Create a test db if not yet created
- void CreateDB() {
- if (db == nullptr) {
- options.create_if_missing = true;
- DestroyDB(dbname, options);
- // Open DB only with default column family
- ColumnFamilyOptions cf_options;
- std::vector<ColumnFamilyDescriptor> cf_descs;
- cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
- Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
- EXPECT_OK(s);
- ColumnFamilyOptions cf_opt1, cf_opt2;
- cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
- std::numeric_limits<uint64_t>::max());
- cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
- std::numeric_limits<uint64_t>::max());
- int sz = static_cast<int>(handles.size());
- handles.resize(sz + 2);
- s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
- EXPECT_OK(s);
- s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
- EXPECT_OK(s);
- cf_descs.emplace_back("one", cf_options);
- cf_descs.emplace_back("two", cf_options);
- }
- }
- ~MemTableListTest() override {
- if (db) {
- std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
- for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
- handles[i]->GetDescriptor(&cf_descs[i]);
- }
- for (auto h : handles) {
- if (h) {
- db->DestroyColumnFamilyHandle(h);
- }
- }
- handles.clear();
- delete db;
- db = nullptr;
- DestroyDB(dbname, options, cf_descs);
- }
- }
- // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
- // structures needed to call this function.
- Status Mock_InstallMemtableFlushResults(
- MemTableList* list, const MutableCFOptions& mutable_cf_options,
- const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
- // Create a mock Logger
- test::NullLogger logger;
- LogBuffer log_buffer(DEBUG_LEVEL, &logger);
- CreateDB();
- // Create a mock VersionSet
- DBOptions db_options;
- db_options.file_system = FileSystem::Default();
- ImmutableDBOptions immutable_db_options(db_options);
- EnvOptions env_options;
- std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
- WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
- WriteController write_controller(10000000u);
- VersionSet versions(dbname, &immutable_db_options, env_options,
- table_cache.get(), &write_buffer_manager,
- &write_controller, /*block_cache_tracer=*/nullptr);
- std::vector<ColumnFamilyDescriptor> cf_descs;
- cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
- cf_descs.emplace_back("one", ColumnFamilyOptions());
- cf_descs.emplace_back("two", ColumnFamilyOptions());
- EXPECT_OK(versions.Recover(cf_descs, false));
- // Create mock default ColumnFamilyData
- auto column_family_set = versions.GetColumnFamilySet();
- LogsWithPrepTracker dummy_prep_tracker;
- auto cfd = column_family_set->GetDefault();
- EXPECT_TRUE(nullptr != cfd);
- uint64_t file_num = file_number.fetch_add(1);
- // Create dummy mutex.
- InstrumentedMutex mutex;
- InstrumentedMutexLock l(&mutex);
- std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
- Status s = list->TryInstallMemtableFlushResults(
- cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
- file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
- return s;
- }
- // Calls MemTableList::InstallMemtableFlushResults() and sets up all
- // structures needed to call this function.
- Status Mock_InstallMemtableAtomicFlushResults(
- autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
- const autovector<const MutableCFOptions*>& mutable_cf_options_list,
- const autovector<const autovector<MemTable*>*>& mems_list,
- autovector<MemTable*>* to_delete) {
- // Create a mock Logger
- test::NullLogger logger;
- LogBuffer log_buffer(DEBUG_LEVEL, &logger);
- CreateDB();
- // Create a mock VersionSet
- DBOptions db_options;
- db_options.file_system.reset(new LegacyFileSystemWrapper(db_options.env));
- ImmutableDBOptions immutable_db_options(db_options);
- EnvOptions env_options;
- std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
- WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
- WriteController write_controller(10000000u);
- VersionSet versions(dbname, &immutable_db_options, env_options,
- table_cache.get(), &write_buffer_manager,
- &write_controller, /*block_cache_tracer=*/nullptr);
- std::vector<ColumnFamilyDescriptor> cf_descs;
- cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
- cf_descs.emplace_back("one", ColumnFamilyOptions());
- cf_descs.emplace_back("two", ColumnFamilyOptions());
- EXPECT_OK(versions.Recover(cf_descs, false));
- // Create mock default ColumnFamilyData
- auto column_family_set = versions.GetColumnFamilySet();
- LogsWithPrepTracker dummy_prep_tracker;
- autovector<ColumnFamilyData*> cfds;
- for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
- cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
- EXPECT_NE(nullptr, cfds[i]);
- }
- std::vector<FileMetaData> file_metas;
- file_metas.reserve(cf_ids.size());
- for (size_t i = 0; i != cf_ids.size(); ++i) {
- FileMetaData meta;
- uint64_t file_num = file_number.fetch_add(1);
- meta.fd = FileDescriptor(file_num, 0, 0);
- file_metas.emplace_back(meta);
- }
- autovector<FileMetaData*> file_meta_ptrs;
- for (auto& meta : file_metas) {
- file_meta_ptrs.push_back(&meta);
- }
- InstrumentedMutex mutex;
- InstrumentedMutexLock l(&mutex);
- return InstallMemtableAtomicFlushResults(
- &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
- file_meta_ptrs, to_delete, nullptr, &log_buffer);
- }
- };
- TEST_F(MemTableListTest, Empty) {
- // Create an empty MemTableList and validate basic functions.
- MemTableList list(1, 0, 0);
- ASSERT_EQ(0, list.NumNotFlushed());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- ASSERT_FALSE(list.IsFlushPending());
- autovector<MemTable*> mems;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
- ASSERT_EQ(0, mems.size());
- autovector<MemTable*> to_delete;
- list.current()->Unref(&to_delete);
- ASSERT_EQ(0, to_delete.size());
- }
- TEST_F(MemTableListTest, GetTest) {
- // Create MemTableList
- int min_write_buffer_number_to_merge = 2;
- int max_write_buffer_number_to_maintain = 0;
- int64_t max_write_buffer_size_to_maintain = 0;
- MemTableList list(min_write_buffer_number_to_merge,
- max_write_buffer_number_to_maintain,
- max_write_buffer_size_to_maintain);
- SequenceNumber seq = 1;
- std::string value;
- Status s;
- MergeContext merge_context;
- InternalKeyComparator ikey_cmp(options.comparator);
- SequenceNumber max_covering_tombstone_seq = 0;
- autovector<MemTable*> to_delete;
- LookupKey lkey("key1", seq);
- bool found = list.current()->Get(lkey, &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- // Create a MemTable
- InternalKeyComparator cmp(BytewiseComparator());
- auto factory = std::make_shared<SkipListFactory>();
- options.memtable_factory = factory;
- ImmutableCFOptions ioptions(options);
- WriteBufferManager wb(options.db_write_buffer_size);
- MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
- kMaxSequenceNumber, 0 /* column_family_id */);
- mem->Ref();
- // Write some keys to this memtable.
- mem->Add(++seq, kTypeDeletion, "key1", "");
- mem->Add(++seq, kTypeValue, "key2", "value2");
- mem->Add(++seq, kTypeValue, "key1", "value1");
- mem->Add(++seq, kTypeValue, "key2", "value2.2");
- // Fetch the newly written keys
- merge_context.Clear();
- found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(s.ok() && found);
- ASSERT_EQ(value, "value1");
- merge_context.Clear();
- found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- // MemTable found out that this key is *not* found (at this sequence#)
- ASSERT_TRUE(found && s.IsNotFound());
- merge_context.Clear();
- found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(s.ok() && found);
- ASSERT_EQ(value, "value2.2");
- ASSERT_EQ(4, mem->num_entries());
- ASSERT_EQ(1, mem->num_deletes());
- // Add memtable to list
- list.Add(mem, &to_delete);
- SequenceNumber saved_seq = seq;
- // Create another memtable and write some keys to it
- WriteBufferManager wb2(options.db_write_buffer_size);
- MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
- kMaxSequenceNumber, 0 /* column_family_id */);
- mem2->Ref();
- mem2->Add(++seq, kTypeDeletion, "key1", "");
- mem2->Add(++seq, kTypeValue, "key2", "value2.3");
- // Add second memtable to list
- list.Add(mem2, &to_delete);
- // Fetch keys via MemTableList
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(found && s.IsNotFound());
- merge_context.Clear();
- found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s,
- &merge_context, &max_covering_tombstone_seq,
- ReadOptions());
- ASSERT_TRUE(s.ok() && found);
- ASSERT_EQ("value1", value);
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(s.ok() && found);
- ASSERT_EQ(value, "value2.3");
- merge_context.Clear();
- found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- ASSERT_EQ(2, list.NumNotFlushed());
- list.current()->Unref(&to_delete);
- for (MemTable* m : to_delete) {
- delete m;
- }
- }
- TEST_F(MemTableListTest, GetFromHistoryTest) {
- // Create MemTableList
- int min_write_buffer_number_to_merge = 2;
- int max_write_buffer_number_to_maintain = 2;
- int64_t max_write_buffer_size_to_maintain = 2000;
- MemTableList list(min_write_buffer_number_to_merge,
- max_write_buffer_number_to_maintain,
- max_write_buffer_size_to_maintain);
- SequenceNumber seq = 1;
- std::string value;
- Status s;
- MergeContext merge_context;
- InternalKeyComparator ikey_cmp(options.comparator);
- SequenceNumber max_covering_tombstone_seq = 0;
- autovector<MemTable*> to_delete;
- LookupKey lkey("key1", seq);
- bool found = list.current()->Get(lkey, &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- // Create a MemTable
- InternalKeyComparator cmp(BytewiseComparator());
- auto factory = std::make_shared<SkipListFactory>();
- options.memtable_factory = factory;
- ImmutableCFOptions ioptions(options);
- WriteBufferManager wb(options.db_write_buffer_size);
- MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
- kMaxSequenceNumber, 0 /* column_family_id */);
- mem->Ref();
- // Write some keys to this memtable.
- mem->Add(++seq, kTypeDeletion, "key1", "");
- mem->Add(++seq, kTypeValue, "key2", "value2");
- mem->Add(++seq, kTypeValue, "key2", "value2.2");
- // Fetch the newly written keys
- merge_context.Clear();
- found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- // MemTable found out that this key is *not* found (at this sequence#)
- ASSERT_TRUE(found && s.IsNotFound());
- merge_context.Clear();
- found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(s.ok() && found);
- ASSERT_EQ(value, "value2.2");
- // Add memtable to list
- list.Add(mem, &to_delete);
- ASSERT_EQ(0, to_delete.size());
- // Fetch keys via MemTableList
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(found && s.IsNotFound());
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(s.ok() && found);
- ASSERT_EQ("value2.2", value);
- // Flush this memtable from the list.
- // (It will then be a part of the memtable history).
- autovector<MemTable*> to_flush;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
- ASSERT_EQ(1, to_flush.size());
- MutableCFOptions mutable_cf_options(options);
- s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
- &to_delete);
- ASSERT_OK(s);
- ASSERT_EQ(0, list.NumNotFlushed());
- ASSERT_EQ(1, list.NumFlushed());
- ASSERT_EQ(0, to_delete.size());
- // Verify keys are no longer in MemTableList
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- // Verify keys are present in history
- merge_context.Clear();
- found = list.current()->GetFromHistory(
- LookupKey("key1", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(found && s.IsNotFound());
- merge_context.Clear();
- found = list.current()->GetFromHistory(
- LookupKey("key2", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(found);
- ASSERT_EQ("value2.2", value);
- // Create another memtable and write some keys to it
- WriteBufferManager wb2(options.db_write_buffer_size);
- MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
- kMaxSequenceNumber, 0 /* column_family_id */);
- mem2->Ref();
- mem2->Add(++seq, kTypeDeletion, "key1", "");
- mem2->Add(++seq, kTypeValue, "key3", "value3");
- // Add second memtable to list
- list.Add(mem2, &to_delete);
- ASSERT_EQ(0, to_delete.size());
- to_flush.clear();
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
- ASSERT_EQ(1, to_flush.size());
- // Flush second memtable
- s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
- &to_delete);
- ASSERT_OK(s);
- ASSERT_EQ(0, list.NumNotFlushed());
- ASSERT_EQ(2, list.NumFlushed());
- ASSERT_EQ(0, to_delete.size());
- // Add a third memtable to push the first memtable out of the history
- WriteBufferManager wb3(options.db_write_buffer_size);
- MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
- kMaxSequenceNumber, 0 /* column_family_id */);
- mem3->Ref();
- list.Add(mem3, &to_delete);
- ASSERT_EQ(1, list.NumNotFlushed());
- ASSERT_EQ(1, list.NumFlushed());
- ASSERT_EQ(1, to_delete.size());
- // Verify keys are no longer in MemTableList
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- // Verify that the second memtable's keys are in the history
- merge_context.Clear();
- found = list.current()->GetFromHistory(
- LookupKey("key1", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(found && s.IsNotFound());
- merge_context.Clear();
- found = list.current()->GetFromHistory(
- LookupKey("key3", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_TRUE(found);
- ASSERT_EQ("value3", value);
- // Verify that key2 from the first memtable is no longer in the history
- merge_context.Clear();
- found =
- list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
- &max_covering_tombstone_seq, ReadOptions());
- ASSERT_FALSE(found);
- // Cleanup
- list.current()->Unref(&to_delete);
- ASSERT_EQ(3, to_delete.size());
- for (MemTable* m : to_delete) {
- delete m;
- }
- }
- TEST_F(MemTableListTest, FlushPendingTest) {
- const int num_tables = 6;
- SequenceNumber seq = 1;
- Status s;
- auto factory = std::make_shared<SkipListFactory>();
- options.memtable_factory = factory;
- ImmutableCFOptions ioptions(options);
- InternalKeyComparator cmp(BytewiseComparator());
- WriteBufferManager wb(options.db_write_buffer_size);
- autovector<MemTable*> to_delete;
- // Create MemTableList
- int min_write_buffer_number_to_merge = 3;
- int max_write_buffer_number_to_maintain = 7;
- int64_t max_write_buffer_size_to_maintain =
- 7 * static_cast<int>(options.write_buffer_size);
- MemTableList list(min_write_buffer_number_to_merge,
- max_write_buffer_number_to_maintain,
- max_write_buffer_size_to_maintain);
- // Create some MemTables
- uint64_t memtable_id = 0;
- std::vector<MemTable*> tables;
- MutableCFOptions mutable_cf_options(options);
- for (int i = 0; i < num_tables; i++) {
- MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
- kMaxSequenceNumber, 0 /* column_family_id */);
- mem->SetID(memtable_id++);
- mem->Ref();
- std::string value;
- MergeContext merge_context;
- mem->Add(++seq, kTypeValue, "key1", ToString(i));
- mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
- mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
- mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
- mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
- tables.push_back(mem);
- }
- // Nothing to flush
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- autovector<MemTable*> to_flush;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
- ASSERT_EQ(0, to_flush.size());
- // Request a flush even though there is nothing to flush
- list.FlushRequested();
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Attempt to 'flush' to clear request for flush
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
- ASSERT_EQ(0, to_flush.size());
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Request a flush again
- list.FlushRequested();
- // No flush pending since the list is empty.
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Add 2 tables
- list.Add(tables[0], &to_delete);
- list.Add(tables[1], &to_delete);
- ASSERT_EQ(2, list.NumNotFlushed());
- ASSERT_EQ(0, to_delete.size());
- // Even though we have less than the minimum to flush, a flush is
- // pending since we had previously requested a flush and never called
- // PickMemtablesToFlush() to clear the flush.
- ASSERT_TRUE(list.IsFlushPending());
- ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Pick tables to flush
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
- ASSERT_EQ(2, to_flush.size());
- ASSERT_EQ(2, list.NumNotFlushed());
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Revert flush
- list.RollbackMemtableFlush(to_flush, 0);
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
- to_flush.clear();
- // Add another table
- list.Add(tables[2], &to_delete);
- // We now have the minimum to flush regardles of whether FlushRequested()
- // was called.
- ASSERT_TRUE(list.IsFlushPending());
- ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
- ASSERT_EQ(0, to_delete.size());
- // Pick tables to flush
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
- ASSERT_EQ(3, to_flush.size());
- ASSERT_EQ(3, list.NumNotFlushed());
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Pick tables to flush again
- autovector<MemTable*> to_flush2;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
- ASSERT_EQ(0, to_flush2.size());
- ASSERT_EQ(3, list.NumNotFlushed());
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Add another table
- list.Add(tables[3], &to_delete);
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
- ASSERT_EQ(0, to_delete.size());
- // Request a flush again
- list.FlushRequested();
- ASSERT_TRUE(list.IsFlushPending());
- ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Pick tables to flush again
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
- ASSERT_EQ(1, to_flush2.size());
- ASSERT_EQ(4, list.NumNotFlushed());
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Rollback first pick of tables
- list.RollbackMemtableFlush(to_flush, 0);
- ASSERT_TRUE(list.IsFlushPending());
- ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
- to_flush.clear();
- // Add another tables
- list.Add(tables[4], &to_delete);
- ASSERT_EQ(5, list.NumNotFlushed());
- // We now have the minimum to flush regardles of whether FlushRequested()
- ASSERT_TRUE(list.IsFlushPending());
- ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
- ASSERT_EQ(0, to_delete.size());
- // Pick tables to flush
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
- // Should pick 4 of 5 since 1 table has been picked in to_flush2
- ASSERT_EQ(4, to_flush.size());
- ASSERT_EQ(5, list.NumNotFlushed());
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Pick tables to flush again
- autovector<MemTable*> to_flush3;
- list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
- ASSERT_EQ(0, to_flush3.size()); // nothing not in progress of being flushed
- ASSERT_EQ(5, list.NumNotFlushed());
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Flush the 4 memtables that were picked in to_flush
- s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
- &to_delete);
- ASSERT_OK(s);
- // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
- // tables[3].
- // Current implementation will only commit memtables in the order they were
- // created. So TryInstallMemtableFlushResults will install the first 3 tables
- // in to_flush and stop when it encounters a table not yet flushed.
- ASSERT_EQ(2, list.NumNotFlushed());
- int num_in_history =
- std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
- static_cast<int>(options.write_buffer_size));
- ASSERT_EQ(num_in_history, list.NumFlushed());
- ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
- // Request a flush again. Should be nothing to flush
- list.FlushRequested();
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- // Flush the 1 memtable that was picked in to_flush2
- s = MemTableListTest::Mock_InstallMemtableFlushResults(
- &list, mutable_cf_options, to_flush2, &to_delete);
- ASSERT_OK(s);
- // This will actually install 2 tables. The 1 we told it to flush, and also
- // tables[4] which has been waiting for tables[3] to commit.
- ASSERT_EQ(0, list.NumNotFlushed());
- num_in_history =
- std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
- static_cast<int>(options.write_buffer_size));
- ASSERT_EQ(num_in_history, list.NumFlushed());
- ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
- for (const auto& m : to_delete) {
- // Refcount should be 0 after calling TryInstallMemtableFlushResults.
- // Verify this, by Ref'ing then UnRef'ing:
- m->Ref();
- ASSERT_EQ(m, m->Unref());
- delete m;
- }
- to_delete.clear();
- // Add another table
- list.Add(tables[5], &to_delete);
- ASSERT_EQ(1, list.NumNotFlushed());
- ASSERT_EQ(5, list.GetLatestMemTableID());
- memtable_id = 4;
- // Pick tables to flush. The tables to pick must have ID smaller than or
- // equal to 4. Therefore, no table will be selected in this case.
- autovector<MemTable*> to_flush4;
- list.FlushRequested();
- ASSERT_TRUE(list.HasFlushRequested());
- list.PickMemtablesToFlush(&memtable_id, &to_flush4);
- ASSERT_TRUE(to_flush4.empty());
- ASSERT_EQ(1, list.NumNotFlushed());
- ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
- ASSERT_FALSE(list.IsFlushPending());
- ASSERT_FALSE(list.HasFlushRequested());
- // Pick tables to flush. The tables to pick must have ID smaller than or
- // equal to 5. Therefore, only tables[5] will be selected.
- memtable_id = 5;
- list.FlushRequested();
- list.PickMemtablesToFlush(&memtable_id, &to_flush4);
- ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
- ASSERT_EQ(1, list.NumNotFlushed());
- ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
- ASSERT_FALSE(list.IsFlushPending());
- to_delete.clear();
- list.current()->Unref(&to_delete);
- int to_delete_size =
- std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
- static_cast<int>(options.write_buffer_size));
- ASSERT_EQ(to_delete_size, to_delete.size());
- for (const auto& m : to_delete) {
- // Refcount should be 0 after calling TryInstallMemtableFlushResults.
- // Verify this, by Ref'ing then UnRef'ing:
- m->Ref();
- ASSERT_EQ(m, m->Unref());
- delete m;
- }
- to_delete.clear();
- }
- TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
- autovector<MemTableList*> lists;
- autovector<uint32_t> cf_ids;
- autovector<const MutableCFOptions*> options_list;
- autovector<const autovector<MemTable*>*> to_flush;
- autovector<MemTable*> to_delete;
- Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
- to_flush, &to_delete);
- ASSERT_OK(s);
- ASSERT_TRUE(to_delete.empty());
- }
- TEST_F(MemTableListTest, AtomicFlusTest) {
- const int num_cfs = 3;
- const int num_tables_per_cf = 2;
- SequenceNumber seq = 1;
- auto factory = std::make_shared<SkipListFactory>();
- options.memtable_factory = factory;
- ImmutableCFOptions ioptions(options);
- InternalKeyComparator cmp(BytewiseComparator());
- WriteBufferManager wb(options.db_write_buffer_size);
- // Create MemTableLists
- int min_write_buffer_number_to_merge = 3;
- int max_write_buffer_number_to_maintain = 7;
- int64_t max_write_buffer_size_to_maintain =
- 7 * static_cast<int64_t>(options.write_buffer_size);
- autovector<MemTableList*> lists;
- for (int i = 0; i != num_cfs; ++i) {
- lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
- max_write_buffer_number_to_maintain,
- max_write_buffer_size_to_maintain));
- }
- autovector<uint32_t> cf_ids;
- std::vector<std::vector<MemTable*>> tables(num_cfs);
- autovector<const MutableCFOptions*> mutable_cf_options_list;
- uint32_t cf_id = 0;
- for (auto& elem : tables) {
- mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
- uint64_t memtable_id = 0;
- for (int i = 0; i != num_tables_per_cf; ++i) {
- MemTable* mem =
- new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
- kMaxSequenceNumber, cf_id);
- mem->SetID(memtable_id++);
- mem->Ref();
- std::string value;
- mem->Add(++seq, kTypeValue, "key1", ToString(i));
- mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
- mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
- mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
- mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
- elem.push_back(mem);
- }
- cf_ids.push_back(cf_id++);
- }
- std::vector<autovector<MemTable*>> flush_candidates(num_cfs);
- // Nothing to flush
- for (auto i = 0; i != num_cfs; ++i) {
- auto* list = lists[i];
- ASSERT_FALSE(list->IsFlushPending());
- ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
- list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
- ASSERT_EQ(0, flush_candidates[i].size());
- }
- // Request flush even though there is nothing to flush
- for (auto i = 0; i != num_cfs; ++i) {
- auto* list = lists[i];
- list->FlushRequested();
- ASSERT_FALSE(list->IsFlushPending());
- ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
- }
- autovector<MemTable*> to_delete;
- // Add tables to the immutable memtalbe lists associated with column families
- for (auto i = 0; i != num_cfs; ++i) {
- for (auto j = 0; j != num_tables_per_cf; ++j) {
- lists[i]->Add(tables[i][j], &to_delete);
- }
- ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
- ASSERT_TRUE(lists[i]->IsFlushPending());
- ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
- }
- std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
- // +----+
- // list[0]: |0 1|
- // list[1]: |0 1|
- // | +--+
- // list[2]: |0| 1
- // +-+
- // Pick memtables to flush
- for (auto i = 0; i != num_cfs; ++i) {
- flush_candidates[i].clear();
- lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
- &flush_candidates[i]);
- ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
- static_cast<uint64_t>(flush_candidates[i].size()));
- }
- autovector<MemTableList*> tmp_lists;
- autovector<uint32_t> tmp_cf_ids;
- autovector<const MutableCFOptions*> tmp_options_list;
- autovector<const autovector<MemTable*>*> to_flush;
- for (auto i = 0; i != num_cfs; ++i) {
- if (!flush_candidates[i].empty()) {
- to_flush.push_back(&flush_candidates[i]);
- tmp_lists.push_back(lists[i]);
- tmp_cf_ids.push_back(i);
- tmp_options_list.push_back(mutable_cf_options_list[i]);
- }
- }
- Status s = Mock_InstallMemtableAtomicFlushResults(
- tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
- ASSERT_OK(s);
- for (auto i = 0; i != num_cfs; ++i) {
- for (auto j = 0; j != num_tables_per_cf; ++j) {
- if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
- ASSERT_LT(0, tables[i][j]->GetFileNumber());
- }
- }
- ASSERT_EQ(
- static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
- lists[i]->NumNotFlushed());
- }
- to_delete.clear();
- for (auto list : lists) {
- list->current()->Unref(&to_delete);
- delete list;
- }
- for (auto& mutable_cf_options : mutable_cf_options_list) {
- if (mutable_cf_options != nullptr) {
- delete mutable_cf_options;
- mutable_cf_options = nullptr;
- }
- }
- // All memtables in tables array must have been flushed, thus ready to be
- // deleted.
- ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
- for (const auto& m : to_delete) {
- // Refcount should be 0 after calling InstallMemtableFlushResults.
- // Verify this by Ref'ing and then Unref'ing.
- m->Ref();
- ASSERT_EQ(m, m->Unref());
- delete m;
- }
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|