db_log_iter_test.cc 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  // The introduction of SyncPoint effectively disabled building and running
  // this test in Release builds, which is a pity, because it is a good test.
  12. #if !defined(ROCKSDB_LITE)
  13. #include "db/db_test_util.h"
  14. #include "port/stack_trace.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. class DBTestXactLogIterator : public DBTestBase {
  17. public:
  18. DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {}
  19. std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
  20. const SequenceNumber seq) {
  21. std::unique_ptr<TransactionLogIterator> iter;
  22. Status status = dbfull()->GetUpdatesSince(seq, &iter);
  23. EXPECT_OK(status);
  24. EXPECT_TRUE(iter->Valid());
  25. return iter;
  26. }
  27. };
  28. namespace {
  29. SequenceNumber ReadRecords(
  30. std::unique_ptr<TransactionLogIterator>& iter,
  31. int& count) {
  32. count = 0;
  33. SequenceNumber lastSequence = 0;
  34. BatchResult res;
  35. while (iter->Valid()) {
  36. res = iter->GetBatch();
  37. EXPECT_TRUE(res.sequence > lastSequence);
  38. ++count;
  39. lastSequence = res.sequence;
  40. EXPECT_OK(iter->status());
  41. iter->Next();
  42. }
  43. return res.sequence;
  44. }
  45. void ExpectRecords(
  46. const int expected_no_records,
  47. std::unique_ptr<TransactionLogIterator>& iter) {
  48. int num_records;
  49. ReadRecords(iter, num_records);
  50. ASSERT_EQ(num_records, expected_no_records);
  51. }
  52. } // namespace
  53. TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
  54. do {
  55. Options options = OptionsForLogIterTest();
  56. DestroyAndReopen(options);
  57. CreateAndReopenWithCF({"pikachu"}, options);
  58. Put(0, "key1", DummyString(1024));
  59. Put(1, "key2", DummyString(1024));
  60. Put(1, "key2", DummyString(1024));
  61. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
  62. {
  63. auto iter = OpenTransactionLogIter(0);
  64. ExpectRecords(3, iter);
  65. }
  66. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  67. env_->SleepForMicroseconds(2 * 1000 * 1000);
  68. {
  69. Put(0, "key4", DummyString(1024));
  70. Put(1, "key5", DummyString(1024));
  71. Put(0, "key6", DummyString(1024));
  72. }
  73. {
  74. auto iter = OpenTransactionLogIter(0);
  75. ExpectRecords(6, iter);
  76. }
  77. } while (ChangeCompactOptions());
  78. }
  79. #ifndef NDEBUG // sync point is not included with DNDEBUG build
  80. TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
  81. static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
  82. static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
  83. {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
  84. "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
  85. {"WalManager::GetSortedWalsOfType:1",
  86. "WalManager::PurgeObsoleteFiles:1",
  87. "WalManager::PurgeObsoleteFiles:2",
  88. "WalManager::GetSortedWalsOfType:2"}};
  89. for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
  90. // Setup sync point dependency to reproduce the race condition of
  91. // a log file moved to archived dir, in the middle of GetSortedWalFiles
  92. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  93. {sync_points[test][0], sync_points[test][1]},
  94. {sync_points[test][2], sync_points[test][3]},
  95. });
  96. do {
  97. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
  98. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  99. Options options = OptionsForLogIterTest();
  100. DestroyAndReopen(options);
  101. Put("key1", DummyString(1024));
  102. dbfull()->Flush(FlushOptions());
  103. Put("key2", DummyString(1024));
  104. dbfull()->Flush(FlushOptions());
  105. Put("key3", DummyString(1024));
  106. dbfull()->Flush(FlushOptions());
  107. Put("key4", DummyString(1024));
  108. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
  109. dbfull()->FlushWAL(false);
  110. {
  111. auto iter = OpenTransactionLogIter(0);
  112. ExpectRecords(4, iter);
  113. }
  114. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  115. // trigger async flush, and log move. Well, log move will
  116. // wait until the GetSortedWalFiles:1 to reproduce the race
  117. // condition
  118. FlushOptions flush_options;
  119. flush_options.wait = false;
  120. dbfull()->Flush(flush_options);
  121. // "key5" would be written in a new memtable and log
  122. Put("key5", DummyString(1024));
  123. dbfull()->FlushWAL(false);
  124. {
  125. // this iter would miss "key4" if not fixed
  126. auto iter = OpenTransactionLogIter(0);
  127. ExpectRecords(5, iter);
  128. }
  129. } while (ChangeCompactOptions());
  130. }
  131. }
  132. #endif
  133. TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
  134. do {
  135. Options options = OptionsForLogIterTest();
  136. DestroyAndReopen(options);
  137. Put("key1", DummyString(1024));
  138. auto iter = OpenTransactionLogIter(0);
  139. ASSERT_OK(iter->status());
  140. ASSERT_TRUE(iter->Valid());
  141. iter->Next();
  142. ASSERT_TRUE(!iter->Valid());
  143. ASSERT_OK(iter->status());
  144. Put("key2", DummyString(1024));
  145. iter->Next();
  146. ASSERT_OK(iter->status());
  147. ASSERT_TRUE(iter->Valid());
  148. } while (ChangeCompactOptions());
  149. }
  150. TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
  151. do {
  152. Options options = OptionsForLogIterTest();
  153. DestroyAndReopen(options);
  154. Put("key1", DummyString(1024));
  155. Put("key2", DummyString(1023));
  156. dbfull()->Flush(FlushOptions());
  157. Reopen(options);
  158. auto iter = OpenTransactionLogIter(0);
  159. ExpectRecords(2, iter);
  160. } while (ChangeCompactOptions());
  161. }
  162. TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
  163. do {
  164. Options options = OptionsForLogIterTest();
  165. DestroyAndReopen(options);
  166. for (int i = 0; i < 1024; i++) {
  167. Put("key"+ToString(i), DummyString(10));
  168. }
  169. dbfull()->Flush(FlushOptions());
  170. dbfull()->FlushWAL(false);
  171. // Corrupt this log to create a gap
  172. ROCKSDB_NAMESPACE::VectorLogPtr wal_files;
  173. ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
  174. const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
  175. if (mem_env_) {
  176. mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2);
  177. } else {
  178. ASSERT_EQ(0, truncate(logfile_path.c_str(),
  179. wal_files.front()->SizeFileBytes() / 2));
  180. }
  181. // Insert a new entry to a new log file
  182. Put("key1025", DummyString(10));
  183. dbfull()->FlushWAL(false);
  184. // Try to read from the beginning. Should stop before the gap and read less
  185. // than 1025 entries
  186. auto iter = OpenTransactionLogIter(0);
  187. int count;
  188. SequenceNumber last_sequence_read = ReadRecords(iter, count);
  189. ASSERT_LT(last_sequence_read, 1025U);
  190. // Try to read past the gap, should be able to seek to key1025
  191. auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
  192. ExpectRecords(1, iter2);
  193. } while (ChangeCompactOptions());
  194. }
  195. TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
  196. do {
  197. Options options = OptionsForLogIterTest();
  198. DestroyAndReopen(options);
  199. CreateAndReopenWithCF({"pikachu"}, options);
  200. WriteBatch batch;
  201. batch.Put(handles_[1], "key1", DummyString(1024));
  202. batch.Put(handles_[0], "key2", DummyString(1024));
  203. batch.Put(handles_[1], "key3", DummyString(1024));
  204. batch.Delete(handles_[0], "key2");
  205. dbfull()->Write(WriteOptions(), &batch);
  206. Flush(1);
  207. Flush(0);
  208. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  209. Put(1, "key4", DummyString(1024));
  210. auto iter = OpenTransactionLogIter(3);
  211. ExpectRecords(2, iter);
  212. } while (ChangeCompactOptions());
  213. }
  214. TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
  215. Options options = OptionsForLogIterTest();
  216. DestroyAndReopen(options);
  217. CreateAndReopenWithCF({"pikachu"}, options);
  218. {
  219. WriteBatch batch;
  220. batch.Put(handles_[1], "key1", DummyString(1024));
  221. batch.Put(handles_[0], "key2", DummyString(1024));
  222. batch.PutLogData(Slice("blob1"));
  223. batch.Put(handles_[1], "key3", DummyString(1024));
  224. batch.PutLogData(Slice("blob2"));
  225. batch.Delete(handles_[0], "key2");
  226. dbfull()->Write(WriteOptions(), &batch);
  227. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  228. }
  229. auto res = OpenTransactionLogIter(0)->GetBatch();
  230. struct Handler : public WriteBatch::Handler {
  231. std::string seen;
  232. Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
  233. seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " +
  234. ToString(value.size()) + ")";
  235. return Status::OK();
  236. }
  237. Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
  238. seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " +
  239. ToString(value.size()) + ")";
  240. return Status::OK();
  241. }
  242. void LogData(const Slice& blob) override {
  243. seen += "LogData(" + blob.ToString() + ")";
  244. }
  245. Status DeleteCF(uint32_t cf, const Slice& key) override {
  246. seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")";
  247. return Status::OK();
  248. }
  249. } handler;
  250. res.writeBatchPtr->Iterate(&handler);
  251. ASSERT_EQ(
  252. "Put(1, key1, 1024)"
  253. "Put(0, key2, 1024)"
  254. "LogData(blob1)"
  255. "Put(1, key3, 1024)"
  256. "LogData(blob2)"
  257. "Delete(0, key2)",
  258. handler.seen);
  259. }
  260. } // namespace ROCKSDB_NAMESPACE
  261. #endif // !defined(ROCKSDB_LITE)
  262. int main(int argc, char** argv) {
  263. #if !defined(ROCKSDB_LITE)
  264. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  265. ::testing::InitGoogleTest(&argc, argv);
  266. return RUN_ALL_TESTS();
  267. #else
  268. (void) argc;
  269. (void) argv;
  270. return 0;
  271. #endif
  272. }