db_write_test.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <atomic>
  6. #include <memory>
  7. #include <thread>
  8. #include <vector>
  9. #include <fstream>
  10. #include "db/db_test_util.h"
  11. #include "db/write_batch_internal.h"
  12. #include "db/write_thread.h"
  13. #include "port/port.h"
  14. #include "port/stack_trace.h"
  15. #include "test_util/fault_injection_test_env.h"
  16. #include "test_util/sync_point.h"
  17. #include "util/string_util.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. // Test variations of WriteImpl.
  20. class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
  21. public:
  22. DBWriteTest() : DBTestBase("/db_write_test") {}
  23. Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
  24. void Open() { DBTestBase::Reopen(GetOptions()); }
  25. };
  26. // It is invalid to do sync write while disabling WAL.
  27. TEST_P(DBWriteTest, SyncAndDisableWAL) {
  28. WriteOptions write_options;
  29. write_options.sync = true;
  30. write_options.disableWAL = true;
  31. ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
  32. WriteBatch batch;
  33. ASSERT_OK(batch.Put("foo", "bar"));
  34. ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
  35. }
  36. TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
  37. Options options = GetOptions();
  38. options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
  39. std::vector<port::Thread> threads;
  40. std::atomic<int> thread_num(0);
  41. port::Mutex mutex;
  42. port::CondVar cv(&mutex);
  43. Reopen(options);
  44. std::function<void()> write_slowdown_func = [&]() {
  45. int a = thread_num.fetch_add(1);
  46. std::string key = "foo" + std::to_string(a);
  47. WriteOptions wo;
  48. wo.no_slowdown = false;
  49. dbfull()->Put(wo, key, "bar");
  50. };
  51. std::function<void()> write_no_slowdown_func = [&]() {
  52. int a = thread_num.fetch_add(1);
  53. std::string key = "foo" + std::to_string(a);
  54. WriteOptions wo;
  55. wo.no_slowdown = true;
  56. dbfull()->Put(wo, key, "bar");
  57. };
  58. std::function<void(void *)> unblock_main_thread_func = [&](void *) {
  59. mutex.Lock();
  60. cv.SignalAll();
  61. mutex.Unlock();
  62. };
  63. // Create 3 L0 files and schedule 4th without waiting
  64. Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  65. Flush();
  66. Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  67. Flush();
  68. Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  69. Flush();
  70. Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  71. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  72. "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  73. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
  74. {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
  75. "DBImpl::BackgroundCallFlush:start"},
  76. {"DBWriteTest::WriteThreadHangOnWriteStall:2",
  77. "DBImpl::WriteImpl:BeforeLeaderEnters"},
  78. // Make compaction start wait for the write stall to be detected and
  79. // implemented by a write group leader
  80. {"DBWriteTest::WriteThreadHangOnWriteStall:3",
  81. "BackgroundCallCompaction:0"}});
  82. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  83. // Schedule creation of 4th L0 file without waiting. This will seal the
  84. // memtable and then wait for a sync point before writing the file. We need
  85. // to do it this way because SwitchMemtable() needs to enter the
  86. // write_thread
  87. FlushOptions fopt;
  88. fopt.wait = false;
  89. dbfull()->Flush(fopt);
  90. // Create a mix of slowdown/no_slowdown write threads
  91. mutex.Lock();
  92. // First leader
  93. threads.emplace_back(write_slowdown_func);
  94. cv.Wait();
  95. // Second leader. Will stall writes
  96. threads.emplace_back(write_slowdown_func);
  97. cv.Wait();
  98. threads.emplace_back(write_no_slowdown_func);
  99. cv.Wait();
  100. threads.emplace_back(write_slowdown_func);
  101. cv.Wait();
  102. threads.emplace_back(write_no_slowdown_func);
  103. cv.Wait();
  104. threads.emplace_back(write_slowdown_func);
  105. cv.Wait();
  106. mutex.Unlock();
  107. TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
  108. dbfull()->TEST_WaitForFlushMemTable(nullptr);
  109. // This would have triggered a write stall. Unblock the write group leader
  110. TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
  111. // The leader is going to create missing newer links. When the leader finishes,
  112. // the next leader is going to delay writes and fail writers with no_slowdown
  113. TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
  114. for (auto& t : threads) {
  115. t.join();
  116. }
  117. }
  118. TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  119. constexpr int kNumThreads = 5;
  120. std::unique_ptr<FaultInjectionTestEnv> mock_env(
  121. new FaultInjectionTestEnv(Env::Default()));
  122. Options options = GetOptions();
  123. options.env = mock_env.get();
  124. Reopen(options);
  125. std::atomic<int> ready_count{0};
  126. std::atomic<int> leader_count{0};
  127. std::vector<port::Thread> threads;
  128. mock_env->SetFilesystemActive(false);
  129. // Wait until all threads linked to write threads, to make sure
  130. // all threads join the same batch group.
  131. SyncPoint::GetInstance()->SetCallBack(
  132. "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
  133. ready_count++;
  134. auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
  135. if (w->state == WriteThread::STATE_GROUP_LEADER) {
  136. leader_count++;
  137. while (ready_count < kNumThreads) {
  138. // busy waiting
  139. }
  140. }
  141. });
  142. SyncPoint::GetInstance()->EnableProcessing();
  143. for (int i = 0; i < kNumThreads; i++) {
  144. threads.push_back(port::Thread(
  145. [&](int index) {
  146. // All threads should fail.
  147. auto res = Put("key" + ToString(index), "value");
  148. if (options.manual_wal_flush) {
  149. ASSERT_TRUE(res.ok());
  150. // we should see fs error when we do the flush
  151. // TSAN reports a false alarm for lock-order-inversion but Open and
  152. // FlushWAL are not run concurrently. Disabling this until TSAN is
  153. // fixed.
  154. // res = dbfull()->FlushWAL(false);
  155. // ASSERT_FALSE(res.ok());
  156. } else {
  157. ASSERT_FALSE(res.ok());
  158. }
  159. },
  160. i));
  161. }
  162. for (int i = 0; i < kNumThreads; i++) {
  163. threads[i].join();
  164. }
  165. ASSERT_EQ(1, leader_count);
  166. // Close before mock_env destruct.
  167. Close();
  168. }
  169. TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  170. Options options = GetOptions();
  171. Reopen(options);
  172. // try the 1st WAL created during open
  173. ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  174. ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  175. ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  176. ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  177. // try the 2nd wal created during SwitchWAL
  178. dbfull()->TEST_SwitchWAL();
  179. ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  180. ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  181. ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  182. ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  183. }
  184. TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  185. std::unique_ptr<FaultInjectionTestEnv> mock_env(
  186. new FaultInjectionTestEnv(Env::Default()));
  187. Options options = GetOptions();
  188. options.env = mock_env.get();
  189. Reopen(options);
  190. for (int i = 0; i < 2; i++) {
  191. // Forcibly fail WAL write for the first Put only. Subsequent Puts should
  192. // fail due to read-only mode
  193. mock_env->SetFilesystemActive(i != 0);
  194. auto res = Put("key" + ToString(i), "value");
  195. // TSAN reports a false alarm for lock-order-inversion but Open and
  196. // FlushWAL are not run concurrently. Disabling this until TSAN is
  197. // fixed.
  198. /*
  199. if (options.manual_wal_flush && i == 0) {
  200. // even with manual_wal_flush the 2nd Put should return error because of
  201. // the read-only mode
  202. ASSERT_TRUE(res.ok());
  203. // we should see fs error when we do the flush
  204. res = dbfull()->FlushWAL(false);
  205. }
  206. */
  207. if (!options.manual_wal_flush) {
  208. ASSERT_FALSE(res.ok());
  209. }
  210. }
  211. // Close before mock_env destruct.
  212. Close();
  213. }
  214. TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  215. Random rnd(301);
  216. std::unique_ptr<FaultInjectionTestEnv> mock_env(
  217. new FaultInjectionTestEnv(Env::Default()));
  218. Options options = GetOptions();
  219. options.env = mock_env.get();
  220. options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  221. options.write_buffer_size = 3 * 512 * 1024;
  222. options.wal_bytes_per_sync = 256 * 1024;
  223. options.manual_wal_flush = true;
  224. Reopen(options);
  225. mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
  226. Status s;
  227. for (int i = 0; i < 4 * 512; ++i) {
  228. s = Put(Key(i), RandomString(&rnd, 1024));
  229. if (!s.ok()) {
  230. break;
  231. }
  232. }
  233. ASSERT_EQ(s.severity(), Status::Severity::kFatalError);
  234. mock_env->SetFilesystemActive(true);
  235. // Close before mock_env destruct.
  236. Close();
  237. }
  238. // Test that db->LockWAL() flushes the WAL after locking.
  239. TEST_P(DBWriteTest, LockWalInEffect) {
  240. Options options = GetOptions();
  241. Reopen(options);
  242. // try the 1st WAL created during open
  243. ASSERT_OK(Put("key" + ToString(0), "value"));
  244. ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  245. ASSERT_OK(dbfull()->LockWAL());
  246. ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  247. ASSERT_OK(dbfull()->UnlockWAL());
  248. // try the 2nd wal created during SwitchWAL
  249. dbfull()->TEST_SwitchWAL();
  250. ASSERT_OK(Put("key" + ToString(0), "value"));
  251. ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  252. ASSERT_OK(dbfull()->LockWAL());
  253. ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  254. ASSERT_OK(dbfull()->UnlockWAL());
  255. }
  256. TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  257. Options options = GetOptions();
  258. options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  259. options.statistics->set_stats_level(StatsLevel::kAll);
  260. Reopen(options);
  261. std::string wal_key_prefix = "WAL_KEY_";
  262. std::string no_wal_key_prefix = "K_";
  263. // 100 KB value each for NO-WAL operation
  264. std::string no_wal_value(1024 * 100, 'X');
  265. // 1B value each for WAL operation
  266. std::string wal_value = "0";
  267. std::thread threads[10];
  268. for (int t = 0; t < 10; t++) {
  269. threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix, no_wal_value, this] {
  270. for(int i = 0; i < 10; i++) {
  271. ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
  272. write_option_disable.disableWAL = true;
  273. ROCKSDB_NAMESPACE::WriteOptions write_option_default;
  274. std::string no_wal_key = no_wal_key_prefix + std::to_string(t) +
  275. "_" + std::to_string(i);
  276. this->Put(no_wal_key, no_wal_value, write_option_disable);
  277. std::string wal_key =
  278. wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
  279. this->Put(wal_key, wal_value, write_option_default);
  280. dbfull()->SyncWAL();
  281. }
  282. return 0;
  283. });
  284. }
  285. for (auto& t: threads) {
  286. t.join();
  287. }
  288. uint64_t bytes_num = options.statistics->getTickerCount(
  289. ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  290. // written WAL size should less than 100KB (even included HEADER & FOOTER overhead)
  291. ASSERT_LE(bytes_num, 1024 * 100);
  292. }
  293. INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
  294. testing::Values(DBTestBase::kDefault,
  295. DBTestBase::kConcurrentWALWrites,
  296. DBTestBase::kPipelinedWrite));
  297. } // namespace ROCKSDB_NAMESPACE
  298. int main(int argc, char** argv) {
  299. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  300. ::testing::InitGoogleTest(&argc, argv);
  301. return RUN_ALL_TESTS();
  302. }