// memtable_list_test.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/memtable_list.h"

#include <algorithm>
#include <string>
#include <vector>

#include "db/merge_context.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/write_buffer_manager.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

namespace ROCKSDB_NAMESPACE {

namespace {
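// Helper for building the value of a kTypeValuePreferredSeqno entry: the raw
// user value with the write time appended as a fixed 64-bit encoding.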
std::string ValueWithWriteTime(std::string value, uint64_t write_time) {
  std::string result = value;
  PutFixed64(&result, write_time);
  return result;
}
}  // namespace

class MemTableListTest : public testing::Test {
 public:
  std::string dbname;
  DB* db;
  Options options;
  std::vector<ColumnFamilyHandle*> handles;
  std::atomic<uint64_t> file_number;

  MemTableListTest() : db(nullptr), file_number(1) {
    dbname = test::PerThreadDBPath("memtable_list_test");
    options.create_if_missing = true;
    EXPECT_OK(DestroyDB(dbname, options));
  }

  // Create a test db if not yet created
  void CreateDB() {
    if (db == nullptr) {
      options.create_if_missing = true;
      EXPECT_OK(DestroyDB(dbname, options));
      // Open DB only with default column family
      ColumnFamilyOptions cf_options;
      std::vector<ColumnFamilyDescriptor> cf_descs;
      if (udt_enabled_) {
        cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
      }
      cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
      Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
      EXPECT_OK(s);

      ColumnFamilyOptions cf_opt1, cf_opt2;
      cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
                                    std::numeric_limits<uint64_t>::max());
      cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
                                    std::numeric_limits<uint64_t>::max());
      int sz = static_cast<int>(handles.size());
      handles.resize(sz + 2);
      s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
      EXPECT_OK(s);
      s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
      EXPECT_OK(s);
      cf_descs.emplace_back("one", cf_options);
      cf_descs.emplace_back("two", cf_options);
    }
  }

  ~MemTableListTest() override {
    if (db) {
      std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
      for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
        EXPECT_OK(handles[i]->GetDescriptor(&cf_descs[i]));
      }
      for (auto h : handles) {
        if (h) {
          EXPECT_OK(db->DestroyColumnFamilyHandle(h));
        }
      }
      handles.clear();
      delete db;
      db = nullptr;
      EXPECT_OK(DestroyDB(dbname, options, cf_descs));
    }
  }

  // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableFlushResults(
      MemTableList* list, const autovector<ReadOnlyMemTable*>& m,
      autovector<ReadOnlyMemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr,
                        /*io_tracer=*/nullptr, /*db_id=*/"",
                        /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
                        /*error_handler=*/nullptr, /*read_only=*/false);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create mock default ColumnFamilyData
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    auto cfd = column_family_set->GetDefault();
    EXPECT_TRUE(nullptr != cfd);
    uint64_t file_num = file_number.fetch_add(1);
    IOStatus io_s;

    // Create dummy mutex.
    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
    Status s = list->TryInstallMemtableFlushResults(
        cfd, m, &dummy_prep_tracker, &versions, &mutex, file_num, to_delete,
        nullptr, &log_buffer, &flush_jobs_info);
    EXPECT_OK(io_s);
    return s;
  }

  // Calls MemTableList::InstallMemtableFlushResults() and sets up all
  // structures needed to call this function.
  Status Mock_InstallMemtableAtomicFlushResults(
      autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
      const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
      autovector<ReadOnlyMemTable*>* to_delete) {
    // Create a mock Logger
    test::NullLogger logger;
    LogBuffer log_buffer(DEBUG_LEVEL, &logger);

    CreateDB();
    // Create a mock VersionSet
    DBOptions db_options;
    ImmutableDBOptions immutable_db_options(db_options);
    EnvOptions env_options;
    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
    WriteController write_controller(10000000u);

    VersionSet versions(dbname, &immutable_db_options, env_options,
                        table_cache.get(), &write_buffer_manager,
                        &write_controller, /*block_cache_tracer=*/nullptr,
                        /*io_tracer=*/nullptr, /*db_id=*/"",
                        /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
                        /*error_handler=*/nullptr, /*read_only=*/false);
    std::vector<ColumnFamilyDescriptor> cf_descs;
    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
    cf_descs.emplace_back("one", ColumnFamilyOptions());
    cf_descs.emplace_back("two", ColumnFamilyOptions());
    EXPECT_OK(versions.Recover(cf_descs, false));

    // Create a mock ColumnFamilyData for each column family
    auto column_family_set = versions.GetColumnFamilySet();
    LogsWithPrepTracker dummy_prep_tracker;
    autovector<ColumnFamilyData*> cfds;
    for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
      cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
      EXPECT_NE(nullptr, cfds[i]);
    }
    std::vector<FileMetaData> file_metas;
    file_metas.reserve(cf_ids.size());
    for (size_t i = 0; i != cf_ids.size(); ++i) {
      FileMetaData meta;
      uint64_t file_num = file_number.fetch_add(1);
      meta.fd = FileDescriptor(file_num, 0, 0);
      file_metas.emplace_back(meta);
    }
    autovector<FileMetaData*> file_meta_ptrs;
    for (auto& meta : file_metas) {
      file_meta_ptrs.push_back(&meta);
    }
    std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
        committed_flush_jobs_info_storage(cf_ids.size());
    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
        committed_flush_jobs_info;
    for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
      committed_flush_jobs_info.push_back(
          &committed_flush_jobs_info_storage[i]);
    }

    InstrumentedMutex mutex;
    InstrumentedMutexLock l(&mutex);
    return InstallMemtableAtomicFlushResults(
        &lists, cfds, mems_list, &versions, nullptr /* prep_tracker */, &mutex,
        file_meta_ptrs, committed_flush_jobs_info, to_delete, nullptr,
        &log_buffer);
  }

 protected:
  bool udt_enabled_ = false;
};

TEST_F(MemTableListTest, Empty) {
  // Create an empty MemTableList and validate basic functions.
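  // Args: min_write_buffer_number_to_merge = 1,
  // max_write_buffer_size_to_maintain = 0 (keep no flushed memtables around
  // in history).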
  MemTableList list(1, 0);

  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());

  autovector<ReadOnlyMemTable*> mems;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &mems);
  ASSERT_EQ(0, mems.size());

  autovector<ReadOnlyMemTable*> to_delete;
  list.current()->Unref(&to_delete);
  ASSERT_EQ(0, to_delete.size());
}

TEST_F(MemTableListTest, GetTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
  int64_t max_write_buffer_size_to_maintain = 0;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<ReadOnlyMemTable*> to_delete;

  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
                                   /*timestamp=*/nullptr, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  ImmutableOptions ioptions(options);
  WriteBufferManager wb(options.db_write_buffer_size);

  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable.
  ASSERT_OK(
      mem->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2",
                     nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", "value1",
                     nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2",
                     nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValuePreferredSeqno, "key3",
                     ValueWithWriteTime("value3.1", 20),
                     nullptr /* kv_prot_info */));

  // Fetch the newly written keys
  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value1");

  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key1", 2), &value, /*columns*/ nullptr,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  merge_context.Clear();
  s = Status::OK();
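  // key3 was stored via kTypeValuePreferredSeqno with a packed write time;
  // Get() should return only the user value.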
  found = mem->Get(LookupKey("key3", seq), &value, /*columns*/ nullptr,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value3.1");

  ASSERT_EQ(5, mem->NumEntries());
  ASSERT_EQ(1, mem->NumDeletion());

  // Add memtable to list
  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
  // in MemTableListVersion::GetFromList work.
  mem->ConstructFragmentedRangeTombstones();
  mem->SetID(1);
  list.Add(mem, &to_delete);

  SequenceNumber saved_seq = seq;

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->SetID(2);
  mem2->Ref();

  ASSERT_OK(
      mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
  ASSERT_OK(mem2->Add(++seq, kTypeValue, "key2", "value2.3",
                      nullptr /* kv_prot_info */));
  ASSERT_OK(mem2->Add(++seq, kTypeMerge, "key3", "value3.2",
                      nullptr /* kv_prot_info */));

  // Add second memtable to list
  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
  // in MemTableListVersion::GetFromList work.
  mem2->ConstructFragmentedRangeTombstones();
  list.Add(mem2, &to_delete);

  // Fetch keys via MemTableList
  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = list.current()->Get(LookupKey("key1", saved_seq), &value,
                              /*columns=*/nullptr, /*timestamp=*/nullptr, &s,
                              &merge_context, &max_covering_tombstone_seq,
                              ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value1", value);

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.3");

  merge_context.Clear();
  s = Status::OK();
  found = list.current()->Get(LookupKey("key2", 1), &value,
                              /*columns=*/nullptr, /*timestamp=*/nullptr, &s,
                              &merge_context, &max_covering_tombstone_seq,
                              ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
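  // The kTypeMerge operand in mem2 is applied on top of key3's base value
  // from mem: the string append operator joins them with a comma.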
  ASSERT_EQ(value, "value3.1,value3.2");

  ASSERT_EQ(2, list.NumNotFlushed());

  list.current()->Unref(&to_delete);
  for (ReadOnlyMemTable* m : to_delete) {
    delete m;
  }
}

TEST_F(MemTableListTest, GetFromHistoryTest) {
  // Create MemTableList
  int min_write_buffer_number_to_merge = 2;
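  // Size the history to hold roughly two of the small memtables created
  // below; adding a third flushed memtable later pushes the oldest one out.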
  int64_t max_write_buffer_size_to_maintain = 2 * Arena::kInlineSize;
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_size_to_maintain);

  SequenceNumber seq = 1;
  std::string value;
  Status s;
  MergeContext merge_context;
  InternalKeyComparator ikey_cmp(options.comparator);
  SequenceNumber max_covering_tombstone_seq = 0;
  autovector<ReadOnlyMemTable*> to_delete;

  LookupKey lkey("key1", seq);
  bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
                                   /*timestamp=*/nullptr, &s, &merge_context,
                                   &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Create a MemTable
  InternalKeyComparator cmp(BytewiseComparator());
  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableOptions ioptions(options);
  WriteBufferManager wb(options.db_write_buffer_size);

  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                               kMaxSequenceNumber, 0 /* column_family_id */);
  mem->Ref();

  // Write some keys to this memtable.
  ASSERT_OK(
      mem->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2",
                     nullptr /* kv_prot_info */));
  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2",
                     nullptr /* kv_prot_info */));

  // Fetch the newly written keys
  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  // MemTable found out that this key is *not* found (at this sequence#)
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr,
                   /*timestamp*/ nullptr, &s, &merge_context,
                   &max_covering_tombstone_seq, ReadOptions(),
                   false /* immutable_memtable */);
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ(value, "value2.2");

  // Add memtable to list
  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
  // in MemTableListVersion::GetFromList work.
  mem->ConstructFragmentedRangeTombstones();
  list.Add(mem, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  // Fetch keys via MemTableList
  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_TRUE(s.ok() && found);
  ASSERT_EQ("value2.2", value);

  // Flush this memtable from the list.
  // (It will then be a part of the memtable history).
  autovector<ReadOnlyMemTable*> to_flush;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  s = Mock_InstallMemtableFlushResults(&list, to_flush, &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify keys are present in history
  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, /*columns=*/nullptr,
      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, /*columns=*/nullptr,
      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value2.2", value);

  // Create another memtable and write some keys to it
  WriteBufferManager wb2(options.db_write_buffer_size);
  MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem2->Ref();

  ASSERT_OK(
      mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
  ASSERT_OK(mem2->Add(++seq, kTypeValue, "key3", "value3",
                      nullptr /* kv_prot_info */));

  // Add second memtable to list
  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
  // in MemTableListVersion::GetFromList work.
  mem2->ConstructFragmentedRangeTombstones();
  list.Add(mem2, &to_delete);
  ASSERT_EQ(0, to_delete.size());

  to_flush.clear();
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(1, to_flush.size());

  // Flush second memtable
  s = Mock_InstallMemtableFlushResults(&list, to_flush, &to_delete);
  ASSERT_OK(s);
  ASSERT_EQ(0, list.NumNotFlushed());
  ASSERT_EQ(2, list.NumFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Add a third memtable to push the first memtable out of the history
  WriteBufferManager wb3(options.db_write_buffer_size);
  MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                kMaxSequenceNumber, 0 /* column_family_id */);
  mem3->Ref();
  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
  // in MemTableListVersion::GetFromList work.
  mem3->ConstructFragmentedRangeTombstones();
  list.Add(mem3, &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(1, list.NumFlushed());
  ASSERT_EQ(1, to_delete.size());

  // Verify keys are no longer in MemTableList
  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  merge_context.Clear();
  s = Status::OK();
  found =
      list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr,
                          /*timestamp=*/nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, ReadOptions());
  ASSERT_FALSE(found);

  // Verify that the second memtable's keys are in the history
  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key1", seq), &value, /*columns=*/nullptr,
      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_TRUE(found && s.IsNotFound());

  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key3", seq), &value, /*columns=*/nullptr,
      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_TRUE(found);
  ASSERT_EQ("value3", value);

  // Verify that key2 from the first memtable is no longer in the history
  merge_context.Clear();
  s = Status::OK();
  found = list.current()->GetFromHistory(
      LookupKey("key2", seq), &value, /*columns=*/nullptr,
      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
      ReadOptions());
  ASSERT_FALSE(found);

  // Cleanup
  list.current()->Unref(&to_delete);
  ASSERT_EQ(3, to_delete.size());
  for (ReadOnlyMemTable* m : to_delete) {
    delete m;
  }
}

TEST_F(MemTableListTest, FlushPendingTest) {
  const int num_tables = 6;
  SequenceNumber seq = 1;
  Status s;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);
  autovector<ReadOnlyMemTable*> to_delete;

  // Create MemTableList
  int min_write_buffer_number_to_merge = 3;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int>(options.write_buffer_size);
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_size_to_maintain);

  // Create some MemTables
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", std::to_string(i),
                       nullptr /* kv_prot_info */));
    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + std::to_string(i), "valueN",
                       nullptr /* kv_prot_info */));
    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + std::to_string(i), "value",
                       nullptr /* kv_prot_info */));
    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + std::to_string(i), "valueM",
                       nullptr /* kv_prot_info */));
    ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + std::to_string(i), "",
                       nullptr /* kv_prot_info */));

    tables.push_back(mem);
  }

  // Nothing to flush
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  autovector<ReadOnlyMemTable*> to_flush;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());

  // Request a flush even though there is nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Attempt to 'flush' to clear request for flush
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(0, to_flush.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Request a flush again
  list.FlushRequested();
  // No flush pending since the list is empty.
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add 2 tables
  list.Add(tables[0], &to_delete);
  list.Add(tables[1], &to_delete);
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Even though we have less than the minimum to flush, a flush is
  // pending since we had previously requested a flush and never called
  // PickMemtablesToFlush() to clear the flush.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(2, to_flush.size());
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Revert flush
  list.RollbackMemtableFlush(to_flush, false);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[2], &to_delete);
  // We now have the minimum to flush regardless of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<ReadOnlyMemTable*> to_flush2;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
  ASSERT_EQ(0, to_flush2.size());
  ASSERT_EQ(3, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Add another table
  list.Add(tables[3], &to_delete);
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Request a flush again
  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
  ASSERT_EQ(1, to_flush2.size());
  ASSERT_EQ(4, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Rollback first pick of tables
  list.RollbackMemtableFlush(to_flush, false);
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  to_flush.clear();

  // Add another table
  list.Add(tables[4], &to_delete);
  ASSERT_EQ(5, list.NumNotFlushed());
  // We now have the minimum to flush regardless of whether FlushRequested()
  // was called.
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_EQ(0, to_delete.size());

  // Pick tables to flush
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
  // Picks the three oldest memtables. The fourth oldest is picked in
  // `to_flush2` so must be excluded. The newest (fifth oldest) is
  // non-consecutive with the three oldest because the fourth oldest was
  // omitted, so it must not be picked either.
  ASSERT_EQ(3, to_flush.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Pick tables to flush again
  autovector<ReadOnlyMemTable*> to_flush3;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush3);
  // Picks the newest (fifth oldest)
  ASSERT_EQ(1, to_flush3.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Nothing left to flush
  autovector<ReadOnlyMemTable*> to_flush4;
  list.PickMemtablesToFlush(
      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush4);
  ASSERT_EQ(0, to_flush4.size());
  ASSERT_EQ(5, list.NumNotFlushed());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 3 memtables that were picked in to_flush
  s = Mock_InstallMemtableFlushResults(&list, to_flush, &to_delete);
  ASSERT_OK(s);

  // Note: now to_flush contains tables[0,1,2]. to_flush2 contains
  // tables[3]. to_flush3 contains tables[4].
  // The current implementation will only commit memtables in the order they
  // were created. So TryInstallMemtableFlushResults will install the first 3
  // tables in to_flush and stop when it encounters a table not yet flushed.
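  // Flushed memtables are retained in the history as long as their combined
  // size stays within max_write_buffer_size_to_maintain; memtables trimmed
  // from the history are handed back through to_delete.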
  ASSERT_EQ(2, list.NumNotFlushed());
  int num_in_history =
      std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  // Request a flush again. Should be nothing to flush
  list.FlushRequested();
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

  // Flush the 1 memtable (tables[4]) that was picked in to_flush3
  s = MemTableListTest::Mock_InstallMemtableFlushResults(&list, to_flush3,
                                                         &to_delete);
  ASSERT_OK(s);

  // This will install 0 tables since tables[4] flushed while tables[3] has
  // not yet flushed.
  ASSERT_EQ(2, list.NumNotFlushed());
  ASSERT_EQ(0, to_delete.size());

  // Flush the 1 memtable (tables[3]) that was picked in to_flush2
  s = MemTableListTest::Mock_InstallMemtableFlushResults(&list, to_flush2,
                                                         &to_delete);
  ASSERT_OK(s);

  // This will actually install 2 tables: the 1 we told it to flush, and also
  // tables[4], which has been waiting for tables[3] to commit.
  ASSERT_EQ(0, list.NumNotFlushed());
  num_in_history =
      std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
                      static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(num_in_history, list.NumFlushed());
  ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this by Ref'ing then Unref'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();

  // Add another table
  list.Add(tables[5], &to_delete);
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_EQ(5, list.GetLatestMemTableID(false /* for_atomic_flush */));
  memtable_id = 4;
  // Pick tables to flush. The tables to pick must have an ID smaller than or
  // equal to 4. Therefore, no table will be selected in this case.
  autovector<ReadOnlyMemTable*> to_flush5;
  list.FlushRequested();
  ASSERT_TRUE(list.HasFlushRequested());
  list.PickMemtablesToFlush(memtable_id, &to_flush5);
  ASSERT_TRUE(to_flush5.empty());
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());

  // Pick tables to flush. The tables to pick must have an ID smaller than or
  // equal to 5. Therefore, only tables[5] will be selected.
  memtable_id = 5;
  list.FlushRequested();
  list.PickMemtablesToFlush(memtable_id, &to_flush5);
  ASSERT_EQ(1, static_cast<int>(to_flush5.size()));
  ASSERT_EQ(1, list.NumNotFlushed());
  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
  ASSERT_FALSE(list.IsFlushPending());
  to_delete.clear();

  list.current()->Unref(&to_delete);
  int to_delete_size =
      std::min(num_tables,
               static_cast<int>(max_write_buffer_size_to_maintain) /
                   static_cast<int>(options.write_buffer_size));
  ASSERT_EQ(to_delete_size, to_delete.size());

  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
    // Verify this by Ref'ing then Unref'ing:
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
  to_delete.clear();
}

TEST_F(MemTableListTest, EmptyAtomicFlushTest) {
  autovector<MemTableList*> lists;
  autovector<uint32_t> cf_ids;
  autovector<const autovector<ReadOnlyMemTable*>*> to_flush;
  autovector<ReadOnlyMemTable*> to_delete;
  Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, to_flush,
                                                    &to_delete);
  ASSERT_OK(s);
  ASSERT_TRUE(to_delete.empty());
}

TEST_F(MemTableListTest, AtomicFlushTest) {
  const int num_cfs = 3;
  const int num_tables_per_cf = 2;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  ImmutableOptions ioptions(options);
  InternalKeyComparator cmp(BytewiseComparator());
  WriteBufferManager wb(options.db_write_buffer_size);

  // Create MemTableLists
  int min_write_buffer_number_to_merge = 3;
  int64_t max_write_buffer_size_to_maintain =
      7 * static_cast<int64_t>(options.write_buffer_size);
  autovector<MemTableList*> lists;
  for (int i = 0; i != num_cfs; ++i) {
    lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
                                        max_write_buffer_size_to_maintain));
  }

  autovector<uint32_t> cf_ids;
  std::vector<std::vector<MemTable*>> tables(num_cfs);
  autovector<const MutableCFOptions*> mutable_cf_options_list;
  uint32_t cf_id = 0;
  for (auto& elem : tables) {
    mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
    uint64_t memtable_id = 0;
    for (int i = 0; i != num_tables_per_cf; ++i) {
      MemTable* mem =
          new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
                       kMaxSequenceNumber, cf_id);
      mem->SetID(memtable_id++);
      mem->Ref();

      std::string value;

      ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", std::to_string(i),
                         nullptr /* kv_prot_info */));
      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + std::to_string(i),
                         "valueN", nullptr /* kv_prot_info */));
      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + std::to_string(i),
                         "value", nullptr /* kv_prot_info */));
      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + std::to_string(i),
                         "valueM", nullptr /* kv_prot_info */));
      ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + std::to_string(i), "",
                         nullptr /* kv_prot_info */));

      elem.push_back(mem);
    }
    cf_ids.push_back(cf_id++);
  }

  std::vector<autovector<ReadOnlyMemTable*>> flush_candidates(num_cfs);

  // Nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
    list->PickMemtablesToFlush(
        std::numeric_limits<uint64_t>::max() /* memtable_id */,
        &flush_candidates[i]);
    ASSERT_EQ(0, flush_candidates[i].size());
  }

  // Request flush even though there is nothing to flush
  for (auto i = 0; i != num_cfs; ++i) {
    auto* list = lists[i];
    list->FlushRequested();
    ASSERT_FALSE(list->IsFlushPending());
    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
  }
  autovector<ReadOnlyMemTable*> to_delete;

  // Add tables to the immutable memtable lists associated with column
  // families
  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      lists[i]->Add(tables[i][j], &to_delete);
    }
    ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
    ASSERT_TRUE(lists[i]->IsFlushPending());
    ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
  }
  std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
  //          +----+
  // list[0]: |0  1|
  // list[1]: |0  1|
  //          | +--+
  // list[2]: |0| 1
  //          +-+
  // Pick memtables to flush
  for (auto i = 0; i != num_cfs; ++i) {
    flush_candidates[i].clear();
    lists[i]->PickMemtablesToFlush(flush_memtable_ids[i],
                                   &flush_candidates[i]);
    ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
              static_cast<uint64_t>(flush_candidates[i].size()));
  }
  autovector<MemTableList*> tmp_lists;
  autovector<uint32_t> tmp_cf_ids;
  autovector<const autovector<ReadOnlyMemTable*>*> to_flush;
  for (auto i = 0; i != num_cfs; ++i) {
    if (!flush_candidates[i].empty()) {
      to_flush.push_back(&flush_candidates[i]);
      tmp_lists.push_back(lists[i]);
      tmp_cf_ids.push_back(i);
    }
  }
  Status s = Mock_InstallMemtableAtomicFlushResults(tmp_lists, tmp_cf_ids,
                                                    to_flush, &to_delete);
  ASSERT_OK(s);

  for (auto i = 0; i != num_cfs; ++i) {
    for (auto j = 0; j != num_tables_per_cf; ++j) {
      if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
        ASSERT_LT(0, tables[i][j]->GetFileNumber());
      }
    }
    ASSERT_EQ(
        static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
        lists[i]->NumNotFlushed());
  }

  to_delete.clear();
  for (auto list : lists) {
    list->current()->Unref(&to_delete);
    delete list;
  }
  for (auto& mutable_cf_options : mutable_cf_options_list) {
    if (mutable_cf_options != nullptr) {
      delete mutable_cf_options;
      mutable_cf_options = nullptr;
    }
  }
  // All memtables in the tables array must have been flushed, and are thus
  // ready to be deleted.
  ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
  for (const auto& m : to_delete) {
    // Refcount should be 0 after calling InstallMemtableFlushResults.
    // Verify this by Ref'ing and then Unref'ing.
    m->Ref();
    ASSERT_EQ(m, m->Unref());
    delete m;
  }
}

class MemTableListWithTimestampTest : public MemTableListTest {
 public:
  MemTableListWithTimestampTest() : MemTableListTest() {}

  void SetUp() override { udt_enabled_ = true; }
};

TEST_F(MemTableListWithTimestampTest, GetTableNewestUDT) {
  const int num_tables = 3;
  const int num_entries = 5;
  SequenceNumber seq = 1;

  auto factory = std::make_shared<SkipListFactory>();
  options.memtable_factory = factory;
  options.persist_user_defined_timestamps = false;
  ImmutableOptions ioptions(options);
  const Comparator* ucmp = test::BytewiseComparatorWithU64TsWrapper();
  InternalKeyComparator cmp(ucmp);
  WriteBufferManager wb(options.db_write_buffer_size);

  // Create MemTableList
  int min_write_buffer_number_to_merge = 1;
  int64_t max_write_buffer_size_to_maintain =
      4 * static_cast<int>(options.write_buffer_size);
  MemTableList list(min_write_buffer_number_to_merge,
                    max_write_buffer_size_to_maintain);

  // Create some MemTables
  uint64_t memtable_id = 0;
  std::vector<MemTable*> tables;
  MutableCFOptions mutable_cf_options(options);
  uint64_t current_ts = 0;
  autovector<ReadOnlyMemTable*> to_delete;
  std::vector<std::string> newest_udts;

  std::string key;
  std::string write_ts;
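  // Each table writes num_entries versions of "key1" with strictly increasing
  // u64 write timestamps; newest_udts records the last (largest) timestamp
  // written to each table, which is what GetTablesNewestUDT() should report.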
  for (int i = 0; i < num_tables; i++) {
    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
    mem->SetID(memtable_id++);
    mem->Ref();

    std::string value;
    MergeContext merge_context;

    for (int j = 0; j < num_entries; j++) {
      key = "key1";
      write_ts.clear();
      PutFixed64(&write_ts, current_ts);
      key.append(write_ts);
      ASSERT_OK(mem->Add(++seq, kTypeValue, key, std::to_string(i),
                         nullptr /* kv_prot_info */));
      current_ts++;
    }
    tables.push_back(mem);
    list.Add(tables.back(), &to_delete);
    newest_udts.push_back(write_ts);
  }

  ASSERT_EQ(num_tables, list.NumNotFlushed());
  ASSERT_TRUE(list.IsFlushPending());
  std::vector<Slice> tables_newest_udts = list.GetTablesNewestUDT(num_tables);
  ASSERT_EQ(newest_udts.size(), tables_newest_udts.size());
  for (size_t i = 0; i < tables_newest_udts.size(); i++) {
    const Slice& table_newest_udt = tables_newest_udts[i];
    const Slice expected_newest_udt = newest_udts[i];
    ASSERT_EQ(expected_newest_udt, table_newest_udt);
  }

  list.current()->Unref(&to_delete);
  for (ReadOnlyMemTable* m : to_delete) {
    delete m;
  }
  to_delete.clear();
}
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}