| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#include <atomic>#include <memory>#include <thread>#include <vector>#include <fstream>#include "db/db_test_util.h"#include "db/write_batch_internal.h"#include "db/write_thread.h"#include "port/port.h"#include "port/stack_trace.h"#include "test_util/fault_injection_test_env.h"#include "test_util/sync_point.h"#include "util/string_util.h"namespace ROCKSDB_NAMESPACE {// Test variations of WriteImpl.class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> { public:  DBWriteTest() : DBTestBase("/db_write_test") {}  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }  void Open() { DBTestBase::Reopen(GetOptions()); }};// It is invalid to do sync write while disabling WAL.TEST_P(DBWriteTest, SyncAndDisableWAL) {  WriteOptions write_options;  write_options.sync = true;  write_options.disableWAL = true;  ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());  WriteBatch batch;  ASSERT_OK(batch.Put("foo", "bar"));  ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());}TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {  Options options = GetOptions();  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;  std::vector<port::Thread> threads;  std::atomic<int> thread_num(0);  port::Mutex mutex;  port::CondVar cv(&mutex);  Reopen(options);  std::function<void()> write_slowdown_func = [&]() {    int a = thread_num.fetch_add(1);    std::string key = "foo" + std::to_string(a);    WriteOptions wo;    wo.no_slowdown = false;    dbfull()->Put(wo, key, "bar");  };  std::function<void()> write_no_slowdown_func = [&]() {    int a = thread_num.fetch_add(1);    std::string key = "foo" + std::to_string(a);    WriteOptions wo;    wo.no_slowdown = true;    dbfull()->Put(wo, key, "bar");  };  std::function<void(void *)> unblock_main_thread_func = [&](void *) {    mutex.Lock();    cv.SignalAll();    mutex.Unlock();  };  // Create 3 L0 files and schedule 4th without waiting  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");  Flush();  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");  Flush();  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");  Flush();  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",        "DBImpl::BackgroundCallFlush:start"},       {"DBWriteTest::WriteThreadHangOnWriteStall:2",        "DBImpl::WriteImpl:BeforeLeaderEnters"},       // Make compaction start wait for the write stall to be detected and       // implemented by a write group leader       {"DBWriteTest::WriteThreadHangOnWriteStall:3",        "BackgroundCallCompaction:0"}});  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();  // Schedule creation of 4th L0 file without waiting. This will seal the  // memtable and then wait for a sync point before writing the file. We need  // to do it this way because SwitchMemtable() needs to enter the  // write_thread  FlushOptions fopt;  fopt.wait = false;  dbfull()->Flush(fopt);  // Create a mix of slowdown/no_slowdown write threads  mutex.Lock();  // First leader  threads.emplace_back(write_slowdown_func);  cv.Wait();  // Second leader. Will stall writes  threads.emplace_back(write_slowdown_func);  cv.Wait();  threads.emplace_back(write_no_slowdown_func);  cv.Wait();  threads.emplace_back(write_slowdown_func);  cv.Wait();  threads.emplace_back(write_no_slowdown_func);  cv.Wait();  threads.emplace_back(write_slowdown_func);  cv.Wait();  mutex.Unlock();  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");  dbfull()->TEST_WaitForFlushMemTable(nullptr);  // This would have triggered a write stall. Unblock the write group leader  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");  // The leader is going to create missing newer links. When the leader finishes,  // the next leader is going to delay writes and fail writers with no_slowdown  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");  for (auto& t : threads) {    t.join();  }}TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {  constexpr int kNumThreads = 5;  std::unique_ptr<FaultInjectionTestEnv> mock_env(      new FaultInjectionTestEnv(Env::Default()));  Options options = GetOptions();  options.env = mock_env.get();  Reopen(options);  std::atomic<int> ready_count{0};  std::atomic<int> leader_count{0};  std::vector<port::Thread> threads;  mock_env->SetFilesystemActive(false);  // Wait until all threads linked to write threads, to make sure  // all threads join the same batch group.  SyncPoint::GetInstance()->SetCallBack(      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {        ready_count++;        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);        if (w->state == WriteThread::STATE_GROUP_LEADER) {          leader_count++;          while (ready_count < kNumThreads) {            // busy waiting          }        }      });  SyncPoint::GetInstance()->EnableProcessing();  for (int i = 0; i < kNumThreads; i++) {    threads.push_back(port::Thread(        [&](int index) {          // All threads should fail.          auto res = Put("key" + ToString(index), "value");          if (options.manual_wal_flush) {            ASSERT_TRUE(res.ok());            // we should see fs error when we do the flush            // TSAN reports a false alarm for lock-order-inversion but Open and            // FlushWAL are not run concurrently. Disabling this until TSAN is            // fixed.            // res = dbfull()->FlushWAL(false);            // ASSERT_FALSE(res.ok());          } else {            ASSERT_FALSE(res.ok());          }        },        i));  }  for (int i = 0; i < kNumThreads; i++) {    threads[i].join();  }  ASSERT_EQ(1, leader_count);  // Close before mock_env destruct.  Close();}TEST_P(DBWriteTest, ManualWalFlushInEffect) {  Options options = GetOptions();  Reopen(options);  // try the 1st WAL created during open  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());  // try the 2nd wal created during SwitchWAL  dbfull()->TEST_SwitchWAL();  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());}TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {  std::unique_ptr<FaultInjectionTestEnv> mock_env(      new FaultInjectionTestEnv(Env::Default()));  Options options = GetOptions();  options.env = mock_env.get();  Reopen(options);  for (int i = 0; i < 2; i++) {    // Forcibly fail WAL write for the first Put only. Subsequent Puts should    // fail due to read-only mode    mock_env->SetFilesystemActive(i != 0);    auto res = Put("key" + ToString(i), "value");    // TSAN reports a false alarm for lock-order-inversion but Open and    // FlushWAL are not run concurrently. Disabling this until TSAN is    // fixed.    /*    if (options.manual_wal_flush && i == 0) {      // even with manual_wal_flush the 2nd Put should return error because of      // the read-only mode      ASSERT_TRUE(res.ok());      // we should see fs error when we do the flush      res = dbfull()->FlushWAL(false);    }    */    if (!options.manual_wal_flush) {      ASSERT_FALSE(res.ok());    }  }  // Close before mock_env destruct.  Close();}TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {  Random rnd(301);  std::unique_ptr<FaultInjectionTestEnv> mock_env(      new FaultInjectionTestEnv(Env::Default()));  Options options = GetOptions();  options.env = mock_env.get();  options.writable_file_max_buffer_size = 4 * 1024 * 1024;  options.write_buffer_size = 3 * 512 * 1024;  options.wal_bytes_per_sync = 256 * 1024;  options.manual_wal_flush = true;  Reopen(options);  mock_env->SetFilesystemActive(false, Status::IOError("Not active"));  Status s;  for (int i = 0; i < 4 * 512; ++i) {    s = Put(Key(i), RandomString(&rnd, 1024));    if (!s.ok()) {      break;    }  }  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);  mock_env->SetFilesystemActive(true);  // Close before mock_env destruct.  Close();}// Test that db->LockWAL() flushes the WAL after locking.TEST_P(DBWriteTest, LockWalInEffect) {  Options options = GetOptions();  Reopen(options);  // try the 1st WAL created during open  ASSERT_OK(Put("key" + ToString(0), "value"));  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());  ASSERT_OK(dbfull()->LockWAL());  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));  ASSERT_OK(dbfull()->UnlockWAL());  // try the 2nd wal created during SwitchWAL  dbfull()->TEST_SwitchWAL();  ASSERT_OK(Put("key" + ToString(0), "value"));  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());  ASSERT_OK(dbfull()->LockWAL());  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));  ASSERT_OK(dbfull()->UnlockWAL());}TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {    Options options = GetOptions();    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();    options.statistics->set_stats_level(StatsLevel::kAll);    Reopen(options);    std::string wal_key_prefix = "WAL_KEY_";    std::string no_wal_key_prefix = "K_";    // 100 KB value each for NO-WAL operation    std::string no_wal_value(1024 * 100, 'X');    // 1B value each for WAL operation    std::string wal_value = "0";    std::thread threads[10];    for (int t = 0; t < 10; t++) {        threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix, no_wal_value, this] {            for(int i = 0; i < 10; i++) {              ROCKSDB_NAMESPACE::WriteOptions write_option_disable;              write_option_disable.disableWAL = true;              ROCKSDB_NAMESPACE::WriteOptions write_option_default;              std::string no_wal_key = no_wal_key_prefix + std::to_string(t) +                                       "_" + std::to_string(i);              this->Put(no_wal_key, no_wal_value, write_option_disable);              std::string wal_key =                  wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);              this->Put(wal_key, wal_value, write_option_default);              dbfull()->SyncWAL();            }            return 0;        });    }    for (auto& t: threads) {        t.join();    }    uint64_t bytes_num = options.statistics->getTickerCount(        ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);    // written WAL size should less than 100KB (even included HEADER & FOOTER overhead)    ASSERT_LE(bytes_num, 1024 * 100);}INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,                        testing::Values(DBTestBase::kDefault,                                        DBTestBase::kConcurrentWALWrites,                                        DBTestBase::kPipelinedWrite));}  // namespace ROCKSDB_NAMESPACEint main(int argc, char** argv) {  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();  ::testing::InitGoogleTest(&argc, argv);  return RUN_ALL_TESTS();}
 |