db_write_test.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <atomic>
#include <cstdint>
#include <fstream>
#include <memory>
#include <thread>
#include <vector>

#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {

// Test variations of WriteImpl.
class DBWriteTest : public DBTestBase,
                    public testing::WithParamInterface<int> {
 public:
  DBWriteTest() : DBTestBase("db_write_test", /*env_do_fsync=*/true) {}

  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }

  void Open() { DBTestBase::Reopen(GetOptions()); }
};

class DBWriteTestUnparameterized : public DBTestBase {
 public:
  explicit DBWriteTestUnparameterized()
      : DBTestBase("pipelined_write_test", /*env_do_fsync=*/false) {}
};

// It is invalid to do sync write while disabling WAL.
TEST_P(DBWriteTest, SyncAndDisableWAL) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = true;
  ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
  WriteBatch batch;
  ASSERT_OK(batch.Put("foo", "bar"));
  ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}

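// While a write stall is being installed, a pending writer with
// no_slowdown=true that sits in the middle of the writer list must be failed
// (Status::Incomplete) and removed without corrupting the list or blocking
// the other writers.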
TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    Status s = dbfull()->Put(wo, key, "bar");
    ASSERT_TRUE(s.ok() || s.IsIncomplete());
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule 4th without waiting
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
        "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  ASSERT_OK(dbfull()->Flush(fopt));

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }

  // Second leader. Will stall writes
  // Build a writers list with no slowdown in the middle:
  //  +-------------+
  //  |  slowdown   +<----+ newest
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  | no slowdown |
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  |  slowdown   +
  //  +-------------+
  threads.emplace_back(write_slowdown_func);
  while (writers != 2) {
    cv.Wait();
  }
  threads.emplace_back(write_no_slowdown_func);
  while (writers != 3) {
    cv.Wait();
  }
  threads.emplace_back(write_slowdown_func);
  while (writers != 4) {
    cv.Wait();
  }

  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
  for (auto& t : threads) {
    t.join();
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

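// Same setup as above but with a longer mix of pending slowdown/no_slowdown
// writers; guards against the write thread hanging when no_slowdown writers
// are failed while a write stall is installed.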
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    Status s = dbfull()->Put(wo, key, "bar");
    ASSERT_TRUE(s.ok() || s.IsIncomplete());
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule 4th without waiting
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
        "DBImpl::WriteImpl:BeforeLeaderEnters"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  ASSERT_OK(dbfull()->Flush(fopt));

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }
  // Second leader. Will stall writes
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  while (writers != 6) {
    cv.Wait();
  }
  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
  for (auto& t : threads) {
    t.join();
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

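// Verify that PerfContext::write_thread_wait_nanos accounts for time spent
// waiting in the write thread: sync points keep the current batch group
// leader from exiting for ~2ms while write_func's writer is blocked waiting,
// so its counter must exceed 2ms.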
TEST_P(DBWriteTest, WriteThreadWaitNanosCounter) {
  Options options = GetOptions();
  std::vector<port::Thread> threads;
  Reopen(options);

  std::function<void()> write_func = [&]() {
    PerfContext* perf_ctx = get_perf_context();
    SetPerfLevel(PerfLevel::kEnableWait);
    perf_ctx->Reset();
    TEST_SYNC_POINT("DBWriteTest::WriteThreadWaitNanosCounter:WriteFunc");
    ASSERT_OK(dbfull()->Put(WriteOptions(), "bar", "val2"));
    ASSERT_GT(perf_ctx->write_thread_wait_nanos, 2000000U);
  };

  std::function<void()> sleep_func = [&]() {
    TEST_SYNC_POINT("DBWriteTest::WriteThreadWaitNanosCounter:SleepFunc:1");
    SystemClock::Default()->SleepForMicroseconds(2000);
    TEST_SYNC_POINT("DBWriteTest::WriteThreadWaitNanosCounter:SleepFunc:2");
  };

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"WriteThread::EnterAsBatchGroupLeader:End",
        "DBWriteTest::WriteThreadWaitNanosCounter:WriteFunc"},
       {"WriteThread::AwaitState:BlockingWaiting",
        "DBWriteTest::WriteThreadWaitNanosCounter:SleepFunc:1"},
       {"DBWriteTest::WriteThreadWaitNanosCounter:SleepFunc:2",
        "WriteThread::ExitAsBatchGroupLeader:Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  threads.emplace_back(sleep_func);
  threads.emplace_back(write_func);

  ASSERT_OK(dbfull()->Put(WriteOptions(), "foo", "val1"));

  for (auto& t : threads) {
    t.join();
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

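// All writers should end up in one batch group while the filesystem is
// inactive; the WAL write error hit by the group leader must be propagated
// to every follower in the group.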
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  std::atomic<int> ready_count{0};
  std::atomic<int> leader_count{0};
  std::vector<port::Thread> threads;
  mock_env->SetFilesystemActive(false);

  // Wait until all threads are linked into the write thread, to make sure
  // all threads join the same batch group.
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        ready_count++;
        auto* w = static_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          leader_count++;
          while (ready_count < kNumThreads) {
            // busy waiting
          }
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < kNumThreads; i++) {
    threads.emplace_back(
        [&](int index) {
          // All threads should fail.
          auto res = Put("key" + std::to_string(index), "value");
          if (options.manual_wal_flush) {
            ASSERT_TRUE(res.ok());
            // we should see fs error when we do the flush
            // TSAN reports a false alarm for lock-order-inversion but Open and
            // FlushWAL are not run concurrently. Disabling this until TSAN is
            // fixed.
            // res = dbfull()->FlushWAL(false);
            // ASSERT_FALSE(res.ok());
          } else {
            ASSERT_FALSE(res.ok());
          }
        },
        i);
  }
  for (int i = 0; i < kNumThreads; i++) {
    threads[i].join();
  }
  ASSERT_EQ(1, leader_count);

  // The failed Put operations can cause a BG error to be set.
  // Mark it as Checked for the ASSERT_STATUS_CHECKED
  dbfull()->Resume().PermitUncheckedError();

  // Close before mock_env destruct.
  Close();
}

TEST_F(DBWriteTestUnparameterized, PipelinedWriteRace) {
  // This test was written to trigger a race in ExitAsBatchGroupLeader in case
  // enable_pipelined_write_ was true.
  // Writers for which ShouldWriteToMemtable() evaluates to false are removed
  // from the write_group via CompleteFollower/ CompleteLeader. Writers in the
  // middle of the group are fully unlinked, but if that writer is the
  // last_writer, then we did not update the predecessor's link_older, i.e.,
  // this writer was still reachable via newest_writer_.
  //
  // But the problem was that CompleteFollower already wakes up the thread
  // owning that writer before the writer has been removed. This resulted in a
  // race - if the leader thread was fast enough, then everything was fine.
  // However, if the woken up thread finished the current write operation and
  // then performed yet another write, then a new writer instance was added
  // to newest_writer_. It is possible that the new writer is located on the
  // same address on stack, and if this happened, then we had a problem,
  // because the old code tried to find the last_writer in the list to unlink
  // it, which in this case produced a cycle in the list.
  // Whether two invocations of PipelinedWriteImpl() by the same thread
  // actually allocate the writer on the same address depends on the OS and/or
  // compiler, so it is rather hard to create a deterministic test for this.

  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.enable_pipelined_write = true;

  std::vector<port::Thread> threads;

  std::atomic<int> write_counter{0};
  std::atomic<int> active_writers{0};
  std::atomic<bool> second_write_starting{false};
  std::atomic<bool> second_write_in_progress{false};
  std::atomic<WriteThread::Writer*> leader{nullptr};
  std::atomic<bool> finished_WAL_write{false};

  DestroyAndReopen(options);

  auto write_one_doc = [&]() {
    int a = write_counter.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
    --active_writers;
  };

  auto write_two_docs = [&]() {
    write_one_doc();
    second_write_starting = true;
    write_one_doc();
  };

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        if (second_write_starting.load()) {
          second_write_in_progress = true;
          return;
        }
        auto* w = static_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          active_writers++;
          if (leader.load() == nullptr) {
            leader.store(w);
            while (active_writers.load() < 2) {
              // wait for another thread to join the write_group
            }
          }
        } else {
          // we disable the memtable for all followers so that they are
          // removed from the write_group before enqueuing it for the memtable
          // write
          w->disable_memtable = true;
          active_writers++;
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::ExitAsBatchGroupLeader:Start", [&](void* arg) {
        auto* wg = static_cast<WriteThread::WriteGroup*>(arg);
        if (wg->leader == leader && !finished_WAL_write) {
          finished_WAL_write = true;
          while (active_writers.load() < 3) {
            // wait for the new writer to be enqueued
          }
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
      [&](void* arg) {
        auto* wg = static_cast<WriteThread::WriteGroup*>(arg);
        if (wg->leader == leader) {
          while (!second_write_in_progress.load()) {
            // wait for the old follower thread to start the next write
          }
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // start leader + one follower
  threads.emplace_back(write_one_doc);
  while (leader.load() == nullptr) {
    // wait for leader
  }

  // we perform two writes in the follower, so that for the second write
  // the thread reinserts a Writer with the same address
  threads.emplace_back(write_two_docs);

  // wait for the leader to enter ExitAsBatchGroupLeader
  while (!finished_WAL_write.load()) {
    // wait for write_group to have finished the WAL writes
  }

  // start another writer thread to be enqueued before the leader can
  // complete the writers from its write_group
  threads.emplace_back(write_one_doc);

  for (auto& t : threads) {
    t.join();
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

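// With manual_wal_flush the WAL write buffer should stay non-empty after a
// Put until FlushWAL() is called; without it the buffer is already empty.
// Checked for both the WAL created at Open and the one created by SwitchWAL.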
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
  // try the 2nd wal created during SwitchWAL
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
}

TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
  // Repro race condition bug where unflushed WAL data extended the synced size
  // recorded to MANIFEST despite being unrecoverable.
  Options options = GetOptions();
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_env.get();
  options.manual_wal_flush = true;
  options.track_and_verify_wals_in_manifest = true;
  Reopen(options);

  ASSERT_OK(Put("key1", "val1"));

  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncWAL:Begin",
      [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->FlushWAL(true /* sync */));

  // Ensure callback ran.
  ASSERT_EQ("val2", Get("key2"));

  Close();

  // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
  // DB WAL.
  ASSERT_OK(fault_env->DropUnsyncedFileData());

  Reopen(options);

  // Need to close before `fault_env` goes out of scope.
  Close();
}

TEST_P(DBWriteTest, InactiveWalFullySyncedBeforeUntracked) {
  // Repro bug where a WAL is appended and switched after
  // `FlushWAL(true /* sync */)`'s sync finishes and before it untracks fully
  // synced inactive logs. Previously such a WAL would be wrongly untracked
  // so the final append would never be synced.
  Options options = GetOptions();
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_env.get();
  Reopen(options);

  ASSERT_OK(Put("key1", "val1"));

  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncWAL:BeforeMarkLogsSynced:1", [this](void* /* arg */) {
        ASSERT_OK(Put("key2", "val2"));
        ASSERT_OK(dbfull()->TEST_SwitchMemtable());
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->FlushWAL(true /* sync */));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_OK(Put("key3", "val3"));

  ASSERT_OK(db_->FlushWAL(true /* sync */));

  Close();

  // Simulate full loss of unsynced data. This should drop nothing since we did
  // `FlushWAL(true /* sync */)` before `Close()`.
  ASSERT_OK(fault_env->DropUnsyncedFileData());

  Reopen(options);

  ASSERT_EQ("val1", Get("key1"));
  ASSERT_EQ("val2", Get("key2"));
  ASSERT_EQ("val3", Get("key3"));

  // Need to close before `fault_env` goes out of scope.
  Close();
}

TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  for (int i = 0; i < 2; i++) {
    // Forcibly fail WAL write for the first Put only. Subsequent Puts should
    // fail due to read-only mode
    mock_env->SetFilesystemActive(i != 0);
    auto res = Put("key" + std::to_string(i), "value");
    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    /*
    if (options.manual_wal_flush && i == 0) {
      // even with manual_wal_flush the 2nd Put should return error because of
      // the read-only mode
      ASSERT_TRUE(res.ok());
      // we should see fs error when we do the flush
      res = dbfull()->FlushWAL(false);
    }
    */
    if (!options.manual_wal_flush) {
      ASSERT_NOK(res);
    } else {
      ASSERT_OK(res);
    }
  }
  // Close before mock_env destruct.
  Close();
}

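// Write enough data to force a memtable switch while the filesystem is
// injecting IOErrors; the resulting background error should be raised at
// fatal severity.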
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);
  mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
  Status s;
  for (int i = 0; i < 4 * 512; ++i) {
    s = Put(Key(i), rnd.RandomString(1024));
    if (!s.ok()) {
      break;
    }
  }
  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);

  mock_env->SetFilesystemActive(true);
  // Close before mock_env destruct.
  Close();
}

// Test that db->LockWAL() flushes the WAL after locking, which can fail
TEST_P(DBWriteTest, LockWALInEffect) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem and non-encrypted environment");
    return;
  }
  Options options = GetOptions();
  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  options.env = fault_fs_env.get();
  options.disable_auto_compactions = true;
  options.paranoid_checks = false;
  options.max_bgerror_resume_count = 0;  // manual Resume()
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_OK(Put("key0", "value"));
  ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());
  ASSERT_OK(db_->LockWAL());
  ASSERT_TRUE(dbfull()->WALBufferIsEmpty());

  uint64_t wal_num = dbfull()->TEST_GetCurrentLogNumber();
  // Manual flush with wait=false should abruptly fail with TryAgain
  FlushOptions flush_opts;
  flush_opts.wait = false;
  for (bool allow_write_stall : {true, false}) {
    flush_opts.allow_write_stall = allow_write_stall;
    ASSERT_TRUE(db_->Flush(flush_opts).IsTryAgain());
  }
  ASSERT_EQ(wal_num, dbfull()->TEST_GetCurrentLogNumber());

  ASSERT_OK(db_->UnlockWAL());

  // try the 2nd wal created during SwitchWAL (not locked this time)
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_NE(wal_num, dbfull()->TEST_GetCurrentLogNumber());
  ASSERT_OK(Put("key1", "value"));
  ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());
  ASSERT_OK(db_->LockWAL());
  ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
  ASSERT_OK(db_->UnlockWAL());

  // The above `TEST_SwitchWAL()` triggered a flush. That flush needs to finish
  // before we make the filesystem inactive, otherwise the flush might hit an
  // unrecoverable error (e.g., failed MANIFEST update).
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));

  // Fail the WAL flush if applicable
  fault_fs->SetFilesystemActive(false);
  Status s = Put("key2", "value");
  if (options.manual_wal_flush) {
    ASSERT_OK(s);
    // I/O failure
    ASSERT_NOK(db_->LockWAL());
    // Should not need UnlockWAL after LockWAL fails
  } else {
    ASSERT_NOK(s);
    ASSERT_OK(db_->LockWAL());
    ASSERT_OK(db_->UnlockWAL());
  }
  fault_fs->SetFilesystemActive(true);
  ASSERT_OK(db_->Resume());
  // Writes should work again
  ASSERT_OK(Put("key3", "value"));
  ASSERT_EQ(Get("key3"), "value");

  // Should be extraneous, but allowed
  ASSERT_NOK(db_->UnlockWAL());

  // Close before fault_fs_env destruct.
  Close();
}

TEST_P(DBWriteTest, LockWALConcurrentRecursive) {
  // This is a micro-stress test of LockWAL and concurrency handling.
  // It is considered the most convenient way to balance functional
  // coverage and reproducibility (vs. the two extremes of (a) unit tests
  // tailored to specific interleavings and (b) db_stress)
  Options options = GetOptions();
  Reopen(options);
  ASSERT_OK(Put("k1", "k1_orig"));
  ASSERT_OK(db_->LockWAL());  // 0 -> 1
  auto frozen_seqno = db_->GetLatestSequenceNumber();

  std::string ingest_file = dbname_ + "/external.sst";
  {
    SstFileWriter sst_file_writer(EnvOptions(), options);
    ASSERT_OK(sst_file_writer.Open(ingest_file));
    ASSERT_OK(sst_file_writer.Put("k2", "k2_val"));
    ExternalSstFileInfo external_info;
    ASSERT_OK(sst_file_writer.Finish(&external_info));
  }

  AcqRelAtomic<bool> parallel_ingest_completed{false};
  port::Thread parallel_ingest{[&]() {
    IngestExternalFileOptions ingest_opts;
    ingest_opts.move_files = true;  // faster than copy
    // Shouldn't finish until WAL unlocked
    ASSERT_OK(db_->IngestExternalFile({ingest_file}, ingest_opts));
    parallel_ingest_completed.Store(true);
  }};

  AcqRelAtomic<bool> flush_completed{false};
  port::Thread parallel_flush{[&]() {
    FlushOptions flush_opts;
    // NB: Flush with wait=false case is tested above in LockWALInEffect
    flush_opts.wait = true;
    // allow_write_stall = true blocks in fewer cases
    flush_opts.allow_write_stall = true;
    // Shouldn't finish until WAL unlocked
    ASSERT_OK(db_->Flush(flush_opts));
    flush_completed.Store(true);
  }};

  AcqRelAtomic<bool> parallel_put_completed{false};
  port::Thread parallel_put{[&]() {
    // This can make certain failure scenarios more likely:
    // sleep(1);
    // Shouldn't finish until WAL unlocked
    ASSERT_OK(Put("k1", "k1_mod"));
    parallel_put_completed.Store(true);
  }};

  ASSERT_OK(db_->LockWAL());  // 1 -> 2
  // Read-only ops are OK
  ASSERT_EQ(Get("k1"), "k1_orig");
  {
    std::vector<LiveFileStorageInfo> files;
    LiveFilesStorageInfoOptions lf_opts;
    // A DB flush could deadlock
    lf_opts.wal_size_for_flush = UINT64_MAX;
    ASSERT_OK(db_->GetLiveFilesStorageInfo({lf_opts}, &files));
  }

  port::Thread parallel_lock_wal{[&]() {
    ASSERT_OK(db_->LockWAL());  // 2 -> 3 or 1 -> 2
  }};

  ASSERT_OK(db_->UnlockWAL());  // 2 -> 1 or 3 -> 2
  // Give parallel_put an extra chance to jump in case of bug
  std::this_thread::yield();
  parallel_lock_wal.join();
  ASSERT_FALSE(parallel_put_completed.Load());
  ASSERT_FALSE(parallel_ingest_completed.Load());
  ASSERT_FALSE(flush_completed.Load());

  // Should now have 2 outstanding LockWAL
  ASSERT_EQ(Get("k1"), "k1_orig");

  ASSERT_OK(db_->UnlockWAL());  // 2 -> 1

  ASSERT_FALSE(parallel_put_completed.Load());
  ASSERT_FALSE(parallel_ingest_completed.Load());
  ASSERT_FALSE(flush_completed.Load());

  ASSERT_EQ(Get("k1"), "k1_orig");
  ASSERT_EQ(Get("k2"), "NOT_FOUND");
  ASSERT_EQ(frozen_seqno, db_->GetLatestSequenceNumber());

  // Ensure final Unlock is concurrency safe and extra Unlock is safe but
  // non-OK
  std::atomic<int> unlock_ok{0};
  port::Thread parallel_stuff{[&]() {
    if (db_->UnlockWAL().ok()) {
      unlock_ok++;
    }
    ASSERT_OK(db_->LockWAL());
    if (db_->UnlockWAL().ok()) {
      unlock_ok++;
    }
  }};

  if (db_->UnlockWAL().ok()) {
    unlock_ok++;
  }

  parallel_stuff.join();

  // There was one extra unlock, so just one non-ok
  ASSERT_EQ(unlock_ok.load(), 2);

  // Write can proceed
  parallel_put.join();
  ASSERT_TRUE(parallel_put_completed.Load());
  ASSERT_EQ(Get("k1"), "k1_mod");
  parallel_ingest.join();
  ASSERT_TRUE(parallel_ingest_completed.Load());
  ASSERT_EQ(Get("k2"), "k2_val");
  parallel_flush.join();
  ASSERT_TRUE(flush_completed.Load());
  // And new writes
  ASSERT_OK(Put("k3", "val"));
  ASSERT_EQ(Get("k3"), "val");
}

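// Mix WAL-enabled and disableWAL writes plus SyncWAL() calls from many
// threads concurrently; only the tiny WAL-enabled values should be written
// to the WAL, as verified via the WAL_FILE_BYTES ticker.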
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  Options options = GetOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);
  std::string wal_key_prefix = "WAL_KEY_";
  std::string no_wal_key_prefix = "K_";
  // 100 KB value each for NO-WAL operation
  std::string no_wal_value(1024 * 100, 'X');
  // 1B value each for WAL operation
  std::string wal_value = "0";
  std::thread threads[10];
  for (int t = 0; t < 10; t++) {
    threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
                              no_wal_value, &options, this] {
      for (int i = 0; i < 10; i++) {
        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
        std::string no_wal_key =
            no_wal_key_prefix + std::to_string(t) + "_" + std::to_string(i);
        ASSERT_OK(this->Put(no_wal_key, no_wal_value, write_option_disable));
        std::string wal_key =
            wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
        ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
        ASSERT_OK(dbfull()->SyncWAL())
            << "options.env: " << options.env << ", env_: " << env_
            << ", env_->is_wal_sync_thread_safe_: "
            << env_->is_wal_sync_thread_safe_.load();
      }
      return;
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  uint64_t bytes_num = options.statistics->getTickerCount(
      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  // The written WAL size should be less than 100KB (even including HEADER &
  // FOOTER overhead)
  ASSERT_LE(bytes_num, 1024 * 100);
}

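// Corrupt the `record_num`-th record of the WAL at `log_path`: read forward
// to locate that record's offset, then overwrite bytes near the start of the
// record so it no longer passes validation during recovery.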
void CorruptLogFile(Env* env, Options& options, std::string log_path,
                    uint64_t log_num, int record_num) {
  std::shared_ptr<FileSystem> fs = env->GetFileSystem();
  std::unique_ptr<SequentialFileReader> file_reader;
  Status status;
  {
    std::unique_ptr<FSSequentialFile> file;
    status = fs->NewSequentialFile(log_path, FileOptions(), &file, nullptr);
    ASSERT_EQ(status, IOStatus::OK());
    file_reader.reset(new SequentialFileReader(std::move(file), log_path));
  }
  std::unique_ptr<log::Reader> reader(new log::Reader(
      nullptr, std::move(file_reader), nullptr, false, log_num));
  std::string scratch;
  Slice record;
  uint64_t record_checksum;
  for (int i = 0; i < record_num; ++i) {
    ASSERT_TRUE(reader->ReadRecord(&record, &scratch,
                                   options.wal_recovery_mode,
                                   &record_checksum));
  }
  uint64_t rec_start = reader->LastRecordOffset();
  reader.reset();
  {
    std::unique_ptr<FSRandomRWFile> file;
    status = fs->NewRandomRWFile(log_path, FileOptions(), &file, nullptr);
    ASSERT_EQ(status, IOStatus::OK());
    uint32_t bad_lognum = 0xff;
    ASSERT_EQ(
        file->Write(rec_start + 7,
                    Slice(reinterpret_cast<char*>(&bad_lognum),
                          sizeof(uint32_t)),
                    IOOptions(), nullptr),
        IOStatus::OK());
    ASSERT_OK(file->Close(IOOptions(), nullptr));
    file.reset();
  }
}

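// With kPointInTimeRecovery, recovery should stop at the corrupted record in
// the older WAL: Key(1) is recovered while Key(2) and Key(3) (written to the
// newer WAL) are dropped.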
TEST_P(DBWriteTest, RecycleLogTest) {
  Options options = GetOptions();
  options.recycle_log_file_num = 0;
  options.avoid_flush_during_recovery = true;
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  Reopen(options);
  ASSERT_OK(Put(Key(1), "val1"));
  ASSERT_OK(Put(Key(2), "val1"));

  uint64_t latest_log_num = 0;
  std::unique_ptr<LogFile> log_file;
  ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
  latest_log_num = log_file->LogNumber();
  Reopen(options);
  ASSERT_OK(Put(Key(3), "val3"));

  // Corrupt second entry of first log
  std::string log_path = LogFileName(dbname_, latest_log_num);
  CorruptLogFile(env_, options, log_path, latest_log_num, 2);

  Reopen(options);
  ASSERT_EQ(Get(Key(1)), "val1");
  ASSERT_EQ(Get(Key(2)), "NOT_FOUND");
  ASSERT_EQ(Get(Key(3)), "NOT_FOUND");
}

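// A flushed column family whose data is ahead of the corruption point in the
// WAL makes a consistent point-in-time recovery impossible, so reopening
// should fail with Corruption.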
TEST_P(DBWriteTest, RecycleLogTestCFAheadOfWAL) {
  Options options = GetOptions();
  options.recycle_log_file_num = 0;
  options.avoid_flush_during_recovery = true;
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_OK(Put(1, Key(1), "val1"));
  ASSERT_OK(Put(0, Key(2), "val2"));

  uint64_t latest_log_num = 0;
  std::unique_ptr<LogFile> log_file;
  ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
  latest_log_num = log_file->LogNumber();
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, Key(3), "val3"));

  // Corrupt second entry of first log
  std::string log_path = LogFileName(dbname_, latest_log_num);
  CorruptLogFile(env_, options, log_path, latest_log_num, 2);

  ASSERT_EQ(TryReopenWithColumnFamilies({"default", "pikachu"}, options),
            Status::Corruption());
}

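// Toggle recycle_log_file_num across reopens: a WAL created while recycling
// was disabled must be deleted, not recycled, once it becomes obsolete.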
TEST_P(DBWriteTest, RecycleLogToggleTest) {
  Options options = GetOptions();
  options.recycle_log_file_num = 0;
  options.avoid_flush_during_recovery = true;
  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;

  Destroy(options);
  Reopen(options);
  // After opening, a new log gets created, say 1.log
  ASSERT_OK(Put(Key(1), "val1"));

  options.recycle_log_file_num = 1;
  Reopen(options);
  // 1.log is added to alive_wal_files_
  ASSERT_OK(Put(Key(2), "val1"));
  ASSERT_OK(Flush());
  // 1.log should be deleted and not recycled, since it
  // was created by the previous Reopen
  ASSERT_OK(Put(Key(1), "val2"));
  ASSERT_OK(Flush());

  options.recycle_log_file_num = 1;
  Reopen(options);
  ASSERT_EQ(Get(Key(1)), "val2");
}

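// Exercise DB::IngestWriteBatchWithIndex: it requires disableWAL=true and
// overwrite_key=true, is not supported with pipelined writes, and is checked
// here across the default CF, additional CFs, deletes, merges, overwrites,
// an empty batch, and a non-existent CF after DestroyAndReopen.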
TEST_P(DBWriteTest, IngestWriteBatchWithIndex) {
  Options options = GetOptions();
  options.disable_auto_compactions = true;
  Reopen(options);

  Options cf_options = GetOptions();
  cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
  CreateColumnFamilies({"cf1", "cf2"}, cf_options);
  ReopenWithColumnFamilies({"default", "cf1", "cf2"},
                           {options, cf_options, cf_options});

  // default cf
  auto wbwi1 = std::make_shared<WriteBatchWithIndex>(options.comparator, 0,
                                                     /*overwrite_key=*/true);
  ASSERT_OK(wbwi1->Put("key1", "value1"));
  ASSERT_OK(wbwi1->Put("key2", "value2"));

  if (GetParam() == kPipelinedWrite) {
    ASSERT_TRUE(db_->IngestWriteBatchWithIndex({}, wbwi1).IsNotSupported());
    return;
  }

  // Test disableWAL=false
  ASSERT_TRUE(db_->IngestWriteBatchWithIndex({}, wbwi1).IsNotSupported());

  WriteOptions wo;
  wo.disableWAL = true;
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, wbwi1));
  ASSERT_EQ("value1", Get("key1"));
  ASSERT_EQ("value2", Get("key2"));

  // Test with overwrites
  auto wbwi = std::make_shared<WriteBatchWithIndex>(options.comparator, 0,
                                                    /*overwrite_key=*/true);
  ASSERT_OK(wbwi->Put("key2", "value3"));
  ASSERT_OK(wbwi->Delete("key1"));  // Delete an existing key
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, wbwi));
  ASSERT_EQ("NOT_FOUND", Get("key1"));
  ASSERT_EQ("value3", Get("key2"));

  auto wbwi2 = std::make_shared<WriteBatchWithIndex>(options.comparator, 0,
                                                     /*overwrite_key=*/true);
  ASSERT_OK(wbwi2->Put(handles_[1], "cf1_key1", "cf1_value1"));
  ASSERT_OK(wbwi2->Delete(handles_[1], "cf1_key2"));
  // Test ingestion with column family
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, wbwi2));
  ASSERT_EQ("cf1_value1", Get(1, "cf1_key1"));
  ASSERT_EQ("NOT_FOUND", Get(1, "cf1_key2"));

  auto wbwi3 = std::make_shared<WriteBatchWithIndex>(options.comparator, 0,
                                                     /*overwrite_key=*/true);
  ASSERT_OK(wbwi3->Merge(handles_[2], "cf2_key1", "cf2_value1"));
  ASSERT_OK(wbwi3->Merge(handles_[2], "cf2_key1", "cf2_value2"));
  // Test ingestion with merge operations
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, wbwi3));
  ASSERT_EQ("cf2_value1,cf2_value2", Get(2, "cf2_key1"));

  // Test with overwrite_key = false
  auto wbwi_no_overwrite = std::make_shared<WriteBatchWithIndex>(
      options.comparator, 0, /*overwrite_key=*/false);
  ASSERT_OK(wbwi_no_overwrite->Put("key1", "value1"));
  Status s = db_->IngestWriteBatchWithIndex(wo, wbwi_no_overwrite);
  ASSERT_TRUE(s.IsNotSupported());

  auto empty_wbwi = std::make_shared<WriteBatchWithIndex>(
      options.comparator, 0, /*overwrite_key=*/true);
  ASSERT_OK(db_->IngestWriteBatchWithIndex(wo, empty_wbwi));

  DestroyAndReopen(options);
  // Should fail when trying to ingest to non-existent column family
  ASSERT_NOK(db_->IngestWriteBatchWithIndex(wo, wbwi2));
}

INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                        testing::Values(DBTestBase::kDefault,
                                        DBTestBase::kConcurrentWALWrites,
                                        DBTestBase::kPipelinedWrite));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}