| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include <atomic>
- #include <memory>
- #include <thread>
- #include <vector>
- #include <fstream>
- #include "db/db_test_util.h"
- #include "db/write_batch_internal.h"
- #include "db/write_thread.h"
- #include "port/port.h"
- #include "port/stack_trace.h"
- #include "test_util/fault_injection_test_env.h"
- #include "test_util/sync_point.h"
- #include "util/string_util.h"
- namespace ROCKSDB_NAMESPACE {
- // Test variations of WriteImpl.
- class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
- public:
- DBWriteTest() : DBTestBase("/db_write_test") {}
- Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
- void Open() { DBTestBase::Reopen(GetOptions()); }
- };
- // It is invalid to do sync write while disabling WAL.
- TEST_P(DBWriteTest, SyncAndDisableWAL) {
- WriteOptions write_options;
- write_options.sync = true;
- write_options.disableWAL = true;
- ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
- WriteBatch batch;
- ASSERT_OK(batch.Put("foo", "bar"));
- ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
- }
- TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
- Options options = GetOptions();
- options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
- std::vector<port::Thread> threads;
- std::atomic<int> thread_num(0);
- port::Mutex mutex;
- port::CondVar cv(&mutex);
- Reopen(options);
- std::function<void()> write_slowdown_func = [&]() {
- int a = thread_num.fetch_add(1);
- std::string key = "foo" + std::to_string(a);
- WriteOptions wo;
- wo.no_slowdown = false;
- dbfull()->Put(wo, key, "bar");
- };
- std::function<void()> write_no_slowdown_func = [&]() {
- int a = thread_num.fetch_add(1);
- std::string key = "foo" + std::to_string(a);
- WriteOptions wo;
- wo.no_slowdown = true;
- dbfull()->Put(wo, key, "bar");
- };
- std::function<void(void *)> unblock_main_thread_func = [&](void *) {
- mutex.Lock();
- cv.SignalAll();
- mutex.Unlock();
- };
- // Create 3 L0 files and schedule 4th without waiting
- Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
- Flush();
- Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
- Flush();
- Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
- Flush();
- Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
- "DBImpl::BackgroundCallFlush:start"},
- {"DBWriteTest::WriteThreadHangOnWriteStall:2",
- "DBImpl::WriteImpl:BeforeLeaderEnters"},
- // Make compaction start wait for the write stall to be detected and
- // implemented by a write group leader
- {"DBWriteTest::WriteThreadHangOnWriteStall:3",
- "BackgroundCallCompaction:0"}});
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- // Schedule creation of 4th L0 file without waiting. This will seal the
- // memtable and then wait for a sync point before writing the file. We need
- // to do it this way because SwitchMemtable() needs to enter the
- // write_thread
- FlushOptions fopt;
- fopt.wait = false;
- dbfull()->Flush(fopt);
- // Create a mix of slowdown/no_slowdown write threads
- mutex.Lock();
- // First leader
- threads.emplace_back(write_slowdown_func);
- cv.Wait();
- // Second leader. Will stall writes
- threads.emplace_back(write_slowdown_func);
- cv.Wait();
- threads.emplace_back(write_no_slowdown_func);
- cv.Wait();
- threads.emplace_back(write_slowdown_func);
- cv.Wait();
- threads.emplace_back(write_no_slowdown_func);
- cv.Wait();
- threads.emplace_back(write_slowdown_func);
- cv.Wait();
- mutex.Unlock();
- TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
- dbfull()->TEST_WaitForFlushMemTable(nullptr);
- // This would have triggered a write stall. Unblock the write group leader
- TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
- // The leader is going to create missing newer links. When the leader finishes,
- // the next leader is going to delay writes and fail writers with no_slowdown
- TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
- for (auto& t : threads) {
- t.join();
- }
- }
- TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
- constexpr int kNumThreads = 5;
- std::unique_ptr<FaultInjectionTestEnv> mock_env(
- new FaultInjectionTestEnv(Env::Default()));
- Options options = GetOptions();
- options.env = mock_env.get();
- Reopen(options);
- std::atomic<int> ready_count{0};
- std::atomic<int> leader_count{0};
- std::vector<port::Thread> threads;
- mock_env->SetFilesystemActive(false);
- // Wait until all threads linked to write threads, to make sure
- // all threads join the same batch group.
- SyncPoint::GetInstance()->SetCallBack(
- "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
- ready_count++;
- auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
- if (w->state == WriteThread::STATE_GROUP_LEADER) {
- leader_count++;
- while (ready_count < kNumThreads) {
- // busy waiting
- }
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- for (int i = 0; i < kNumThreads; i++) {
- threads.push_back(port::Thread(
- [&](int index) {
- // All threads should fail.
- auto res = Put("key" + ToString(index), "value");
- if (options.manual_wal_flush) {
- ASSERT_TRUE(res.ok());
- // we should see fs error when we do the flush
- // TSAN reports a false alarm for lock-order-inversion but Open and
- // FlushWAL are not run concurrently. Disabling this until TSAN is
- // fixed.
- // res = dbfull()->FlushWAL(false);
- // ASSERT_FALSE(res.ok());
- } else {
- ASSERT_FALSE(res.ok());
- }
- },
- i));
- }
- for (int i = 0; i < kNumThreads; i++) {
- threads[i].join();
- }
- ASSERT_EQ(1, leader_count);
- // Close before mock_env destruct.
- Close();
- }
- TEST_P(DBWriteTest, ManualWalFlushInEffect) {
- Options options = GetOptions();
- Reopen(options);
- // try the 1st WAL created during open
- ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
- ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
- ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
- ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
- // try the 2nd wal created during SwitchWAL
- dbfull()->TEST_SwitchWAL();
- ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
- ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
- ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
- ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
- }
- TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
- std::unique_ptr<FaultInjectionTestEnv> mock_env(
- new FaultInjectionTestEnv(Env::Default()));
- Options options = GetOptions();
- options.env = mock_env.get();
- Reopen(options);
- for (int i = 0; i < 2; i++) {
- // Forcibly fail WAL write for the first Put only. Subsequent Puts should
- // fail due to read-only mode
- mock_env->SetFilesystemActive(i != 0);
- auto res = Put("key" + ToString(i), "value");
- // TSAN reports a false alarm for lock-order-inversion but Open and
- // FlushWAL are not run concurrently. Disabling this until TSAN is
- // fixed.
- /*
- if (options.manual_wal_flush && i == 0) {
- // even with manual_wal_flush the 2nd Put should return error because of
- // the read-only mode
- ASSERT_TRUE(res.ok());
- // we should see fs error when we do the flush
- res = dbfull()->FlushWAL(false);
- }
- */
- if (!options.manual_wal_flush) {
- ASSERT_FALSE(res.ok());
- }
- }
- // Close before mock_env destruct.
- Close();
- }
- TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
- Random rnd(301);
- std::unique_ptr<FaultInjectionTestEnv> mock_env(
- new FaultInjectionTestEnv(Env::Default()));
- Options options = GetOptions();
- options.env = mock_env.get();
- options.writable_file_max_buffer_size = 4 * 1024 * 1024;
- options.write_buffer_size = 3 * 512 * 1024;
- options.wal_bytes_per_sync = 256 * 1024;
- options.manual_wal_flush = true;
- Reopen(options);
- mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
- Status s;
- for (int i = 0; i < 4 * 512; ++i) {
- s = Put(Key(i), RandomString(&rnd, 1024));
- if (!s.ok()) {
- break;
- }
- }
- ASSERT_EQ(s.severity(), Status::Severity::kFatalError);
- mock_env->SetFilesystemActive(true);
- // Close before mock_env destruct.
- Close();
- }
- // Test that db->LockWAL() flushes the WAL after locking.
- TEST_P(DBWriteTest, LockWalInEffect) {
- Options options = GetOptions();
- Reopen(options);
- // try the 1st WAL created during open
- ASSERT_OK(Put("key" + ToString(0), "value"));
- ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
- ASSERT_OK(dbfull()->LockWAL());
- ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
- ASSERT_OK(dbfull()->UnlockWAL());
- // try the 2nd wal created during SwitchWAL
- dbfull()->TEST_SwitchWAL();
- ASSERT_OK(Put("key" + ToString(0), "value"));
- ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
- ASSERT_OK(dbfull()->LockWAL());
- ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
- ASSERT_OK(dbfull()->UnlockWAL());
- }
- TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
- Options options = GetOptions();
- options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
- options.statistics->set_stats_level(StatsLevel::kAll);
- Reopen(options);
- std::string wal_key_prefix = "WAL_KEY_";
- std::string no_wal_key_prefix = "K_";
- // 100 KB value each for NO-WAL operation
- std::string no_wal_value(1024 * 100, 'X');
- // 1B value each for WAL operation
- std::string wal_value = "0";
- std::thread threads[10];
- for (int t = 0; t < 10; t++) {
- threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix, no_wal_value, this] {
- for(int i = 0; i < 10; i++) {
- ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
- write_option_disable.disableWAL = true;
- ROCKSDB_NAMESPACE::WriteOptions write_option_default;
- std::string no_wal_key = no_wal_key_prefix + std::to_string(t) +
- "_" + std::to_string(i);
- this->Put(no_wal_key, no_wal_value, write_option_disable);
- std::string wal_key =
- wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
- this->Put(wal_key, wal_value, write_option_default);
- dbfull()->SyncWAL();
- }
- return 0;
- });
- }
- for (auto& t: threads) {
- t.join();
- }
- uint64_t bytes_num = options.statistics->getTickerCount(
- ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
- // written WAL size should less than 100KB (even included HEADER & FOOTER overhead)
- ASSERT_LE(bytes_num, 1024 * 100);
- }
- INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
- testing::Values(DBTestBase::kDefault,
- DBTestBase::kConcurrentWALWrites,
- DBTestBase::kPipelinedWrite));
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|