memtable_list_test.cc 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/memtable_list.h"
  6. #include <algorithm>
  7. #include <string>
  8. #include <vector>
  9. #include "db/merge_context.h"
  10. #include "db/version_set.h"
  11. #include "db/write_controller.h"
  12. #include "rocksdb/db.h"
  13. #include "rocksdb/status.h"
  14. #include "rocksdb/write_buffer_manager.h"
  15. #include "test_util/testharness.h"
  16. #include "test_util/testutil.h"
  17. #include "util/string_util.h"
  18. namespace ROCKSDB_NAMESPACE {
// Test fixture for MemTableList unit tests.
//
// Owns a real on-disk test DB that is lazily created by CreateDB(); the DB
// provides the column families ("default", "one", "two") that the Mock_*
// helpers below recover into a mock VersionSet so that
// (Try)InstallMemtableFlushResults can be exercised without a full DBImpl.
class MemTableListTest : public testing::Test {
 public:
  std::string dbname;  // per-thread test DB path (unique per test process)
  DB* db;              // lazily opened by CreateDB(); nullptr until then
  Options options;
  std::vector<ColumnFamilyHandle*> handles;  // default + "one" + "two"
  std::atomic<uint64_t> file_number;  // monotonically increasing fake SST
                                      // file number handed to flush installs
  MemTableListTest() : db(nullptr), file_number(1) {
    dbname = test::PerThreadDBPath("memtable_list_test");
    options.create_if_missing = true;
    // Start from a clean slate in case a previous run left files behind.
    DestroyDB(dbname, options);
  }
  // Create a test db if not yet created
  void CreateDB() {
    if (db == nullptr) {
      options.create_if_missing = true;
      DestroyDB(dbname, options);
      // Open DB only with default column family
      ColumnFamilyOptions cf_options;
      std::vector<ColumnFamilyDescriptor> cf_descs;
      cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
      Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
      EXPECT_OK(s);
      // Give each extra column family its own cf_path so DestroyDB (in the
      // destructor) can clean those directories up as well.
      ColumnFamilyOptions cf_opt1, cf_opt2;
      cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
                                    std::numeric_limits<uint64_t>::max());
      cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
                                    std::numeric_limits<uint64_t>::max());
      int sz = static_cast<int>(handles.size());
      handles.resize(sz + 2);
      s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
      EXPECT_OK(s);
      s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
      EXPECT_OK(s);
      cf_descs.emplace_back("one", cf_options);
      cf_descs.emplace_back("two", cf_options);
    }
  }
  ~MemTableListTest() override {
    if (db) {
      // Capture descriptors before destroying the handles so DestroyDB can
      // remove the per-column-family paths created in CreateDB().
      std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
      for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
        handles[i]->GetDescriptor(&cf_descs[i]);
      }
      for (auto h : handles) {
        if (h) {
          db->DestroyColumnFamilyHandle(h);
        }
      }
      handles.clear();
      delete db;
      db = nullptr;
      DestroyDB(dbname, options, cf_descs);
    }
  }
  // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableFlushResults(
      MemTableList* list, const MutableCFOptions& mutable_cf_options,
      const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);
    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    db_options.file_system = FileSystem::Default();
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);
    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    // Recover the column families created by CreateDB() into the mock
    // VersionSet.
    EXPECT_OK(versions.Recover(cf_descs, false));
    // Create mock default ColumnFamilyData
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    auto cfd = column_family_set->GetDefault();
    EXPECT_TRUE(nullptr != cfd);
    uint64_t file_num = file_number.fetch_add(1);
    // Create dummy mutex. Held for the duration of the install call (the
    // lock is taken here because the callee operates under a DB mutex in
    // production -- TODO confirm against TryInstallMemtableFlushResults).
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
    Status s = list->TryInstallMemtableFlushResults(
        cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
        file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
    return s;
  }
  // Calls MemTableList::InstallMemtableFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableAtomicFlushResults(
      autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      autovector<MemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);
    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    // NOTE(review): this helper wraps the Env in a LegacyFileSystemWrapper
    // while Mock_InstallMemtableFlushResults uses FileSystem::Default() --
    // presumably equivalent here; confirm if the two ever diverge.
    db_options.file_system.reset(new LegacyFileSystemWrapper(db_options.env));
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);
    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));
    // Create mock default ColumnFamilyData
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    // Resolve each requested column family id to its ColumnFamilyData.
    autovector<ColumnFamilyData*> cfds;
    for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
      cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
      EXPECT_NE(nullptr, cfds[i]);
    }
    // One fake output file per column family being flushed.
    std::vector<FileMetaData> file_metas;
    file_metas.reserve(cf_ids.size());
    for (size_t i = 0; i != cf_ids.size(); ++i) {
      FileMetaData meta;
      uint64_t file_num = file_number.fetch_add(1);
      meta.fd = FileDescriptor(file_num, 0, 0);
      file_metas.emplace_back(meta);
    }
    autovector<FileMetaData*> file_meta_ptrs;
    for (auto& meta : file_metas) {
      file_meta_ptrs.push_back(&meta);
    }
    // Hold the dummy mutex across the install, mirroring the non-atomic
    // helper above.
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    return InstallMemtableAtomicFlushResults(
        &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
        file_meta_ptrs, to_delete, nullptr, &log_buffer);
  }
};
  168. TEST_F(MemTableListTest, Empty) {
  169. // Create an empty MemTableList and validate basic functions.
  170. MemTableList list(1, 0, 0);
  171. ASSERT_EQ(0, list.NumNotFlushed());
  172. ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  173. ASSERT_FALSE(list.IsFlushPending());
  174. autovector<MemTable*> mems;
  175. list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
  176. ASSERT_EQ(0, mems.size());
  177. autovector<MemTable*> to_delete;
  178. list.current()->Unref(&to_delete);
  179. ASSERT_EQ(0, to_delete.size());
  180. }
// Verifies point lookups through MemTableList: a newer memtable shadows an
// older one, deletions surface as NotFound, and lookups at an older sequence
// number see the older state.
TEST_F(MemTableListTest, GetTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 0;
  int64_t max_write_buffer_size_to_maintain = 0;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);
  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;
  // Lookup on an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();
  // Write some keys to this memtable. Each Add bumps the sequence number, so
  // later entries for the same user key supersede earlier ones.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key1", "value1");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");
  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value1");
  merge_context.Clear();
  // At seq 2, only the deletion of "key1" is visible.
  found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());
  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");
  // 4 Adds total, one of which was a deletion.
  ASSERT_EQ(4, mem->num_entries());
  ASSERT_EQ(1, mem->num_deletes());
  // Add memtable to list
  list.Add(mem, &to_delete);
  // Remember the sequence number before the second memtable's writes so we
  // can later read "key1" as of this point in time.
  SequenceNumber saved_seq = seq;
  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();
  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key2", "value2.3");
  // Add second memtable to list
  list.Add(mem2, &to_delete);
  // Fetch keys via MemTableList: newest memtable wins, so "key1" is deleted.
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());
  merge_context.Clear();
  // Reading at saved_seq (before mem2's deletion) still sees "value1".
  found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s,
                              &merge_context, &max_covering_tombstone_seq,
                              ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value1", value);
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.3");
  merge_context.Clear();
  // No entry for "key2" existed as of sequence 1.
  found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context,
                              &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  ASSERT_EQ(2, list.NumNotFlushed());
  // Cleanup: dropping the last reference hands both memtables back for
  // deletion.
  list.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}
// Verifies that flushed memtables remain readable via GetFromHistory() until
// the history budget (max_write_buffer_number_to_maintain /
// max_write_buffer_size_to_maintain) evicts them.
TEST_F(MemTableListTest, GetFromHistoryTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int max_write_buffer_number_to_maintain = 2;
  int64_t max_write_buffer_size_to_maintain = 2000;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);
  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<MemTable*> to_delete;
  // Lookup on an empty list finds nothing.
  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  WriteBufferManager wb(options.db_write_buffer_size);
  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();
  // Write some keys to this memtable.
  mem->Add(++seq, kTypeDeletion, "key1", "");
  mem->Add(++seq, kTypeValue, "key2", "value2");
  mem->Add(++seq, kTypeValue, "key2", "value2.2");
  // Fetch the newly written keys
  merge_context.Clear();
  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());
  merge_context.Clear();
  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");
  // Add memtable to list
  list.Add(mem, &to_delete);
  ASSERT_EQ(0, to_delete.size());
  // Fetch keys via MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value2.2", value);
  // Flush this memtable from the list.
  // (It will then be a part of the memtable history).
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());
  MutableCFOptions mutable_cf_options(options);
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  // The memtable moved from the active list into history; nothing freed yet.
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());
  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  // Verify keys are present in history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value2.2", value);
  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();
  mem2->Add(++seq, kTypeDeletion, "key1", "");
  mem2->Add(++seq, kTypeValue, "key3", "value3");
  // Add second memtable to list
  list.Add(mem2, &to_delete);
  ASSERT_EQ(0, to_delete.size());
  to_flush.clear();
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());
  // Flush second memtable
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(2, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());
  // Add a third memtable to push the first memtable out of the history
  WriteBufferManager wb3(options.db_write_buffer_size);
  MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem3->Ref();
  list.Add(mem3, &to_delete);
  // History now holds only mem2; mem (the first memtable) was evicted and is
  // handed back via to_delete.
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(1, to_delete.size());
  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  // Verify that the second memtable's keys are in the history
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());
  merge_context.Clear();
  found = list.current()->GetFromHistory(
      LookupKey("key3", seq), &value, &s, &merge_context,
      &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value3", value);
  // Verify that key2 from the first memtable is no longer in the history
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);
  // Cleanup: mem2 (history), mem3 (active), plus the earlier eviction of mem
  // account for the 3 memtables to delete.
  list.current()->Unref(&to_delete);
  ASSERT_EQ(3, to_delete.size());
  for (MemTable* m : to_delete) {
    delete m;
  }
}
// Exercises the flush-pending state machine: FlushRequested(),
// IsFlushPending(), the imm_flush_needed flag, picking/rolling back flush
// candidates, the max-memtable-ID filter in PickMemtablesToFlush(), and the
// in-creation-order commit behavior of TryInstallMemtableFlushResults().
TEST_F(MemTableListTest, FlushPendingTest) {
  const int num_tables = 6;
  SequenceNumber seq = 1;
  Status s;
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableCFOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);
  autovector<MemTable*> to_delete;
  // Create MemTableList
  int min_write_buffer_number_to_merge = 3;
  int max_write_buffer_number_to_maintain = 7;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int>(options.write_buffer_size);
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_number_to_maintain,
                    max_write_buffer_size_to_maintain);
  // Create some MemTables with consecutive IDs 0..5 (the IDs are used later
  // to test the max-memtable-ID filter).
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();
    std::string value;
    MergeContext merge_context;
    mem->Add(++seq, kTypeValue, "key1", ToString(i));
    mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
    mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
    mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
    mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
    tables.push_back(mem);
  }
  // Nothing to flush
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  autovector<MemTable*> to_flush;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  // Request a flush even though there is nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Attempt to 'flush' to clear request for flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Request a flush again
  list.FlushRequested();
  // No flush pending since the list is empty.
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Add 2 tables
  list.Add(tables[0], &to_delete);
  list.Add(tables[1], &to_delete);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());
  // Even though we have less than the minimum to flush, a flush is
  // pending since we had previously requested a flush and never called
  // PickMemtablesToFlush() to clear the flush.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(2, to_flush.size());
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Revert flush: the tables become flushable again, so imm_flush_needed is
  // re-set, but no explicit request is outstanding.
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();
  // Add another table
  list.Add(tables[2], &to_delete);
  // We now have the minimum to flush regardless of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());
  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Pick tables to flush again: everything is already mid-flush, so nothing
  // is selected.
  autovector<MemTable*> to_flush2;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(0, to_flush2.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Add another table
  list.Add(tables[3], &to_delete);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());
  // Request a flush again
  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Pick tables to flush again: only tables[3] is not already mid-flush.
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
  ASSERT_EQ(1, to_flush2.size());
  ASSERT_EQ(4, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Rollback first pick of tables
  list.RollbackMemtableFlush(to_flush, 0);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();
  // Add another tables
  list.Add(tables[4], &to_delete);
  ASSERT_EQ(5, list.NumNotFlushed());
  // We now have the minimum to flush regardless of whether FlushRequested()
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());
  // Pick tables to flush
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
  // Should pick 4 of 5 since 1 table has been picked in to_flush2
  ASSERT_EQ(4, to_flush.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Pick tables to flush again
  autovector<MemTable*> to_flush3;
  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
  ASSERT_EQ(0, to_flush3.size());  // nothing not in progress of being flushed
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Flush the 4 memtables that were picked in to_flush
  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                       &to_delete);
  ASSERT_OK(s);
  // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
  // tables[3].
  // Current implementation will only commit memtables in the order they were
  // created. So TryInstallMemtableFlushResults will install the first 3 tables
  // in to_flush and stop when it encounters a table not yet flushed.
  ASSERT_EQ(2, list.NumNotFlushed());
  int num_in_history =
      std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
  // Request a flush again. Should be nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  // Flush the 1 memtable that was picked in to_flush2
  s = MemTableListTest::Mock_InstallMemtableFlushResults(
      &list, mutable_cf_options, to_flush2, &to_delete);
  ASSERT_OK(s);
  // This will actually install 2 tables. The 1 we told it to flush, and also
  // tables[4] which has been waiting for tables[3] to commit.
  ASSERT_EQ(0, list.NumNotFlushed());
  num_in_history =
      std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
  // Add another table
  list.Add(tables[5], &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(5, list.GetLatestMemTableID());
  memtable_id = 4;
  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 4. Therefore, no table will be selected in this case.
  autovector<MemTable*> to_flush4;
  list.FlushRequested();
  ASSERT_TRUE(list.HasFlushRequested());
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_TRUE(to_flush4.empty());
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());
  // Pick tables to flush. The tables to pick must have ID smaller than or
  // equal to 5. Therefore, only tables[5] will be selected.
  memtable_id = 5;
  list.FlushRequested();
  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
  ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  to_delete.clear();
  // Final cleanup: release the list's current version and delete whatever
  // memtables (history + unflushed) come back.
  list.current()->Unref(&to_delete);
  int to_delete_size =
      std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
                               static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(to_delete_size, to_delete.size());
  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this, by Ref'ing then UnRef'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
}
  652. TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
  653. autovector<MemTableList*> lists;
  654. autovector<uint32_t> cf_ids;
  655. autovector<const MutableCFOptions*> options_list;
  656. autovector<const autovector<MemTable*>*> to_flush;
  657. autovector<MemTable*> to_delete;
  658. Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
  659. to_flush, &to_delete);
  660. ASSERT_OK(s);
  661. ASSERT_TRUE(to_delete.empty());
  662. }
  663. TEST_F(MemTableListTest, AtomicFlusTest) {
  664. const int num_cfs = 3;
  665. const int num_tables_per_cf = 2;
  666. SequenceNumber seq = 1;
  667. auto factory = std::make_shared<SkipListFactory>();
  668. options.memtable_factory = factory;
  669. ImmutableCFOptions ioptions(options);
  670. InternalKeyComparator cmp(BytewiseComparator());
  671. WriteBufferManager wb(options.db_write_buffer_size);
  672. // Create MemTableLists
  673. int min_write_buffer_number_to_merge = 3;
  674. int max_write_buffer_number_to_maintain = 7;
  675. int64_t max_write_buffer_size_to_maintain =
  676. 7 * static_cast<int64_t>(options.write_buffer_size);
  677. autovector<MemTableList*> lists;
  678. for (int i = 0; i != num_cfs; ++i) {
  679. lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
  680. max_write_buffer_number_to_maintain,
  681. max_write_buffer_size_to_maintain));
  682. }
  683. autovector<uint32_t> cf_ids;
  684. std::vector<std::vector<MemTable*>> tables(num_cfs);
  685. autovector<const MutableCFOptions*> mutable_cf_options_list;
  686. uint32_t cf_id = 0;
  687. for (auto& elem : tables) {
  688. mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
  689. uint64_t memtable_id = 0;
  690. for (int i = 0; i != num_tables_per_cf; ++i) {
  691. MemTable* mem =
  692. new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
  693. kMaxSequenceNumber, cf_id);
  694. mem->SetID(memtable_id++);
  695. mem->Ref();
  696. std::string value;
  697. mem->Add(++seq, kTypeValue, "key1", ToString(i));
  698. mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
  699. mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
  700. mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
  701. mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
  702. elem.push_back(mem);
  703. }
  704. cf_ids.push_back(cf_id++);
  705. }
  706. std::vector<autovector<MemTable*>> flush_candidates(num_cfs);
  707. // Nothing to flush
  708. for (auto i = 0; i != num_cfs; ++i) {
  709. auto* list = lists[i];
  710. ASSERT_FALSE(list->IsFlushPending());
  711. ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  712. list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
  713. ASSERT_EQ(0, flush_candidates[i].size());
  714. }
  715. // Request flush even though there is nothing to flush
  716. for (auto i = 0; i != num_cfs; ++i) {
  717. auto* list = lists[i];
  718. list->FlushRequested();
  719. ASSERT_FALSE(list->IsFlushPending());
  720. ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  721. }
  722. autovector<MemTable*> to_delete;
  723. // Add tables to the immutable memtalbe lists associated with column families
  724. for (auto i = 0; i != num_cfs; ++i) {
  725. for (auto j = 0; j != num_tables_per_cf; ++j) {
  726. lists[i]->Add(tables[i][j], &to_delete);
  727. }
  728. ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
  729. ASSERT_TRUE(lists[i]->IsFlushPending());
  730. ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
  731. }
  732. std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
  733. // +----+
  734. // list[0]: |0 1|
  735. // list[1]: |0 1|
  736. // | +--+
  737. // list[2]: |0| 1
  738. // +-+
  739. // Pick memtables to flush
  740. for (auto i = 0; i != num_cfs; ++i) {
  741. flush_candidates[i].clear();
  742. lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
  743. &flush_candidates[i]);
  744. ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
  745. static_cast<uint64_t>(flush_candidates[i].size()));
  746. }
  747. autovector<MemTableList*> tmp_lists;
  748. autovector<uint32_t> tmp_cf_ids;
  749. autovector<const MutableCFOptions*> tmp_options_list;
  750. autovector<const autovector<MemTable*>*> to_flush;
  751. for (auto i = 0; i != num_cfs; ++i) {
  752. if (!flush_candidates[i].empty()) {
  753. to_flush.push_back(&flush_candidates[i]);
  754. tmp_lists.push_back(lists[i]);
  755. tmp_cf_ids.push_back(i);
  756. tmp_options_list.push_back(mutable_cf_options_list[i]);
  757. }
  758. }
  759. Status s = Mock_InstallMemtableAtomicFlushResults(
  760. tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
  761. ASSERT_OK(s);
  762. for (auto i = 0; i != num_cfs; ++i) {
  763. for (auto j = 0; j != num_tables_per_cf; ++j) {
  764. if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
  765. ASSERT_LT(0, tables[i][j]->GetFileNumber());
  766. }
  767. }
  768. ASSERT_EQ(
  769. static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
  770. lists[i]->NumNotFlushed());
  771. }
  772. to_delete.clear();
  773. for (auto list : lists) {
  774. list->current()->Unref(&to_delete);
  775. delete list;
  776. }
  777. for (auto& mutable_cf_options : mutable_cf_options_list) {
  778. if (mutable_cf_options != nullptr) {
  779. delete mutable_cf_options;
  780. mutable_cf_options = nullptr;
  781. }
  782. }
  783. // All memtables in tables array must have been flushed, thus ready to be
  784. // deleted.
  785. ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
  786. for (const auto& m : to_delete) {
  787. // Refcount should be 0 after calling InstallMemtableFlushResults.
  788. // Verify this by Ref'ing and then Unref'ing.
  789. m->Ref();
  790. ASSERT_EQ(m, m->Unref());
  791. delete m;
  792. }
  793. }
  794. } // namespace ROCKSDB_NAMESPACE
  795. int main(int argc, char** argv) {
  796. ::testing::InitGoogleTest(&argc, argv);
  797. return RUN_ALL_TESTS();
  798. }