| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
// The introduction of SyncPoint effectively disabled building and running
// this test in Release builds, which is a pity because it is a good test.
- #if !defined(ROCKSDB_LITE)
- #include "db/db_test_util.h"
- #include "port/stack_trace.h"
- namespace ROCKSDB_NAMESPACE {
- class DBTestXactLogIterator : public DBTestBase {
- public:
- DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {}
- std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
- const SequenceNumber seq) {
- std::unique_ptr<TransactionLogIterator> iter;
- Status status = dbfull()->GetUpdatesSince(seq, &iter);
- EXPECT_OK(status);
- EXPECT_TRUE(iter->Valid());
- return iter;
- }
- };
- namespace {
- SequenceNumber ReadRecords(
- std::unique_ptr<TransactionLogIterator>& iter,
- int& count) {
- count = 0;
- SequenceNumber lastSequence = 0;
- BatchResult res;
- while (iter->Valid()) {
- res = iter->GetBatch();
- EXPECT_TRUE(res.sequence > lastSequence);
- ++count;
- lastSequence = res.sequence;
- EXPECT_OK(iter->status());
- iter->Next();
- }
- return res.sequence;
- }
- void ExpectRecords(
- const int expected_no_records,
- std::unique_ptr<TransactionLogIterator>& iter) {
- int num_records;
- ReadRecords(iter, num_records);
- ASSERT_EQ(num_records, expected_no_records);
- }
- } // namespace
- TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
- do {
- Options options = OptionsForLogIterTest();
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- Put(0, "key1", DummyString(1024));
- Put(1, "key2", DummyString(1024));
- Put(1, "key2", DummyString(1024));
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
- {
- auto iter = OpenTransactionLogIter(0);
- ExpectRecords(3, iter);
- }
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- env_->SleepForMicroseconds(2 * 1000 * 1000);
- {
- Put(0, "key4", DummyString(1024));
- Put(1, "key5", DummyString(1024));
- Put(0, "key6", DummyString(1024));
- }
- {
- auto iter = OpenTransactionLogIter(0);
- ExpectRecords(6, iter);
- }
- } while (ChangeCompactOptions());
- }
- #ifndef NDEBUG // sync point is not included with DNDEBUG build
- TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
- static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
- static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
- {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
- "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
- {"WalManager::GetSortedWalsOfType:1",
- "WalManager::PurgeObsoleteFiles:1",
- "WalManager::PurgeObsoleteFiles:2",
- "WalManager::GetSortedWalsOfType:2"}};
- for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
- // Setup sync point dependency to reproduce the race condition of
- // a log file moved to archived dir, in the middle of GetSortedWalFiles
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {sync_points[test][0], sync_points[test][1]},
- {sync_points[test][2], sync_points[test][3]},
- });
- do {
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- Options options = OptionsForLogIterTest();
- DestroyAndReopen(options);
- Put("key1", DummyString(1024));
- dbfull()->Flush(FlushOptions());
- Put("key2", DummyString(1024));
- dbfull()->Flush(FlushOptions());
- Put("key3", DummyString(1024));
- dbfull()->Flush(FlushOptions());
- Put("key4", DummyString(1024));
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
- dbfull()->FlushWAL(false);
- {
- auto iter = OpenTransactionLogIter(0);
- ExpectRecords(4, iter);
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- // trigger async flush, and log move. Well, log move will
- // wait until the GetSortedWalFiles:1 to reproduce the race
- // condition
- FlushOptions flush_options;
- flush_options.wait = false;
- dbfull()->Flush(flush_options);
- // "key5" would be written in a new memtable and log
- Put("key5", DummyString(1024));
- dbfull()->FlushWAL(false);
- {
- // this iter would miss "key4" if not fixed
- auto iter = OpenTransactionLogIter(0);
- ExpectRecords(5, iter);
- }
- } while (ChangeCompactOptions());
- }
- }
- #endif
- TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
- do {
- Options options = OptionsForLogIterTest();
- DestroyAndReopen(options);
- Put("key1", DummyString(1024));
- auto iter = OpenTransactionLogIter(0);
- ASSERT_OK(iter->status());
- ASSERT_TRUE(iter->Valid());
- iter->Next();
- ASSERT_TRUE(!iter->Valid());
- ASSERT_OK(iter->status());
- Put("key2", DummyString(1024));
- iter->Next();
- ASSERT_OK(iter->status());
- ASSERT_TRUE(iter->Valid());
- } while (ChangeCompactOptions());
- }
- TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
- do {
- Options options = OptionsForLogIterTest();
- DestroyAndReopen(options);
- Put("key1", DummyString(1024));
- Put("key2", DummyString(1023));
- dbfull()->Flush(FlushOptions());
- Reopen(options);
- auto iter = OpenTransactionLogIter(0);
- ExpectRecords(2, iter);
- } while (ChangeCompactOptions());
- }
- TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
- do {
- Options options = OptionsForLogIterTest();
- DestroyAndReopen(options);
- for (int i = 0; i < 1024; i++) {
- Put("key"+ToString(i), DummyString(10));
- }
- dbfull()->Flush(FlushOptions());
- dbfull()->FlushWAL(false);
- // Corrupt this log to create a gap
- ROCKSDB_NAMESPACE::VectorLogPtr wal_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
- const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
- if (mem_env_) {
- mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2);
- } else {
- ASSERT_EQ(0, truncate(logfile_path.c_str(),
- wal_files.front()->SizeFileBytes() / 2));
- }
- // Insert a new entry to a new log file
- Put("key1025", DummyString(10));
- dbfull()->FlushWAL(false);
- // Try to read from the beginning. Should stop before the gap and read less
- // than 1025 entries
- auto iter = OpenTransactionLogIter(0);
- int count;
- SequenceNumber last_sequence_read = ReadRecords(iter, count);
- ASSERT_LT(last_sequence_read, 1025U);
- // Try to read past the gap, should be able to seek to key1025
- auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
- ExpectRecords(1, iter2);
- } while (ChangeCompactOptions());
- }
- TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
- do {
- Options options = OptionsForLogIterTest();
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- WriteBatch batch;
- batch.Put(handles_[1], "key1", DummyString(1024));
- batch.Put(handles_[0], "key2", DummyString(1024));
- batch.Put(handles_[1], "key3", DummyString(1024));
- batch.Delete(handles_[0], "key2");
- dbfull()->Write(WriteOptions(), &batch);
- Flush(1);
- Flush(0);
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- Put(1, "key4", DummyString(1024));
- auto iter = OpenTransactionLogIter(3);
- ExpectRecords(2, iter);
- } while (ChangeCompactOptions());
- }
- TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
- Options options = OptionsForLogIterTest();
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- {
- WriteBatch batch;
- batch.Put(handles_[1], "key1", DummyString(1024));
- batch.Put(handles_[0], "key2", DummyString(1024));
- batch.PutLogData(Slice("blob1"));
- batch.Put(handles_[1], "key3", DummyString(1024));
- batch.PutLogData(Slice("blob2"));
- batch.Delete(handles_[0], "key2");
- dbfull()->Write(WriteOptions(), &batch);
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- }
- auto res = OpenTransactionLogIter(0)->GetBatch();
- struct Handler : public WriteBatch::Handler {
- std::string seen;
- Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
- seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " +
- ToString(value.size()) + ")";
- return Status::OK();
- }
- Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
- seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " +
- ToString(value.size()) + ")";
- return Status::OK();
- }
- void LogData(const Slice& blob) override {
- seen += "LogData(" + blob.ToString() + ")";
- }
- Status DeleteCF(uint32_t cf, const Slice& key) override {
- seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")";
- return Status::OK();
- }
- } handler;
- res.writeBatchPtr->Iterate(&handler);
- ASSERT_EQ(
- "Put(1, key1, 1024)"
- "Put(0, key2, 1024)"
- "LogData(blob1)"
- "Put(1, key3, 1024)"
- "LogData(blob2)"
- "Delete(0, key2)",
- handler.seen);
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif // !defined(ROCKSDB_LITE)
- int main(int argc, char** argv) {
- #if !defined(ROCKSDB_LITE)
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- #else
- (void) argc;
- (void) argv;
- return 0;
- #endif
- }
|