db_tailing_iter_test.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. // The introduction of SyncPoint effectively disabled building and
  10. // running this test in Release builds, which is a pity because it is a
  11. // good test.
  12. #if !defined(ROCKSDB_LITE)
  13. #include "db/db_test_util.h"
  14. #include "db/forward_iterator.h"
  15. #include "port/stack_trace.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. class DBTestTailingIterator : public DBTestBase {
  18. public:
  19. DBTestTailingIterator() : DBTestBase("/db_tailing_iterator_test") {}
  20. };
  21. TEST_F(DBTestTailingIterator, TailingIteratorSingle) {
  22. ReadOptions read_options;
  23. read_options.tailing = true;
  24. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
  25. iter->SeekToFirst();
  26. ASSERT_TRUE(!iter->Valid());
  27. // add a record and check that iter can see it
  28. ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor"));
  29. iter->SeekToFirst();
  30. ASSERT_TRUE(iter->Valid());
  31. ASSERT_EQ(iter->key().ToString(), "mirko");
  32. iter->Next();
  33. ASSERT_TRUE(!iter->Valid());
  34. }
  35. TEST_F(DBTestTailingIterator, TailingIteratorKeepAdding) {
  36. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  37. ReadOptions read_options;
  38. read_options.tailing = true;
  39. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
  40. std::string value(1024, 'a');
  41. const int num_records = 10000;
  42. for (int i = 0; i < num_records; ++i) {
  43. char buf[32];
  44. snprintf(buf, sizeof(buf), "%016d", i);
  45. Slice key(buf, 16);
  46. ASSERT_OK(Put(1, key, value));
  47. iter->Seek(key);
  48. ASSERT_TRUE(iter->Valid());
  49. ASSERT_EQ(iter->key().compare(key), 0);
  50. }
  51. }
  52. TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) {
  53. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  54. ReadOptions read_options;
  55. read_options.tailing = true;
  56. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
  57. std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
  58. std::string value(1024, 'a');
  59. const int num_records = 1000;
  60. for (int i = 1; i < num_records; ++i) {
  61. char buf1[32];
  62. char buf2[32];
  63. snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
  64. Slice key(buf1, 20);
  65. ASSERT_OK(Put(1, key, value));
  66. if (i % 100 == 99) {
  67. ASSERT_OK(Flush(1));
  68. }
  69. snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
  70. Slice target(buf2, 20);
  71. iter->Seek(target);
  72. ASSERT_TRUE(iter->Valid());
  73. ASSERT_EQ(iter->key().compare(key), 0);
  74. if (i == 1) {
  75. itern->SeekToFirst();
  76. } else {
  77. itern->Next();
  78. }
  79. ASSERT_TRUE(itern->Valid());
  80. ASSERT_EQ(itern->key().compare(key), 0);
  81. }
  82. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  83. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  84. for (int i = 2 * num_records; i > 0; --i) {
  85. char buf1[32];
  86. char buf2[32];
  87. snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
  88. Slice key(buf1, 20);
  89. ASSERT_OK(Put(1, key, value));
  90. if (i % 100 == 99) {
  91. ASSERT_OK(Flush(1));
  92. }
  93. snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
  94. Slice target(buf2, 20);
  95. iter->Seek(target);
  96. ASSERT_TRUE(iter->Valid());
  97. ASSERT_EQ(iter->key().compare(key), 0);
  98. }
  99. }
  100. TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
  101. const uint64_t k150KB = 150 * 1024;
  102. Options options;
  103. options.write_buffer_size = k150KB;
  104. options.max_write_buffer_number = 3;
  105. options.min_write_buffer_number_to_merge = 2;
  106. options.env = env_;
  107. CreateAndReopenWithCF({"pikachu"}, options);
  108. ReadOptions read_options;
  109. read_options.tailing = true;
  110. int num_iters, deleted_iters;
  111. char bufe[32];
  112. snprintf(bufe, sizeof(bufe), "00b0%016d", 0);
  113. Slice keyu(bufe, 20);
  114. read_options.iterate_upper_bound = &keyu;
  115. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
  116. std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
  117. std::unique_ptr<Iterator> iterh(db_->NewIterator(read_options, handles_[1]));
  118. std::string value(1024, 'a');
  119. bool file_iters_deleted = false;
  120. bool file_iters_renewed_null = false;
  121. bool file_iters_renewed_copy = false;
  122. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  123. "ForwardIterator::SeekInternal:Return", [&](void* arg) {
  124. ForwardIterator* fiter = reinterpret_cast<ForwardIterator*>(arg);
  125. ASSERT_TRUE(!file_iters_deleted ||
  126. fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters));
  127. });
  128. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  129. "ForwardIterator::Next:Return", [&](void* arg) {
  130. ForwardIterator* fiter = reinterpret_cast<ForwardIterator*>(arg);
  131. ASSERT_TRUE(!file_iters_deleted ||
  132. fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters));
  133. });
  134. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  135. "ForwardIterator::RenewIterators:Null",
  136. [&](void* /*arg*/) { file_iters_renewed_null = true; });
  137. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  138. "ForwardIterator::RenewIterators:Copy",
  139. [&](void* /*arg*/) { file_iters_renewed_copy = true; });
  140. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  141. const int num_records = 1000;
  142. for (int i = 1; i < num_records; ++i) {
  143. char buf1[32];
  144. char buf2[32];
  145. char buf3[32];
  146. char buf4[32];
  147. snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
  148. snprintf(buf3, sizeof(buf3), "00b0%016d", i * 5);
  149. Slice key(buf1, 20);
  150. ASSERT_OK(Put(1, key, value));
  151. Slice keyn(buf3, 20);
  152. ASSERT_OK(Put(1, keyn, value));
  153. if (i % 100 == 99) {
  154. ASSERT_OK(Flush(1));
  155. dbfull()->TEST_WaitForCompact();
  156. if (i == 299) {
  157. file_iters_deleted = true;
  158. }
  159. snprintf(buf4, sizeof(buf4), "00a0%016d", i * 5 / 2);
  160. Slice target(buf4, 20);
  161. iterh->Seek(target);
  162. ASSERT_TRUE(iter->Valid());
  163. for (int j = (i + 1) * 5 / 2; j < i * 5; j += 5) {
  164. iterh->Next();
  165. ASSERT_TRUE(iterh->Valid());
  166. }
  167. if (i == 299) {
  168. file_iters_deleted = false;
  169. }
  170. }
  171. file_iters_deleted = true;
  172. snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
  173. Slice target(buf2, 20);
  174. iter->Seek(target);
  175. ASSERT_TRUE(iter->Valid());
  176. ASSERT_EQ(iter->key().compare(key), 0);
  177. ASSERT_LE(num_iters, 1);
  178. if (i == 1) {
  179. itern->SeekToFirst();
  180. } else {
  181. itern->Next();
  182. }
  183. ASSERT_TRUE(itern->Valid());
  184. ASSERT_EQ(itern->key().compare(key), 0);
  185. ASSERT_LE(num_iters, 1);
  186. file_iters_deleted = false;
  187. }
  188. ASSERT_TRUE(file_iters_renewed_null);
  189. ASSERT_TRUE(file_iters_renewed_copy);
  190. iter = nullptr;
  191. itern = nullptr;
  192. iterh = nullptr;
  193. BlockBasedTableOptions table_options;
  194. table_options.no_block_cache = true;
  195. table_options.block_cache_compressed = nullptr;
  196. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  197. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  198. read_options.read_tier = kBlockCacheTier;
  199. std::unique_ptr<Iterator> iteri(db_->NewIterator(read_options, handles_[1]));
  200. char buf5[32];
  201. snprintf(buf5, sizeof(buf5), "00a0%016d", (num_records / 2) * 5 - 2);
  202. Slice target1(buf5, 20);
  203. iteri->Seek(target1);
  204. ASSERT_TRUE(iteri->status().IsIncomplete());
  205. iteri = nullptr;
  206. read_options.read_tier = kReadAllTier;
  207. options.table_factory.reset(NewBlockBasedTableFactory());
  208. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  209. iter.reset(db_->NewIterator(read_options, handles_[1]));
  210. for (int i = 2 * num_records; i > 0; --i) {
  211. char buf1[32];
  212. char buf2[32];
  213. snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
  214. Slice key(buf1, 20);
  215. ASSERT_OK(Put(1, key, value));
  216. if (i % 100 == 99) {
  217. ASSERT_OK(Flush(1));
  218. }
  219. snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
  220. Slice target(buf2, 20);
  221. iter->Seek(target);
  222. ASSERT_TRUE(iter->Valid());
  223. ASSERT_EQ(iter->key().compare(key), 0);
  224. }
  225. }
  226. TEST_F(DBTestTailingIterator, TailingIteratorDeletes) {
  227. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  228. ReadOptions read_options;
  229. read_options.tailing = true;
  230. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
  231. // write a single record, read it using the iterator, then delete it
  232. ASSERT_OK(Put(1, "0test", "test"));
  233. iter->SeekToFirst();
  234. ASSERT_TRUE(iter->Valid());
  235. ASSERT_EQ(iter->key().ToString(), "0test");
  236. ASSERT_OK(Delete(1, "0test"));
  237. // write many more records
  238. const int num_records = 10000;
  239. std::string value(1024, 'A');
  240. for (int i = 0; i < num_records; ++i) {
  241. char buf[32];
  242. snprintf(buf, sizeof(buf), "1%015d", i);
  243. Slice key(buf, 16);
  244. ASSERT_OK(Put(1, key, value));
  245. }
  246. // force a flush to make sure that no records are read from memtable
  247. ASSERT_OK(Flush(1));
  248. // skip "0test"
  249. iter->Next();
  250. // make sure we can read all new records using the existing iterator
  251. int count = 0;
  252. for (; iter->Valid(); iter->Next(), ++count) ;
  253. ASSERT_EQ(count, num_records);
  254. }
  255. TEST_F(DBTestTailingIterator, TailingIteratorPrefixSeek) {
  256. ReadOptions read_options;
  257. read_options.tailing = true;
  258. Options options = CurrentOptions();
  259. options.create_if_missing = true;
  260. options.disable_auto_compactions = true;
  261. options.prefix_extractor.reset(NewFixedPrefixTransform(2));
  262. options.memtable_factory.reset(NewHashSkipListRepFactory(16));
  263. options.allow_concurrent_memtable_write = false;
  264. DestroyAndReopen(options);
  265. CreateAndReopenWithCF({"pikachu"}, options);
  266. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
  267. ASSERT_OK(Put(1, "0101", "test"));
  268. ASSERT_OK(Flush(1));
  269. ASSERT_OK(Put(1, "0202", "test"));
  270. // Seek(0102) shouldn't find any records since 0202 has a different prefix
  271. iter->Seek("0102");
  272. ASSERT_TRUE(!iter->Valid());
  273. iter->Seek("0202");
  274. ASSERT_TRUE(iter->Valid());
  275. ASSERT_EQ(iter->key().ToString(), "0202");
  276. iter->Next();
  277. ASSERT_TRUE(!iter->Valid());
  278. }
  279. TEST_F(DBTestTailingIterator, TailingIteratorIncomplete) {
  280. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  281. ReadOptions read_options;
  282. read_options.tailing = true;
  283. read_options.read_tier = kBlockCacheTier;
  284. std::string key("key");
  285. std::string value("value");
  286. ASSERT_OK(db_->Put(WriteOptions(), key, value));
  287. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
  288. iter->SeekToFirst();
  289. // we either see the entry or it's not in cache
  290. ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
  291. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  292. iter->SeekToFirst();
  293. // should still be true after compaction
  294. ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
  295. }
  296. TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) {
  297. Options options = CurrentOptions();
  298. options.compaction_style = kCompactionStyleUniversal;
  299. options.write_buffer_size = 1000;
  300. CreateAndReopenWithCF({"pikachu"}, options);
  301. ReadOptions read_options;
  302. read_options.tailing = true;
  303. const int NROWS = 10000;
  304. // Write rows with keys 00000, 00002, 00004 etc.
  305. for (int i = 0; i < NROWS; ++i) {
  306. char buf[100];
  307. snprintf(buf, sizeof(buf), "%05d", 2*i);
  308. std::string key(buf);
  309. std::string value("value");
  310. ASSERT_OK(db_->Put(WriteOptions(), key, value));
  311. }
  312. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
  313. // Seek to 00001. We expect to find 00002.
  314. std::string start_key = "00001";
  315. iter->Seek(start_key);
  316. ASSERT_TRUE(iter->Valid());
  317. std::string found = iter->key().ToString();
  318. ASSERT_EQ("00002", found);
  319. // Now seek to the same key. The iterator should remain in the same
  320. // position.
  321. iter->Seek(found);
  322. ASSERT_TRUE(iter->Valid());
  323. ASSERT_EQ(found, iter->key().ToString());
  324. }
  325. // Sets iterate_upper_bound and verifies that ForwardIterator doesn't call
  326. // Seek() on immutable iterators when target key is >= prev_key and all
  327. // iterators, including the memtable iterator, are over the upper bound.
  328. TEST_F(DBTestTailingIterator, TailingIteratorUpperBound) {
  329. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  330. const Slice upper_bound("20", 3);
  331. ReadOptions read_options;
  332. read_options.tailing = true;
  333. read_options.iterate_upper_bound = &upper_bound;
  334. ASSERT_OK(Put(1, "11", "11"));
  335. ASSERT_OK(Put(1, "12", "12"));
  336. ASSERT_OK(Put(1, "22", "22"));
  337. ASSERT_OK(Flush(1)); // flush all those keys to an immutable SST file
  338. // Add another key to the memtable.
  339. ASSERT_OK(Put(1, "21", "21"));
  340. std::unique_ptr<Iterator> it(db_->NewIterator(read_options, handles_[1]));
  341. it->Seek("12");
  342. ASSERT_TRUE(it->Valid());
  343. ASSERT_EQ("12", it->key().ToString());
  344. it->Next();
  345. // Not valid since "21" is over the upper bound.
  346. ASSERT_FALSE(it->Valid());
  347. // This keeps track of the number of times NeedToSeekImmutable() was true.
  348. int immutable_seeks = 0;
  349. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  350. "ForwardIterator::SeekInternal:Immutable",
  351. [&](void* /*arg*/) { ++immutable_seeks; });
  352. // Seek to 13. This should not require any immutable seeks.
  353. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  354. it->Seek("13");
  355. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  356. ASSERT_FALSE(it->Valid());
  357. ASSERT_EQ(0, immutable_seeks);
  358. }
  359. TEST_F(DBTestTailingIterator, TailingIteratorGap) {
  360. // level 1: [20, 25] [35, 40]
  361. // level 2: [10 - 15] [45 - 50]
  362. // level 3: [20, 30, 40]
  363. // Previously there is a bug in tailing_iterator that if there is a gap in
  364. // lower level, the key will be skipped if it is within the range between
  365. // the largest key of index n file and the smallest key of index n+1 file
  366. // if both file fit in that gap. In this example, 25 < key < 35
  367. // https://github.com/facebook/rocksdb/issues/1372
  368. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  369. ReadOptions read_options;
  370. read_options.tailing = true;
  371. ASSERT_OK(Put(1, "20", "20"));
  372. ASSERT_OK(Put(1, "30", "30"));
  373. ASSERT_OK(Put(1, "40", "40"));
  374. ASSERT_OK(Flush(1));
  375. MoveFilesToLevel(3, 1);
  376. ASSERT_OK(Put(1, "10", "10"));
  377. ASSERT_OK(Put(1, "15", "15"));
  378. ASSERT_OK(Flush(1));
  379. ASSERT_OK(Put(1, "45", "45"));
  380. ASSERT_OK(Put(1, "50", "50"));
  381. ASSERT_OK(Flush(1));
  382. MoveFilesToLevel(2, 1);
  383. ASSERT_OK(Put(1, "20", "20"));
  384. ASSERT_OK(Put(1, "25", "25"));
  385. ASSERT_OK(Flush(1));
  386. ASSERT_OK(Put(1, "35", "35"));
  387. ASSERT_OK(Put(1, "40", "40"));
  388. ASSERT_OK(Flush(1));
  389. MoveFilesToLevel(1, 1);
  390. ColumnFamilyMetaData meta;
  391. db_->GetColumnFamilyMetaData(handles_[1], &meta);
  392. std::unique_ptr<Iterator> it(db_->NewIterator(read_options, handles_[1]));
  393. it->Seek("30");
  394. ASSERT_TRUE(it->Valid());
  395. ASSERT_EQ("30", it->key().ToString());
  396. it->Next();
  397. ASSERT_TRUE(it->Valid());
  398. ASSERT_EQ("35", it->key().ToString());
  399. it->Next();
  400. ASSERT_TRUE(it->Valid());
  401. ASSERT_EQ("40", it->key().ToString());
  402. }
  403. TEST_F(DBTestTailingIterator, SeekWithUpperBoundBug) {
  404. ReadOptions read_options;
  405. read_options.tailing = true;
  406. const Slice upper_bound("cc", 3);
  407. read_options.iterate_upper_bound = &upper_bound;
  408. // 1st L0 file
  409. ASSERT_OK(db_->Put(WriteOptions(), "aa", "SEEN"));
  410. ASSERT_OK(Flush());
  411. // 2nd L0 file
  412. ASSERT_OK(db_->Put(WriteOptions(), "zz", "NOT-SEEN"));
  413. ASSERT_OK(Flush());
  414. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
  415. iter->Seek("aa");
  416. ASSERT_TRUE(iter->Valid());
  417. ASSERT_EQ(iter->key().ToString(), "aa");
  418. }
  419. TEST_F(DBTestTailingIterator, SeekToFirstWithUpperBoundBug) {
  420. ReadOptions read_options;
  421. read_options.tailing = true;
  422. const Slice upper_bound("cc", 3);
  423. read_options.iterate_upper_bound = &upper_bound;
  424. // 1st L0 file
  425. ASSERT_OK(db_->Put(WriteOptions(), "aa", "SEEN"));
  426. ASSERT_OK(Flush());
  427. // 2nd L0 file
  428. ASSERT_OK(db_->Put(WriteOptions(), "zz", "NOT-SEEN"));
  429. ASSERT_OK(Flush());
  430. std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
  431. iter->SeekToFirst();
  432. ASSERT_TRUE(iter->Valid());
  433. ASSERT_EQ(iter->key().ToString(), "aa");
  434. iter->Next();
  435. ASSERT_FALSE(iter->Valid());
  436. iter->SeekToFirst();
  437. ASSERT_TRUE(iter->Valid());
  438. ASSERT_EQ(iter->key().ToString(), "aa");
  439. }
  440. } // namespace ROCKSDB_NAMESPACE
  441. #endif // !defined(ROCKSDB_LITE)
  442. int main(int argc, char** argv) {
  443. #if !defined(ROCKSDB_LITE)
  444. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  445. ::testing::InitGoogleTest(&argc, argv);
  446. return RUN_ALL_TESTS();
  447. #else
  448. (void) argc;
  449. (void) argv;
  450. return 0;
  451. #endif
  452. }