| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.#include <atomic>#include "db/db_impl/db_impl.h"#include "db/db_test_util.h"#include "port/port.h"#include "port/stack_trace.h"#include "test_util/fault_injection_test_env.h"#include "test_util/sync_point.h"#include "util/cast_util.h"#include "util/mutexlock.h"namespace ROCKSDB_NAMESPACE {class DBFlushTest : public DBTestBase { public:  DBFlushTest() : DBTestBase("/db_flush_test") {}};class DBFlushDirectIOTest : public DBFlushTest,                            public ::testing::WithParamInterface<bool> { public:  DBFlushDirectIOTest() : DBFlushTest() {}};class DBAtomicFlushTest : public DBFlushTest,                          public ::testing::WithParamInterface<bool> { public:  DBAtomicFlushTest() : DBFlushTest() {}};// We had issue when two background threads trying to flush at the same time,// only one of them get committed. The test verifies the issue is fixed.TEST_F(DBFlushTest, FlushWhileWritingManifest) {  Options options;  options.disable_auto_compactions = true;  options.max_background_flushes = 2;  options.env = env_;  Reopen(options);  FlushOptions no_wait;  no_wait.wait = false;  no_wait.allow_write_stall=true;  SyncPoint::GetInstance()->LoadDependency(      {{"VersionSet::LogAndApply:WriteManifest",        "DBFlushTest::FlushWhileWritingManifest:1"},       {"MemTableList::TryInstallMemtableFlushResults:InProgress",        "VersionSet::LogAndApply:WriteManifestDone"}});  SyncPoint::GetInstance()->EnableProcessing();  ASSERT_OK(Put("foo", "v"));  ASSERT_OK(dbfull()->Flush(no_wait));  TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");  ASSERT_OK(Put("bar", "v"));  ASSERT_OK(dbfull()->Flush(no_wait));  // If the issue is hit we will wait here forever.  dbfull()->TEST_WaitForFlushMemTable();#ifndef ROCKSDB_LITE  ASSERT_EQ(2, TotalTableFiles());#endif  // ROCKSDB_LITE}// Disable this test temporarily on Travis as it fails intermittently.// Github issue: #4151TEST_F(DBFlushTest, SyncFail) {  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(      new FaultInjectionTestEnv(env_));  Options options;  options.disable_auto_compactions = true;  options.env = fault_injection_env.get();  SyncPoint::GetInstance()->LoadDependency(      {{"DBFlushTest::SyncFail:GetVersionRefCount:1",        "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},       {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",        "DBFlushTest::SyncFail:GetVersionRefCount:2"},       {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},       {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});  SyncPoint::GetInstance()->EnableProcessing();  CreateAndReopenWithCF({"pikachu"}, options);  Put("key", "value");  auto* cfd =      reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())          ->cfd();  FlushOptions flush_options;  flush_options.wait = false;  ASSERT_OK(dbfull()->Flush(flush_options));  // Flush installs a new super-version. Get the ref count after that.  auto current_before = cfd->current();  int refs_before = cfd->current()->TEST_refs();  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");  int refs_after_picking_memtables = cfd->current()->TEST_refs();  ASSERT_EQ(refs_before + 1, refs_after_picking_memtables);  fault_injection_env->SetFilesystemActive(false);  TEST_SYNC_POINT("DBFlushTest::SyncFail:1");  TEST_SYNC_POINT("DBFlushTest::SyncFail:2");  fault_injection_env->SetFilesystemActive(true);  // Now the background job will do the flush; wait for it.  dbfull()->TEST_WaitForFlushMemTable();#ifndef ROCKSDB_LITE  ASSERT_EQ("", FilesPerLevel());  // flush failed.#endif                             // ROCKSDB_LITE  // Backgroun flush job should release ref count to current version.  ASSERT_EQ(current_before, cfd->current());  ASSERT_EQ(refs_before, cfd->current()->TEST_refs());  Destroy(options);}TEST_F(DBFlushTest, SyncSkip) {  Options options = CurrentOptions();  SyncPoint::GetInstance()->LoadDependency(      {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},       {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});  SyncPoint::GetInstance()->EnableProcessing();  Reopen(options);  Put("key", "value");  FlushOptions flush_options;  flush_options.wait = false;  ASSERT_OK(dbfull()->Flush(flush_options));  TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");  TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");  // Now the background job will do the flush; wait for it.  dbfull()->TEST_WaitForFlushMemTable();  Destroy(options);}TEST_F(DBFlushTest, FlushInLowPriThreadPool) {  // Verify setting an empty high-pri (flush) thread pool causes flushes to be  // scheduled in the low-pri (compaction) thread pool.  Options options = CurrentOptions();  options.level0_file_num_compaction_trigger = 4;  options.memtable_factory.reset(new SpecialSkipListFactory(1));  Reopen(options);  env_->SetBackgroundThreads(0, Env::HIGH);  std::thread::id tid;  int num_flushes = 0, num_compactions = 0;  SyncPoint::GetInstance()->SetCallBack(      "DBImpl::BGWorkFlush", [&](void* /*arg*/) {        if (tid == std::thread::id()) {          tid = std::this_thread::get_id();        } else {          ASSERT_EQ(tid, std::this_thread::get_id());        }        ++num_flushes;      });  SyncPoint::GetInstance()->SetCallBack(      "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {        ASSERT_EQ(tid, std::this_thread::get_id());        ++num_compactions;      });  SyncPoint::GetInstance()->EnableProcessing();  ASSERT_OK(Put("key", "val"));  for (int i = 0; i < 4; ++i) {    ASSERT_OK(Put("key", "val"));    dbfull()->TEST_WaitForFlushMemTable();  }  dbfull()->TEST_WaitForCompact();  ASSERT_EQ(4, num_flushes);  ASSERT_EQ(1, num_compactions);}TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {  Options options = CurrentOptions();  options.write_buffer_size = 100;  options.max_write_buffer_number = 4;  options.min_write_buffer_number_to_merge = 3;  Reopen(options);  SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::BGWorkFlush",        "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},       {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",        "FlushJob::WriteLevel0Table"}});  SyncPoint::GetInstance()->EnableProcessing();  ASSERT_OK(Put("key1", "value1"));  port::Thread t([&]() {    // The call wait for flush to finish, i.e. with flush_options.wait = true.    ASSERT_OK(Flush());  });  // Wait for flush start.  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");  // Insert a second memtable before the manual flush finish.  // At the end of the manual flush job, it will check if further flush  // is needed, but it will not trigger flush of the second memtable because  // min_write_buffer_number_to_merge is not reached.  ASSERT_OK(Put("key2", "value2"));  ASSERT_OK(dbfull()->TEST_SwitchMemtable());  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");  // Manual flush should return, without waiting for flush indefinitely.  t.join();}TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {  Options options = CurrentOptions();  Reopen(options);  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();  int called = 0;  SyncPoint::GetInstance()->SetCallBack(      "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) {        ASSERT_NE(nullptr, arg);        auto unscheduled_flushes = *reinterpret_cast<int*>(arg);        ASSERT_EQ(0, unscheduled_flushes);        ++called;      });  SyncPoint::GetInstance()->EnableProcessing();  ASSERT_OK(Put("a", "foo"));  FlushOptions flush_opts;  ASSERT_OK(dbfull()->Flush(flush_opts));  ASSERT_EQ(1, called);  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();}TEST_P(DBFlushDirectIOTest, DirectIO) {  Options options;  options.create_if_missing = true;  options.disable_auto_compactions = true;  options.max_background_flushes = 2;  options.use_direct_io_for_flush_and_compaction = GetParam();  options.env = new MockEnv(Env::Default());  SyncPoint::GetInstance()->SetCallBack(      "BuildTable:create_file", [&](void* arg) {        bool* use_direct_writes = static_cast<bool*>(arg);        ASSERT_EQ(*use_direct_writes,                  options.use_direct_io_for_flush_and_compaction);      });  SyncPoint::GetInstance()->EnableProcessing();  Reopen(options);  ASSERT_OK(Put("foo", "v"));  FlushOptions flush_options;  flush_options.wait = true;  ASSERT_OK(dbfull()->Flush(flush_options));  Destroy(options);  delete options.env;}TEST_F(DBFlushTest, FlushError) {  Options options;  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(      new FaultInjectionTestEnv(env_));  options.write_buffer_size = 100;  options.max_write_buffer_number = 4;  options.min_write_buffer_number_to_merge = 3;  options.disable_auto_compactions = true;  options.env = fault_injection_env.get();  Reopen(options);  ASSERT_OK(Put("key1", "value1"));  ASSERT_OK(Put("key2", "value2"));  fault_injection_env->SetFilesystemActive(false);  Status s = dbfull()->TEST_SwitchMemtable();  fault_injection_env->SetFilesystemActive(true);  Destroy(options);  ASSERT_NE(s, Status::OK());}TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {  // Regression test for bug where manual flush hangs forever when the DB  // is in read-only mode. Verify it now at least returns, despite failing.  Options options;  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(      new FaultInjectionTestEnv(env_));  options.env = fault_injection_env.get();  options.max_write_buffer_number = 2;  Reopen(options);  // Trigger a first flush but don't let it run  ASSERT_OK(db_->PauseBackgroundWork());  ASSERT_OK(Put("key1", "value1"));  FlushOptions flush_opts;  flush_opts.wait = false;  ASSERT_OK(db_->Flush(flush_opts));  // Write a key to the second memtable so we have something to flush later  // after the DB is in read-only mode.  ASSERT_OK(Put("key2", "value2"));  // Let the first flush continue, hit an error, and put the DB in read-only  // mode.  fault_injection_env->SetFilesystemActive(false);  ASSERT_OK(db_->ContinueBackgroundWork());  dbfull()->TEST_WaitForFlushMemTable();#ifndef ROCKSDB_LITE  uint64_t num_bg_errors;  ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors,                                  &num_bg_errors));  ASSERT_GT(num_bg_errors, 0);#endif  // ROCKSDB_LITE  // In the bug scenario, triggering another flush would cause the second flush  // to hang forever. After the fix we expect it to return an error.  ASSERT_NOK(db_->Flush(FlushOptions()));  Close();}TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {  Options options = CurrentOptions();  options.create_if_missing = true;  CreateAndReopenWithCF({"pikachu"}, options);  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::FlushMemTable:AfterScheduleFlush",        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},       {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",        "DBImpl::BackgroundCallFlush:start"},       {"DBImpl::BackgroundCallFlush:start",        "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});  SyncPoint::GetInstance()->EnableProcessing();  ASSERT_EQ(2, handles_.size());  ASSERT_OK(Put(1, "key", "value"));  auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();  port::Thread drop_cf_thr([&]() {    TEST_SYNC_POINT(        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));    ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));    handles_.resize(1);    TEST_SYNC_POINT(        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");  });  FlushOptions flush_opts;  flush_opts.allow_write_stall = true;  ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));  drop_cf_thr.join();  Close();  SyncPoint::GetInstance()->DisableProcessing();}#ifndef ROCKSDB_LITETEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {  class TestListener : public EventListener {   public:    void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {      // There's only one key in each flush.      ASSERT_EQ(info.smallest_seqno, info.largest_seqno);      ASSERT_NE(0, info.smallest_seqno);      if (info.smallest_seqno == seq1) {        // First flush completed        ASSERT_FALSE(completed1);        completed1 = true;        CheckFlushResultCommitted(db, seq1);      } else {        // Second flush completed        ASSERT_FALSE(completed2);        completed2 = true;        ASSERT_EQ(info.smallest_seqno, seq2);        CheckFlushResultCommitted(db, seq2);      }    }    void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {      DBImpl* db_impl = static_cast_with_check<DBImpl>(db);      InstrumentedMutex* mutex = db_impl->mutex();      mutex->Lock();      auto* cfd =          reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())              ->cfd();      ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());      mutex->Unlock();    }    std::atomic<SequenceNumber> seq1{0};    std::atomic<SequenceNumber> seq2{0};    std::atomic<bool> completed1{false};    std::atomic<bool> completed2{false};  };  std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();  SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::BackgroundCallFlush:start",        "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},       {"DBImpl::FlushMemTableToOutputFile:Finish",        "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});  SyncPoint::GetInstance()->SetCallBack(      "FlushJob::WriteLevel0Table", [&listener](void* arg) {        // Wait for the second flush finished, out of mutex.        auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);        if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {          TEST_SYNC_POINT(              "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"              "WaitSecond");        }      });  Options options = CurrentOptions();  options.create_if_missing = true;  options.listeners.push_back(listener);  // Setting max_flush_jobs = max_background_jobs / 4 = 2.  options.max_background_jobs = 8;  // Allow 2 immutable memtables.  options.max_write_buffer_number = 3;  Reopen(options);  SyncPoint::GetInstance()->EnableProcessing();  ASSERT_OK(Put("foo", "v"));  listener->seq1 = db_->GetLatestSequenceNumber();  // t1 will wait for the second flush complete before committing flush result.  auto t1 = port::Thread([&]() {    // flush_opts.wait = true    ASSERT_OK(db_->Flush(FlushOptions()));  });  // Wait for first flush started.  TEST_SYNC_POINT(      "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");  // The second flush will exit early without commit its result. The work  // is delegated to the first flush.  ASSERT_OK(Put("bar", "v"));  listener->seq2 = db_->GetLatestSequenceNumber();  FlushOptions flush_opts;  flush_opts.wait = false;  ASSERT_OK(db_->Flush(flush_opts));  t1.join();  ASSERT_TRUE(listener->completed1);  ASSERT_TRUE(listener->completed2);  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();}#endif  // !ROCKSDB_LITETEST_P(DBAtomicFlushTest, ManualAtomicFlush) {  Options options = CurrentOptions();  options.create_if_missing = true;  options.atomic_flush = GetParam();  options.write_buffer_size = (static_cast<size_t>(64) << 20);  CreateAndReopenWithCF({"pikachu", "eevee"}, options);  size_t num_cfs = handles_.size();  ASSERT_EQ(3, num_cfs);  WriteOptions wopts;  wopts.disableWAL = true;  for (size_t i = 0; i != num_cfs; ++i) {    ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));  }  std::vector<int> cf_ids;  for (size_t i = 0; i != num_cfs; ++i) {    cf_ids.emplace_back(static_cast<int>(i));  }  ASSERT_OK(Flush(cf_ids));  for (size_t i = 0; i != num_cfs; ++i) {    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);    ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());    ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());  }}TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {  Options options = CurrentOptions();  options.create_if_missing = true;  options.atomic_flush = GetParam();  // 4KB so that we can easily trigger auto flush.  options.write_buffer_size = 4096;  SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::BackgroundCallFlush:FlushFinish:0",        "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});  SyncPoint::GetInstance()->EnableProcessing();  CreateAndReopenWithCF({"pikachu", "eevee"}, options);  size_t num_cfs = handles_.size();  ASSERT_EQ(3, num_cfs);  WriteOptions wopts;  wopts.disableWAL = true;  for (size_t i = 0; i != num_cfs; ++i) {    ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));  }  // Keep writing to one of them column families to trigger auto flush.  for (int i = 0; i != 4000; ++i) {    ASSERT_OK(Put(static_cast<int>(num_cfs) - 1 /*cf*/,                  "key" + std::to_string(i), "value" + std::to_string(i),                  wopts));  }  TEST_SYNC_POINT(      "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");  if (options.atomic_flush) {    for (size_t i = 0; i != num_cfs - 1; ++i) {      auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);      ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());      ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());    }  } else {    for (size_t i = 0; i != num_cfs - 1; ++i) {      auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);      ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());      ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());    }  }  SyncPoint::GetInstance()->DisableProcessing();}TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {  bool atomic_flush = GetParam();  if (!atomic_flush) {    return;  }  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(      new FaultInjectionTestEnv(env_));  Options options = CurrentOptions();  options.create_if_missing = true;  options.atomic_flush = atomic_flush;  options.env = fault_injection_env.get();  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",        "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},       {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",        "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});  SyncPoint::GetInstance()->EnableProcessing();  CreateAndReopenWithCF({"pikachu", "eevee"}, options);  size_t num_cfs = handles_.size();  ASSERT_EQ(3, num_cfs);  WriteOptions wopts;  wopts.disableWAL = true;  for (size_t i = 0; i != num_cfs; ++i) {    int cf_id = static_cast<int>(i);    ASSERT_OK(Put(cf_id, "key", "value", wopts));  }  FlushOptions flush_opts;  flush_opts.wait = false;  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");  fault_injection_env->SetFilesystemActive(false);  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");  for (auto* cfh : handles_) {    dbfull()->TEST_WaitForFlushMemTable(cfh);  }  for (size_t i = 0; i != num_cfs; ++i) {    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);    ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed());    ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());  }  fault_injection_env->SetFilesystemActive(true);  Destroy(options);}TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {  bool atomic_flush = GetParam();  if (!atomic_flush) {    return;  }  Options options = CurrentOptions();  options.create_if_missing = true;  options.atomic_flush = atomic_flush;  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();  SyncPoint::GetInstance()->EnableProcessing();  CreateAndReopenWithCF({"pikachu", "eevee"}, options);  size_t num_cfs = handles_.size();  ASSERT_EQ(3, num_cfs);  WriteOptions wopts;  wopts.disableWAL = true;  std::vector<int> cf_ids;  for (size_t i = 0; i != num_cfs; ++i) {    int cf_id = static_cast<int>(i);    ASSERT_OK(Put(cf_id, "key", "value", wopts));    cf_ids.push_back(cf_id);  }  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));  ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped());  Destroy(options);}TEST_P(DBAtomicFlushTest,       FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {  bool atomic_flush = GetParam();  if (!atomic_flush) {    return;  }  Options options = CurrentOptions();  options.create_if_missing = true;  options.atomic_flush = atomic_flush;  CreateAndReopenWithCF({"pikachu", "eevee"}, options);  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();  SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",        "DBAtomicFlushTest::BeforeDropCF"},       {"DBAtomicFlushTest::AfterDropCF",        "DBImpl::BackgroundCallFlush:start"}});  SyncPoint::GetInstance()->EnableProcessing();  size_t num_cfs = handles_.size();  ASSERT_EQ(3, num_cfs);  WriteOptions wopts;  wopts.disableWAL = true;  for (size_t i = 0; i != num_cfs; ++i) {    int cf_id = static_cast<int>(i);    ASSERT_OK(Put(cf_id, "key", "value", wopts));  }  port::Thread user_thread([&]() {    TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));    TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");  });  FlushOptions flush_opts;  flush_opts.wait = true;  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));  user_thread.join();  for (size_t i = 0; i != num_cfs; ++i) {    int cf_id = static_cast<int>(i);    ASSERT_EQ("value", Get(cf_id, "key"));  }  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);  num_cfs = handles_.size();  ASSERT_EQ(2, num_cfs);  for (size_t i = 0; i != num_cfs; ++i) {    int cf_id = static_cast<int>(i);    ASSERT_EQ("value", Get(cf_id, "key"));  }  Destroy(options);}TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {  bool atomic_flush = GetParam();  if (!atomic_flush) {    return;  }  const int kNumKeysTriggerFlush = 4;  Options options = CurrentOptions();  options.create_if_missing = true;  options.atomic_flush = atomic_flush;  options.memtable_factory.reset(      new SpecialSkipListFactory(kNumKeysTriggerFlush));  CreateAndReopenWithCF({"pikachu"}, options);  for (int i = 0; i != kNumKeysTriggerFlush; ++i) {    ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));  }  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();  SyncPoint::GetInstance()->EnableProcessing();  ASSERT_OK(Put(0, "key", "value"));  Close();  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);  ASSERT_EQ("value", Get(0, "key"));}TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {  bool atomic_flush = GetParam();  Options options = CurrentOptions();  options.create_if_missing = true;  options.atomic_flush = atomic_flush;  options.max_write_buffer_number = 4;  // Set min_write_buffer_number_to_merge to be greater than 1, so that  // a column family with one memtable in the imm will not cause IsFlushPending  // to return true when flush_requested_ is false.  options.min_write_buffer_number_to_merge = 2;  CreateAndReopenWithCF({"pikachu"}, options);  ASSERT_EQ(2, handles_.size());  ASSERT_OK(dbfull()->PauseBackgroundWork());  ASSERT_OK(Put(0, "key00", "value00"));  ASSERT_OK(Put(1, "key10", "value10"));  FlushOptions flush_opts;  flush_opts.wait = false;  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));  ASSERT_OK(Put(0, "key01", "value01"));  // Since max_write_buffer_number is 4, the following flush won't cause write  // stall.  ASSERT_OK(dbfull()->Flush(flush_opts));  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));  ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));  handles_[1] = nullptr;  ASSERT_OK(dbfull()->ContinueBackgroundWork());  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));  delete handles_[0];  handles_.clear();}TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {  bool atomic_flush = GetParam();  if (!atomic_flush) {    return;  }  Options options = CurrentOptions();  options.create_if_missing = true;  options.atomic_flush = atomic_flush;  CreateAndReopenWithCF({"pikachu"}, options);  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},       {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",        "DBImpl::BackgroundCallFlush:start"},       {"DBImpl::BackgroundCallFlush:start",        "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});  SyncPoint::GetInstance()->EnableProcessing();  ASSERT_EQ(2, handles_.size());  ASSERT_OK(Put(0, "key", "value"));  ASSERT_OK(Put(1, "key", "value"));  auto* cfd_default =      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())          ->cfd();  auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();  port::Thread drop_cf_thr([&]() {    TEST_SYNC_POINT(        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));    delete handles_[1];    handles_.resize(1);    TEST_SYNC_POINT(        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");  });  FlushOptions flush_opts;  flush_opts.allow_write_stall = true;  ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},                                                flush_opts));  drop_cf_thr.join();  Close();  SyncPoint::GetInstance()->DisableProcessing();}TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {  bool atomic_flush = GetParam();  if (!atomic_flush) {    return;  }  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);  Options options = CurrentOptions();  options.env = fault_injection_env.get();  options.create_if_missing = true;  options.atomic_flush = atomic_flush;  CreateAndReopenWithCF({"pikachu"}, options);  ASSERT_EQ(2, handles_.size());  for (size_t cf = 0; cf < handles_.size(); ++cf) {    ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));  }  SyncPoint::GetInstance()->DisableProcessing();  SyncPoint::GetInstance()->ClearAllCallBacks();  SyncPoint::GetInstance()->SetCallBack(      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",      [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });  SyncPoint::GetInstance()->EnableProcessing();  FlushOptions flush_opts;  Status s = db_->Flush(flush_opts, handles_);  ASSERT_NOK(s);  fault_injection_env->SetFilesystemActive(true);  Close();  SyncPoint::GetInstance()->ClearAllCallBacks();}INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,                        testing::Bool());INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());}  // namespace ROCKSDB_NAMESPACEint main(int argc, char** argv) {  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();  ::testing::InitGoogleTest(&argc, argv);  return RUN_ALL_TESTS();}
 |