db_flush_test.cc 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include <atomic>
  10. #include "db/db_impl/db_impl.h"
  11. #include "db/db_test_util.h"
  12. #include "port/port.h"
  13. #include "port/stack_trace.h"
  14. #include "test_util/fault_injection_test_env.h"
  15. #include "test_util/sync_point.h"
  16. #include "util/cast_util.h"
  17. #include "util/mutexlock.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. class DBFlushTest : public DBTestBase {
  20. public:
  21. DBFlushTest() : DBTestBase("/db_flush_test") {}
  22. };
  23. class DBFlushDirectIOTest : public DBFlushTest,
  24. public ::testing::WithParamInterface<bool> {
  25. public:
  26. DBFlushDirectIOTest() : DBFlushTest() {}
  27. };
  28. class DBAtomicFlushTest : public DBFlushTest,
  29. public ::testing::WithParamInterface<bool> {
  30. public:
  31. DBAtomicFlushTest() : DBFlushTest() {}
  32. };
  33. // We had issue when two background threads trying to flush at the same time,
  34. // only one of them get committed. The test verifies the issue is fixed.
  35. TEST_F(DBFlushTest, FlushWhileWritingManifest) {
  36. Options options;
  37. options.disable_auto_compactions = true;
  38. options.max_background_flushes = 2;
  39. options.env = env_;
  40. Reopen(options);
  41. FlushOptions no_wait;
  42. no_wait.wait = false;
  43. no_wait.allow_write_stall=true;
  44. SyncPoint::GetInstance()->LoadDependency(
  45. {{"VersionSet::LogAndApply:WriteManifest",
  46. "DBFlushTest::FlushWhileWritingManifest:1"},
  47. {"MemTableList::TryInstallMemtableFlushResults:InProgress",
  48. "VersionSet::LogAndApply:WriteManifestDone"}});
  49. SyncPoint::GetInstance()->EnableProcessing();
  50. ASSERT_OK(Put("foo", "v"));
  51. ASSERT_OK(dbfull()->Flush(no_wait));
  52. TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
  53. ASSERT_OK(Put("bar", "v"));
  54. ASSERT_OK(dbfull()->Flush(no_wait));
  55. // If the issue is hit we will wait here forever.
  56. dbfull()->TEST_WaitForFlushMemTable();
  57. #ifndef ROCKSDB_LITE
  58. ASSERT_EQ(2, TotalTableFiles());
  59. #endif // ROCKSDB_LITE
  60. }
  61. // Disable this test temporarily on Travis as it fails intermittently.
  62. // Github issue: #4151
  63. TEST_F(DBFlushTest, SyncFail) {
  64. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  65. new FaultInjectionTestEnv(env_));
  66. Options options;
  67. options.disable_auto_compactions = true;
  68. options.env = fault_injection_env.get();
  69. SyncPoint::GetInstance()->LoadDependency(
  70. {{"DBFlushTest::SyncFail:GetVersionRefCount:1",
  71. "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
  72. {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
  73. "DBFlushTest::SyncFail:GetVersionRefCount:2"},
  74. {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
  75. {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
  76. SyncPoint::GetInstance()->EnableProcessing();
  77. CreateAndReopenWithCF({"pikachu"}, options);
  78. Put("key", "value");
  79. auto* cfd =
  80. reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
  81. ->cfd();
  82. FlushOptions flush_options;
  83. flush_options.wait = false;
  84. ASSERT_OK(dbfull()->Flush(flush_options));
  85. // Flush installs a new super-version. Get the ref count after that.
  86. auto current_before = cfd->current();
  87. int refs_before = cfd->current()->TEST_refs();
  88. TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
  89. TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
  90. int refs_after_picking_memtables = cfd->current()->TEST_refs();
  91. ASSERT_EQ(refs_before + 1, refs_after_picking_memtables);
  92. fault_injection_env->SetFilesystemActive(false);
  93. TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
  94. TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
  95. fault_injection_env->SetFilesystemActive(true);
  96. // Now the background job will do the flush; wait for it.
  97. dbfull()->TEST_WaitForFlushMemTable();
  98. #ifndef ROCKSDB_LITE
  99. ASSERT_EQ("", FilesPerLevel()); // flush failed.
  100. #endif // ROCKSDB_LITE
  101. // Backgroun flush job should release ref count to current version.
  102. ASSERT_EQ(current_before, cfd->current());
  103. ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
  104. Destroy(options);
  105. }
  106. TEST_F(DBFlushTest, SyncSkip) {
  107. Options options = CurrentOptions();
  108. SyncPoint::GetInstance()->LoadDependency(
  109. {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
  110. {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
  111. SyncPoint::GetInstance()->EnableProcessing();
  112. Reopen(options);
  113. Put("key", "value");
  114. FlushOptions flush_options;
  115. flush_options.wait = false;
  116. ASSERT_OK(dbfull()->Flush(flush_options));
  117. TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
  118. TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");
  119. // Now the background job will do the flush; wait for it.
  120. dbfull()->TEST_WaitForFlushMemTable();
  121. Destroy(options);
  122. }
  123. TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
  124. // Verify setting an empty high-pri (flush) thread pool causes flushes to be
  125. // scheduled in the low-pri (compaction) thread pool.
  126. Options options = CurrentOptions();
  127. options.level0_file_num_compaction_trigger = 4;
  128. options.memtable_factory.reset(new SpecialSkipListFactory(1));
  129. Reopen(options);
  130. env_->SetBackgroundThreads(0, Env::HIGH);
  131. std::thread::id tid;
  132. int num_flushes = 0, num_compactions = 0;
  133. SyncPoint::GetInstance()->SetCallBack(
  134. "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
  135. if (tid == std::thread::id()) {
  136. tid = std::this_thread::get_id();
  137. } else {
  138. ASSERT_EQ(tid, std::this_thread::get_id());
  139. }
  140. ++num_flushes;
  141. });
  142. SyncPoint::GetInstance()->SetCallBack(
  143. "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
  144. ASSERT_EQ(tid, std::this_thread::get_id());
  145. ++num_compactions;
  146. });
  147. SyncPoint::GetInstance()->EnableProcessing();
  148. ASSERT_OK(Put("key", "val"));
  149. for (int i = 0; i < 4; ++i) {
  150. ASSERT_OK(Put("key", "val"));
  151. dbfull()->TEST_WaitForFlushMemTable();
  152. }
  153. dbfull()->TEST_WaitForCompact();
  154. ASSERT_EQ(4, num_flushes);
  155. ASSERT_EQ(1, num_compactions);
  156. }
  157. TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
  158. Options options = CurrentOptions();
  159. options.write_buffer_size = 100;
  160. options.max_write_buffer_number = 4;
  161. options.min_write_buffer_number_to_merge = 3;
  162. Reopen(options);
  163. SyncPoint::GetInstance()->LoadDependency(
  164. {{"DBImpl::BGWorkFlush",
  165. "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
  166. {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
  167. "FlushJob::WriteLevel0Table"}});
  168. SyncPoint::GetInstance()->EnableProcessing();
  169. ASSERT_OK(Put("key1", "value1"));
  170. port::Thread t([&]() {
  171. // The call wait for flush to finish, i.e. with flush_options.wait = true.
  172. ASSERT_OK(Flush());
  173. });
  174. // Wait for flush start.
  175. TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
  176. // Insert a second memtable before the manual flush finish.
  177. // At the end of the manual flush job, it will check if further flush
  178. // is needed, but it will not trigger flush of the second memtable because
  179. // min_write_buffer_number_to_merge is not reached.
  180. ASSERT_OK(Put("key2", "value2"));
  181. ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  182. TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");
  183. // Manual flush should return, without waiting for flush indefinitely.
  184. t.join();
  185. }
  186. TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
  187. Options options = CurrentOptions();
  188. Reopen(options);
  189. SyncPoint::GetInstance()->DisableProcessing();
  190. SyncPoint::GetInstance()->ClearAllCallBacks();
  191. int called = 0;
  192. SyncPoint::GetInstance()->SetCallBack(
  193. "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) {
  194. ASSERT_NE(nullptr, arg);
  195. auto unscheduled_flushes = *reinterpret_cast<int*>(arg);
  196. ASSERT_EQ(0, unscheduled_flushes);
  197. ++called;
  198. });
  199. SyncPoint::GetInstance()->EnableProcessing();
  200. ASSERT_OK(Put("a", "foo"));
  201. FlushOptions flush_opts;
  202. ASSERT_OK(dbfull()->Flush(flush_opts));
  203. ASSERT_EQ(1, called);
  204. SyncPoint::GetInstance()->DisableProcessing();
  205. SyncPoint::GetInstance()->ClearAllCallBacks();
  206. }
  207. TEST_P(DBFlushDirectIOTest, DirectIO) {
  208. Options options;
  209. options.create_if_missing = true;
  210. options.disable_auto_compactions = true;
  211. options.max_background_flushes = 2;
  212. options.use_direct_io_for_flush_and_compaction = GetParam();
  213. options.env = new MockEnv(Env::Default());
  214. SyncPoint::GetInstance()->SetCallBack(
  215. "BuildTable:create_file", [&](void* arg) {
  216. bool* use_direct_writes = static_cast<bool*>(arg);
  217. ASSERT_EQ(*use_direct_writes,
  218. options.use_direct_io_for_flush_and_compaction);
  219. });
  220. SyncPoint::GetInstance()->EnableProcessing();
  221. Reopen(options);
  222. ASSERT_OK(Put("foo", "v"));
  223. FlushOptions flush_options;
  224. flush_options.wait = true;
  225. ASSERT_OK(dbfull()->Flush(flush_options));
  226. Destroy(options);
  227. delete options.env;
  228. }
  229. TEST_F(DBFlushTest, FlushError) {
  230. Options options;
  231. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  232. new FaultInjectionTestEnv(env_));
  233. options.write_buffer_size = 100;
  234. options.max_write_buffer_number = 4;
  235. options.min_write_buffer_number_to_merge = 3;
  236. options.disable_auto_compactions = true;
  237. options.env = fault_injection_env.get();
  238. Reopen(options);
  239. ASSERT_OK(Put("key1", "value1"));
  240. ASSERT_OK(Put("key2", "value2"));
  241. fault_injection_env->SetFilesystemActive(false);
  242. Status s = dbfull()->TEST_SwitchMemtable();
  243. fault_injection_env->SetFilesystemActive(true);
  244. Destroy(options);
  245. ASSERT_NE(s, Status::OK());
  246. }
  247. TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
  248. // Regression test for bug where manual flush hangs forever when the DB
  249. // is in read-only mode. Verify it now at least returns, despite failing.
  250. Options options;
  251. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  252. new FaultInjectionTestEnv(env_));
  253. options.env = fault_injection_env.get();
  254. options.max_write_buffer_number = 2;
  255. Reopen(options);
  256. // Trigger a first flush but don't let it run
  257. ASSERT_OK(db_->PauseBackgroundWork());
  258. ASSERT_OK(Put("key1", "value1"));
  259. FlushOptions flush_opts;
  260. flush_opts.wait = false;
  261. ASSERT_OK(db_->Flush(flush_opts));
  262. // Write a key to the second memtable so we have something to flush later
  263. // after the DB is in read-only mode.
  264. ASSERT_OK(Put("key2", "value2"));
  265. // Let the first flush continue, hit an error, and put the DB in read-only
  266. // mode.
  267. fault_injection_env->SetFilesystemActive(false);
  268. ASSERT_OK(db_->ContinueBackgroundWork());
  269. dbfull()->TEST_WaitForFlushMemTable();
  270. #ifndef ROCKSDB_LITE
  271. uint64_t num_bg_errors;
  272. ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors,
  273. &num_bg_errors));
  274. ASSERT_GT(num_bg_errors, 0);
  275. #endif // ROCKSDB_LITE
  276. // In the bug scenario, triggering another flush would cause the second flush
  277. // to hang forever. After the fix we expect it to return an error.
  278. ASSERT_NOK(db_->Flush(FlushOptions()));
  279. Close();
  280. }
  281. TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
  282. Options options = CurrentOptions();
  283. options.create_if_missing = true;
  284. CreateAndReopenWithCF({"pikachu"}, options);
  285. SyncPoint::GetInstance()->DisableProcessing();
  286. SyncPoint::GetInstance()->LoadDependency(
  287. {{"DBImpl::FlushMemTable:AfterScheduleFlush",
  288. "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
  289. {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
  290. "DBImpl::BackgroundCallFlush:start"},
  291. {"DBImpl::BackgroundCallFlush:start",
  292. "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
  293. SyncPoint::GetInstance()->EnableProcessing();
  294. ASSERT_EQ(2, handles_.size());
  295. ASSERT_OK(Put(1, "key", "value"));
  296. auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  297. port::Thread drop_cf_thr([&]() {
  298. TEST_SYNC_POINT(
  299. "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
  300. ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  301. ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
  302. handles_.resize(1);
  303. TEST_SYNC_POINT(
  304. "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
  305. });
  306. FlushOptions flush_opts;
  307. flush_opts.allow_write_stall = true;
  308. ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
  309. drop_cf_thr.join();
  310. Close();
  311. SyncPoint::GetInstance()->DisableProcessing();
  312. }
  313. #ifndef ROCKSDB_LITE
  314. TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
  315. class TestListener : public EventListener {
  316. public:
  317. void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
  318. // There's only one key in each flush.
  319. ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
  320. ASSERT_NE(0, info.smallest_seqno);
  321. if (info.smallest_seqno == seq1) {
  322. // First flush completed
  323. ASSERT_FALSE(completed1);
  324. completed1 = true;
  325. CheckFlushResultCommitted(db, seq1);
  326. } else {
  327. // Second flush completed
  328. ASSERT_FALSE(completed2);
  329. completed2 = true;
  330. ASSERT_EQ(info.smallest_seqno, seq2);
  331. CheckFlushResultCommitted(db, seq2);
  332. }
  333. }
  334. void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
  335. DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
  336. InstrumentedMutex* mutex = db_impl->mutex();
  337. mutex->Lock();
  338. auto* cfd =
  339. reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
  340. ->cfd();
  341. ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
  342. mutex->Unlock();
  343. }
  344. std::atomic<SequenceNumber> seq1{0};
  345. std::atomic<SequenceNumber> seq2{0};
  346. std::atomic<bool> completed1{false};
  347. std::atomic<bool> completed2{false};
  348. };
  349. std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
  350. SyncPoint::GetInstance()->LoadDependency(
  351. {{"DBImpl::BackgroundCallFlush:start",
  352. "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
  353. {"DBImpl::FlushMemTableToOutputFile:Finish",
  354. "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
  355. SyncPoint::GetInstance()->SetCallBack(
  356. "FlushJob::WriteLevel0Table", [&listener](void* arg) {
  357. // Wait for the second flush finished, out of mutex.
  358. auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
  359. if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
  360. TEST_SYNC_POINT(
  361. "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
  362. "WaitSecond");
  363. }
  364. });
  365. Options options = CurrentOptions();
  366. options.create_if_missing = true;
  367. options.listeners.push_back(listener);
  368. // Setting max_flush_jobs = max_background_jobs / 4 = 2.
  369. options.max_background_jobs = 8;
  370. // Allow 2 immutable memtables.
  371. options.max_write_buffer_number = 3;
  372. Reopen(options);
  373. SyncPoint::GetInstance()->EnableProcessing();
  374. ASSERT_OK(Put("foo", "v"));
  375. listener->seq1 = db_->GetLatestSequenceNumber();
  376. // t1 will wait for the second flush complete before committing flush result.
  377. auto t1 = port::Thread([&]() {
  378. // flush_opts.wait = true
  379. ASSERT_OK(db_->Flush(FlushOptions()));
  380. });
  381. // Wait for first flush started.
  382. TEST_SYNC_POINT(
  383. "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
  384. // The second flush will exit early without commit its result. The work
  385. // is delegated to the first flush.
  386. ASSERT_OK(Put("bar", "v"));
  387. listener->seq2 = db_->GetLatestSequenceNumber();
  388. FlushOptions flush_opts;
  389. flush_opts.wait = false;
  390. ASSERT_OK(db_->Flush(flush_opts));
  391. t1.join();
  392. ASSERT_TRUE(listener->completed1);
  393. ASSERT_TRUE(listener->completed2);
  394. SyncPoint::GetInstance()->DisableProcessing();
  395. SyncPoint::GetInstance()->ClearAllCallBacks();
  396. }
  397. #endif // !ROCKSDB_LITE
  398. TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
  399. Options options = CurrentOptions();
  400. options.create_if_missing = true;
  401. options.atomic_flush = GetParam();
  402. options.write_buffer_size = (static_cast<size_t>(64) << 20);
  403. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  404. size_t num_cfs = handles_.size();
  405. ASSERT_EQ(3, num_cfs);
  406. WriteOptions wopts;
  407. wopts.disableWAL = true;
  408. for (size_t i = 0; i != num_cfs; ++i) {
  409. ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
  410. }
  411. std::vector<int> cf_ids;
  412. for (size_t i = 0; i != num_cfs; ++i) {
  413. cf_ids.emplace_back(static_cast<int>(i));
  414. }
  415. ASSERT_OK(Flush(cf_ids));
  416. for (size_t i = 0; i != num_cfs; ++i) {
  417. auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
  418. ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
  419. ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
  420. }
  421. }
  422. TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
  423. Options options = CurrentOptions();
  424. options.create_if_missing = true;
  425. options.atomic_flush = GetParam();
  426. // 4KB so that we can easily trigger auto flush.
  427. options.write_buffer_size = 4096;
  428. SyncPoint::GetInstance()->LoadDependency(
  429. {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
  430. "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
  431. SyncPoint::GetInstance()->EnableProcessing();
  432. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  433. size_t num_cfs = handles_.size();
  434. ASSERT_EQ(3, num_cfs);
  435. WriteOptions wopts;
  436. wopts.disableWAL = true;
  437. for (size_t i = 0; i != num_cfs; ++i) {
  438. ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
  439. }
  440. // Keep writing to one of them column families to trigger auto flush.
  441. for (int i = 0; i != 4000; ++i) {
  442. ASSERT_OK(Put(static_cast<int>(num_cfs) - 1 /*cf*/,
  443. "key" + std::to_string(i), "value" + std::to_string(i),
  444. wopts));
  445. }
  446. TEST_SYNC_POINT(
  447. "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
  448. if (options.atomic_flush) {
  449. for (size_t i = 0; i != num_cfs - 1; ++i) {
  450. auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
  451. ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
  452. ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
  453. }
  454. } else {
  455. for (size_t i = 0; i != num_cfs - 1; ++i) {
  456. auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
  457. ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
  458. ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
  459. }
  460. }
  461. SyncPoint::GetInstance()->DisableProcessing();
  462. }
  463. TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
  464. bool atomic_flush = GetParam();
  465. if (!atomic_flush) {
  466. return;
  467. }
  468. std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
  469. new FaultInjectionTestEnv(env_));
  470. Options options = CurrentOptions();
  471. options.create_if_missing = true;
  472. options.atomic_flush = atomic_flush;
  473. options.env = fault_injection_env.get();
  474. SyncPoint::GetInstance()->DisableProcessing();
  475. SyncPoint::GetInstance()->LoadDependency(
  476. {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
  477. "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
  478. {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
  479. "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
  480. SyncPoint::GetInstance()->EnableProcessing();
  481. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  482. size_t num_cfs = handles_.size();
  483. ASSERT_EQ(3, num_cfs);
  484. WriteOptions wopts;
  485. wopts.disableWAL = true;
  486. for (size_t i = 0; i != num_cfs; ++i) {
  487. int cf_id = static_cast<int>(i);
  488. ASSERT_OK(Put(cf_id, "key", "value", wopts));
  489. }
  490. FlushOptions flush_opts;
  491. flush_opts.wait = false;
  492. ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  493. TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
  494. fault_injection_env->SetFilesystemActive(false);
  495. TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
  496. for (auto* cfh : handles_) {
  497. dbfull()->TEST_WaitForFlushMemTable(cfh);
  498. }
  499. for (size_t i = 0; i != num_cfs; ++i) {
  500. auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
  501. ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed());
  502. ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
  503. }
  504. fault_injection_env->SetFilesystemActive(true);
  505. Destroy(options);
  506. }
  507. TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
  508. bool atomic_flush = GetParam();
  509. if (!atomic_flush) {
  510. return;
  511. }
  512. Options options = CurrentOptions();
  513. options.create_if_missing = true;
  514. options.atomic_flush = atomic_flush;
  515. SyncPoint::GetInstance()->DisableProcessing();
  516. SyncPoint::GetInstance()->ClearAllCallBacks();
  517. SyncPoint::GetInstance()->EnableProcessing();
  518. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  519. size_t num_cfs = handles_.size();
  520. ASSERT_EQ(3, num_cfs);
  521. WriteOptions wopts;
  522. wopts.disableWAL = true;
  523. std::vector<int> cf_ids;
  524. for (size_t i = 0; i != num_cfs; ++i) {
  525. int cf_id = static_cast<int>(i);
  526. ASSERT_OK(Put(cf_id, "key", "value", wopts));
  527. cf_ids.push_back(cf_id);
  528. }
  529. ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  530. ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped());
  531. Destroy(options);
  532. }
  533. TEST_P(DBAtomicFlushTest,
  534. FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
  535. bool atomic_flush = GetParam();
  536. if (!atomic_flush) {
  537. return;
  538. }
  539. Options options = CurrentOptions();
  540. options.create_if_missing = true;
  541. options.atomic_flush = atomic_flush;
  542. CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  543. SyncPoint::GetInstance()->DisableProcessing();
  544. SyncPoint::GetInstance()->ClearAllCallBacks();
  545. SyncPoint::GetInstance()->LoadDependency(
  546. {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
  547. "DBAtomicFlushTest::BeforeDropCF"},
  548. {"DBAtomicFlushTest::AfterDropCF",
  549. "DBImpl::BackgroundCallFlush:start"}});
  550. SyncPoint::GetInstance()->EnableProcessing();
  551. size_t num_cfs = handles_.size();
  552. ASSERT_EQ(3, num_cfs);
  553. WriteOptions wopts;
  554. wopts.disableWAL = true;
  555. for (size_t i = 0; i != num_cfs; ++i) {
  556. int cf_id = static_cast<int>(i);
  557. ASSERT_OK(Put(cf_id, "key", "value", wopts));
  558. }
  559. port::Thread user_thread([&]() {
  560. TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
  561. ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  562. TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
  563. });
  564. FlushOptions flush_opts;
  565. flush_opts.wait = true;
  566. ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  567. user_thread.join();
  568. for (size_t i = 0; i != num_cfs; ++i) {
  569. int cf_id = static_cast<int>(i);
  570. ASSERT_EQ("value", Get(cf_id, "key"));
  571. }
  572. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
  573. num_cfs = handles_.size();
  574. ASSERT_EQ(2, num_cfs);
  575. for (size_t i = 0; i != num_cfs; ++i) {
  576. int cf_id = static_cast<int>(i);
  577. ASSERT_EQ("value", Get(cf_id, "key"));
  578. }
  579. Destroy(options);
  580. }
  581. TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
  582. bool atomic_flush = GetParam();
  583. if (!atomic_flush) {
  584. return;
  585. }
  586. const int kNumKeysTriggerFlush = 4;
  587. Options options = CurrentOptions();
  588. options.create_if_missing = true;
  589. options.atomic_flush = atomic_flush;
  590. options.memtable_factory.reset(
  591. new SpecialSkipListFactory(kNumKeysTriggerFlush));
  592. CreateAndReopenWithCF({"pikachu"}, options);
  593. for (int i = 0; i != kNumKeysTriggerFlush; ++i) {
  594. ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
  595. }
  596. SyncPoint::GetInstance()->DisableProcessing();
  597. SyncPoint::GetInstance()->ClearAllCallBacks();
  598. SyncPoint::GetInstance()->EnableProcessing();
  599. ASSERT_OK(Put(0, "key", "value"));
  600. Close();
  601. ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  602. ASSERT_EQ("value", Get(0, "key"));
  603. }
  604. TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
  605. bool atomic_flush = GetParam();
  606. Options options = CurrentOptions();
  607. options.create_if_missing = true;
  608. options.atomic_flush = atomic_flush;
  609. options.max_write_buffer_number = 4;
  610. // Set min_write_buffer_number_to_merge to be greater than 1, so that
  611. // a column family with one memtable in the imm will not cause IsFlushPending
  612. // to return true when flush_requested_ is false.
  613. options.min_write_buffer_number_to_merge = 2;
  614. CreateAndReopenWithCF({"pikachu"}, options);
  615. ASSERT_EQ(2, handles_.size());
  616. ASSERT_OK(dbfull()->PauseBackgroundWork());
  617. ASSERT_OK(Put(0, "key00", "value00"));
  618. ASSERT_OK(Put(1, "key10", "value10"));
  619. FlushOptions flush_opts;
  620. flush_opts.wait = false;
  621. ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  622. ASSERT_OK(Put(0, "key01", "value01"));
  623. // Since max_write_buffer_number is 4, the following flush won't cause write
  624. // stall.
  625. ASSERT_OK(dbfull()->Flush(flush_opts));
  626. ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  627. ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
  628. handles_[1] = nullptr;
  629. ASSERT_OK(dbfull()->ContinueBackgroundWork());
  630. ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
  631. delete handles_[0];
  632. handles_.clear();
  633. }
  634. TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
  635. bool atomic_flush = GetParam();
  636. if (!atomic_flush) {
  637. return;
  638. }
  639. Options options = CurrentOptions();
  640. options.create_if_missing = true;
  641. options.atomic_flush = atomic_flush;
  642. CreateAndReopenWithCF({"pikachu"}, options);
  643. SyncPoint::GetInstance()->DisableProcessing();
  644. SyncPoint::GetInstance()->LoadDependency(
  645. {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
  646. "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
  647. {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
  648. "DBImpl::BackgroundCallFlush:start"},
  649. {"DBImpl::BackgroundCallFlush:start",
  650. "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
  651. SyncPoint::GetInstance()->EnableProcessing();
  652. ASSERT_EQ(2, handles_.size());
  653. ASSERT_OK(Put(0, "key", "value"));
  654. ASSERT_OK(Put(1, "key", "value"));
  655. auto* cfd_default =
  656. static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
  657. ->cfd();
  658. auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  659. port::Thread drop_cf_thr([&]() {
  660. TEST_SYNC_POINT(
  661. "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
  662. ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  663. delete handles_[1];
  664. handles_.resize(1);
  665. TEST_SYNC_POINT(
  666. "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
  667. });
  668. FlushOptions flush_opts;
  669. flush_opts.allow_write_stall = true;
  670. ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
  671. flush_opts));
  672. drop_cf_thr.join();
  673. Close();
  674. SyncPoint::GetInstance()->DisableProcessing();
  675. }
  676. TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
  677. bool atomic_flush = GetParam();
  678. if (!atomic_flush) {
  679. return;
  680. }
  681. auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
  682. Options options = CurrentOptions();
  683. options.env = fault_injection_env.get();
  684. options.create_if_missing = true;
  685. options.atomic_flush = atomic_flush;
  686. CreateAndReopenWithCF({"pikachu"}, options);
  687. ASSERT_EQ(2, handles_.size());
  688. for (size_t cf = 0; cf < handles_.size(); ++cf) {
  689. ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));
  690. }
  691. SyncPoint::GetInstance()->DisableProcessing();
  692. SyncPoint::GetInstance()->ClearAllCallBacks();
  693. SyncPoint::GetInstance()->SetCallBack(
  694. "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
  695. [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
  696. SyncPoint::GetInstance()->EnableProcessing();
  697. FlushOptions flush_opts;
  698. Status s = db_->Flush(flush_opts, handles_);
  699. ASSERT_NOK(s);
  700. fault_injection_env->SetFilesystemActive(true);
  701. Close();
  702. SyncPoint::GetInstance()->ClearAllCallBacks();
  703. }
  704. INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
  705. testing::Bool());
  706. INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());
  707. } // namespace ROCKSDB_NAMESPACE
  708. int main(int argc, char** argv) {
  709. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  710. ::testing::InitGoogleTest(&argc, argv);
  711. return RUN_ALL_TESTS();
  712. }