db_log_iter_test.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  // The introduction of SyncPoint effectively disabled building and running
  // this test in Release builds, which is a pity, since it is a good test.
  12. #include "db/db_test_util.h"
  13. #include "env/mock_env.h"
  14. #include "port/stack_trace.h"
  15. #include "util/atomic.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. class DBTestXactLogIterator : public DBTestBase {
  18. public:
  19. DBTestXactLogIterator()
  20. : DBTestBase("db_log_iter_test", /*env_do_fsync=*/true) {}
  21. std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
  22. const SequenceNumber seq) {
  23. std::unique_ptr<TransactionLogIterator> iter;
  24. Status status = dbfull()->GetUpdatesSince(seq, &iter);
  25. EXPECT_OK(status);
  26. EXPECT_TRUE(iter->Valid());
  27. return iter;
  28. }
  29. };
  30. namespace {
  31. SequenceNumber ReadRecords(std::unique_ptr<TransactionLogIterator>& iter,
  32. int& count, bool expect_ok = true) {
  33. count = 0;
  34. SequenceNumber lastSequence = 0;
  35. BatchResult res;
  36. while (iter->Valid()) {
  37. res = iter->GetBatch();
  38. EXPECT_TRUE(res.sequence > lastSequence);
  39. ++count;
  40. lastSequence = res.sequence;
  41. EXPECT_OK(iter->status());
  42. iter->Next();
  43. }
  44. if (expect_ok) {
  45. EXPECT_OK(iter->status());
  46. } else {
  47. EXPECT_NOK(iter->status());
  48. }
  49. return res.sequence;
  50. }
  51. void ExpectRecords(const int expected_no_records,
  52. std::unique_ptr<TransactionLogIterator>& iter) {
  53. int num_records;
  54. ReadRecords(iter, num_records);
  55. ASSERT_EQ(num_records, expected_no_records);
  56. }
  57. } // anonymous namespace
  58. TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
  59. do {
  60. Options options = OptionsForLogIterTest();
  61. DestroyAndReopen(options);
  62. CreateAndReopenWithCF({"pikachu"}, options);
  63. ASSERT_OK(Put(0, "key1", DummyString(1024)));
  64. ASSERT_OK(Put(1, "key2", DummyString(1024)));
  65. ASSERT_OK(Put(1, "key2", DummyString(1024)));
  66. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
  67. {
  68. auto iter = OpenTransactionLogIter(0);
  69. ExpectRecords(3, iter);
  70. }
  71. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  72. env_->SleepForMicroseconds(2 * 1000 * 1000);
  73. {
  74. ASSERT_OK(Put(0, "key4", DummyString(1024)));
  75. ASSERT_OK(Put(1, "key5", DummyString(1024)));
  76. ASSERT_OK(Put(0, "key6", DummyString(1024)));
  77. }
  78. {
  79. auto iter = OpenTransactionLogIter(0);
  80. ExpectRecords(6, iter);
  81. }
  82. } while (ChangeCompactOptions());
  83. }
  84. #ifndef NDEBUG // sync point is not included with DNDEBUG build
  85. TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
  86. static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
  87. static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
  88. {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
  89. "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
  90. {"WalManager::GetSortedWalsOfType:1", "WalManager::PurgeObsoleteFiles:1",
  91. "WalManager::PurgeObsoleteFiles:2",
  92. "WalManager::GetSortedWalsOfType:2"}};
  93. for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
  94. // Setup sync point dependency to reproduce the race condition of
  95. // a log file moved to archived dir, in the middle of GetSortedWalFiles
  96. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  97. {sync_points[test][0], sync_points[test][1]},
  98. {sync_points[test][2], sync_points[test][3]},
  99. });
  100. do {
  101. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
  102. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  103. Options options = OptionsForLogIterTest();
  104. DestroyAndReopen(options);
  105. ASSERT_OK(Put("key1", DummyString(1024)));
  106. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  107. ASSERT_OK(Put("key2", DummyString(1024)));
  108. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  109. ASSERT_OK(Put("key3", DummyString(1024)));
  110. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  111. ASSERT_OK(Put("key4", DummyString(1024)));
  112. ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
  113. ASSERT_OK(dbfull()->FlushWAL(false));
  114. {
  115. auto iter = OpenTransactionLogIter(0);
  116. ExpectRecords(4, iter);
  117. }
  118. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  119. // trigger async flush, and log move. Well, log move will
  120. // wait until the GetSortedWalFiles:1 to reproduce the race
  121. // condition
  122. FlushOptions flush_options;
  123. flush_options.wait = false;
  124. ASSERT_OK(dbfull()->Flush(flush_options));
  125. // "key5" would be written in a new memtable and log
  126. ASSERT_OK(Put("key5", DummyString(1024)));
  127. ASSERT_OK(dbfull()->FlushWAL(false));
  128. {
  129. // this iter would miss "key4" if not fixed
  130. auto iter = OpenTransactionLogIter(0);
  131. ExpectRecords(5, iter);
  132. }
  133. } while (ChangeCompactOptions());
  134. }
  135. }
  136. TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckWhenArchive) {
  137. RelaxedAtomic<bool> callback_hit{};
  138. do {
  139. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
  140. Options options = OptionsForLogIterTest();
  141. DestroyAndReopen(options);
  142. ColumnFamilyHandle* cf;
  143. auto s = dbfull()->CreateColumnFamily(ColumnFamilyOptions(), "CF", &cf);
  144. ASSERT_TRUE(s.ok());
  145. ASSERT_OK(dbfull()->Put(WriteOptions(), cf, "key1", DummyString(1024)));
  146. ASSERT_OK(dbfull()->Put(WriteOptions(), "key2", DummyString(1024)));
  147. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  148. ASSERT_OK(dbfull()->Put(WriteOptions(), "key3", DummyString(1024)));
  149. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  150. ASSERT_OK(dbfull()->Put(WriteOptions(), "key4", DummyString(1024)));
  151. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  152. callback_hit.StoreRelaxed(false);
  153. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  154. "WalManager::PurgeObsoleteFiles:1", [&](void*) {
  155. auto iter = OpenTransactionLogIter(0);
  156. ExpectRecords(4, iter);
  157. callback_hit.StoreRelaxed(true);
  158. });
  159. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  160. ASSERT_OK(dbfull()->Flush(FlushOptions(), cf));
  161. delete cf;
  162. // Normally hit several times; WART: perhaps more in parallel after flush
  163. // FIXME: this test is flaky
  164. // ASSERT_TRUE(callback_hit.LoadRelaxed());
  165. } while (ChangeCompactOptions());
  166. Close();
  167. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  168. }
  169. #endif
  170. TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
  171. do {
  172. Options options = OptionsForLogIterTest();
  173. DestroyAndReopen(options);
  174. ASSERT_OK(Put("key1", DummyString(1024)));
  175. auto iter = OpenTransactionLogIter(0);
  176. ASSERT_OK(iter->status());
  177. ASSERT_TRUE(iter->Valid());
  178. iter->Next();
  179. ASSERT_TRUE(!iter->Valid());
  180. ASSERT_OK(iter->status());
  181. ASSERT_OK(Put("key2", DummyString(1024)));
  182. iter->Next();
  183. ASSERT_OK(iter->status());
  184. ASSERT_TRUE(iter->Valid());
  185. } while (ChangeCompactOptions());
  186. }
  187. TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
  188. do {
  189. Options options = OptionsForLogIterTest();
  190. DestroyAndReopen(options);
  191. ASSERT_OK(Put("key1", DummyString(1024)));
  192. ASSERT_OK(Put("key2", DummyString(1023)));
  193. ASSERT_OK(dbfull()->Flush(FlushOptions()));
  194. Reopen(options);
  195. auto iter = OpenTransactionLogIter(0);
  196. ExpectRecords(2, iter);
  197. } while (ChangeCompactOptions());
  198. }
  199. TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
  200. do {
  201. Options options = OptionsForLogIterTest();
  202. DestroyAndReopen(options);
  203. for (int i = 0; i < 1024; i++) {
  204. ASSERT_OK(Put("key" + std::to_string(i), DummyString(10)));
  205. }
  206. ASSERT_OK(Flush());
  207. ASSERT_OK(db_->FlushWAL(false));
  208. // Corrupt this log to create a gap
  209. ASSERT_OK(db_->DisableFileDeletions());
  210. VectorLogPtr wal_files;
  211. ASSERT_OK(db_->GetSortedWalFiles(wal_files));
  212. ASSERT_FALSE(wal_files.empty());
  213. const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
  214. ASSERT_OK(test::TruncateFile(env_, logfile_path,
  215. wal_files.front()->SizeFileBytes() / 2));
  216. ASSERT_OK(db_->EnableFileDeletions());
  217. // Insert a new entry to a new log file
  218. ASSERT_OK(Put("key1025", DummyString(10)));
  219. ASSERT_OK(db_->FlushWAL(false));
  220. // Try to read from the beginning. Should stop before the gap and read less
  221. // than 1025 entries
  222. auto iter = OpenTransactionLogIter(0);
  223. int count = 0;
  224. SequenceNumber last_sequence_read = ReadRecords(iter, count, false);
  225. ASSERT_LT(last_sequence_read, 1025U);
  226. // Try to read past the gap, should be able to seek to key1025
  227. auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
  228. ExpectRecords(1, iter2);
  229. } while (ChangeCompactOptions());
  230. }
  231. TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
  232. do {
  233. Options options = OptionsForLogIterTest();
  234. DestroyAndReopen(options);
  235. CreateAndReopenWithCF({"pikachu"}, options);
  236. WriteBatch batch;
  237. ASSERT_OK(batch.Put(handles_[1], "key1", DummyString(1024)));
  238. ASSERT_OK(batch.Put(handles_[0], "key2", DummyString(1024)));
  239. ASSERT_OK(batch.Put(handles_[1], "key3", DummyString(1024)));
  240. ASSERT_OK(batch.Delete(handles_[0], "key2"));
  241. ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
  242. ASSERT_OK(Flush(1));
  243. ASSERT_OK(Flush(0));
  244. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  245. ASSERT_OK(Put(1, "key4", DummyString(1024)));
  246. auto iter = OpenTransactionLogIter(3);
  247. ExpectRecords(2, iter);
  248. } while (ChangeCompactOptions());
  249. }
  250. TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
  251. Options options = OptionsForLogIterTest();
  252. DestroyAndReopen(options);
  253. CreateAndReopenWithCF({"pikachu"}, options);
  254. {
  255. WriteBatch batch;
  256. ASSERT_OK(batch.Put(handles_[1], "key1", DummyString(1024)));
  257. ASSERT_OK(batch.Put(handles_[0], "key2", DummyString(1024)));
  258. ASSERT_OK(batch.PutLogData(Slice("blob1")));
  259. ASSERT_OK(batch.Put(handles_[1], "key3", DummyString(1024)));
  260. ASSERT_OK(batch.PutLogData(Slice("blob2")));
  261. ASSERT_OK(batch.Delete(handles_[0], "key2"));
  262. ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
  263. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  264. }
  265. auto res = OpenTransactionLogIter(0)->GetBatch();
  266. struct Handler : public WriteBatch::Handler {
  267. std::string seen;
  268. Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
  269. seen += "Put(" + std::to_string(cf) + ", " + key.ToString() + ", " +
  270. std::to_string(value.size()) + ")";
  271. return Status::OK();
  272. }
  273. Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
  274. seen += "Merge(" + std::to_string(cf) + ", " + key.ToString() + ", " +
  275. std::to_string(value.size()) + ")";
  276. return Status::OK();
  277. }
  278. void LogData(const Slice& blob) override {
  279. seen += "LogData(" + blob.ToString() + ")";
  280. }
  281. Status DeleteCF(uint32_t cf, const Slice& key) override {
  282. seen += "Delete(" + std::to_string(cf) + ", " + key.ToString() + ")";
  283. return Status::OK();
  284. }
  285. } handler;
  286. ASSERT_OK(res.writeBatchPtr->Iterate(&handler));
  287. ASSERT_EQ(
  288. "Put(1, key1, 1024)"
  289. "Put(0, key2, 1024)"
  290. "LogData(blob1)"
  291. "Put(1, key3, 1024)"
  292. "LogData(blob2)"
  293. "Delete(0, key2)",
  294. handler.seen);
  295. }
  296. } // namespace ROCKSDB_NAMESPACE
  297. int main(int argc, char** argv) {
  298. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  299. ::testing::InitGoogleTest(&argc, argv);
  300. return RUN_ALL_TESTS();
  301. }