| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.// Introduction of SyncPoint effectively disabled building and running this test// in Release build.// which is a pity, it is a good test#if !defined(ROCKSDB_LITE)#include "db/db_test_util.h"#include "port/stack_trace.h"namespace ROCKSDB_NAMESPACE {class DBTestXactLogIterator : public DBTestBase { public:  DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {}  std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(      const SequenceNumber seq) {    std::unique_ptr<TransactionLogIterator> iter;    Status status = dbfull()->GetUpdatesSince(seq, &iter);    EXPECT_OK(status);    EXPECT_TRUE(iter->Valid());    return iter;  }};namespace {SequenceNumber ReadRecords(    std::unique_ptr<TransactionLogIterator>& iter,    int& count) {  count = 0;  SequenceNumber lastSequence = 0;  BatchResult res;  while (iter->Valid()) {    res = iter->GetBatch();    EXPECT_TRUE(res.sequence > lastSequence);    ++count;    lastSequence = res.sequence;    EXPECT_OK(iter->status());    iter->Next();  }  return res.sequence;}void ExpectRecords(    const int expected_no_records,    std::unique_ptr<TransactionLogIterator>& iter) {  int num_records;  ReadRecords(iter, num_records);  ASSERT_EQ(num_records, expected_no_records);}}  // namespaceTEST_F(DBTestXactLogIterator, TransactionLogIterator) {  do {    Options options = OptionsForLogIterTest();    DestroyAndReopen(options);    CreateAndReopenWithCF({"pikachu"}, options);    Put(0, "key1", DummyString(1024));    Put(1, "key2", DummyString(1024));    Put(1, "key2", DummyString(1024));    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);    {      auto iter = OpenTransactionLogIter(0);      ExpectRecords(3, iter);    }    ReopenWithColumnFamilies({"default", "pikachu"}, options);    env_->SleepForMicroseconds(2 * 1000 * 1000);    {      Put(0, "key4", DummyString(1024));      Put(1, "key5", DummyString(1024));      Put(0, "key6", DummyString(1024));    }    {      auto iter = OpenTransactionLogIter(0);      ExpectRecords(6, iter);    }  } while (ChangeCompactOptions());}#ifndef NDEBUG  // sync point is not included with DNDEBUG buildTEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {  static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;  static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {      {"WalManager::GetSortedWalFiles:1",  "WalManager::PurgeObsoleteFiles:1",       "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},      {"WalManager::GetSortedWalsOfType:1",       "WalManager::PurgeObsoleteFiles:1",       "WalManager::PurgeObsoleteFiles:2",       "WalManager::GetSortedWalsOfType:2"}};  for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {    // Setup sync point dependency to reproduce the race condition of    // a log file moved to archived dir, in the middle of GetSortedWalFiles    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({        {sync_points[test][0], sync_points[test][1]},        {sync_points[test][2], sync_points[test][3]},    });    do {      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();      Options options = OptionsForLogIterTest();      DestroyAndReopen(options);      Put("key1", DummyString(1024));      dbfull()->Flush(FlushOptions());      Put("key2", DummyString(1024));      dbfull()->Flush(FlushOptions());      Put("key3", DummyString(1024));      dbfull()->Flush(FlushOptions());      Put("key4", DummyString(1024));      ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);      dbfull()->FlushWAL(false);      {        auto iter = OpenTransactionLogIter(0);        ExpectRecords(4, iter);      }      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();      // trigger async flush, and log move. Well, log move will      // wait until the GetSortedWalFiles:1 to reproduce the race      // condition      FlushOptions flush_options;      flush_options.wait = false;      dbfull()->Flush(flush_options);      // "key5" would be written in a new memtable and log      Put("key5", DummyString(1024));      dbfull()->FlushWAL(false);      {        // this iter would miss "key4" if not fixed        auto iter = OpenTransactionLogIter(0);        ExpectRecords(5, iter);      }    } while (ChangeCompactOptions());  }}#endifTEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {  do {    Options options = OptionsForLogIterTest();    DestroyAndReopen(options);    Put("key1", DummyString(1024));    auto iter = OpenTransactionLogIter(0);    ASSERT_OK(iter->status());    ASSERT_TRUE(iter->Valid());    iter->Next();    ASSERT_TRUE(!iter->Valid());    ASSERT_OK(iter->status());    Put("key2", DummyString(1024));    iter->Next();    ASSERT_OK(iter->status());    ASSERT_TRUE(iter->Valid());  } while (ChangeCompactOptions());}TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {  do {    Options options = OptionsForLogIterTest();    DestroyAndReopen(options);    Put("key1", DummyString(1024));    Put("key2", DummyString(1023));    dbfull()->Flush(FlushOptions());    Reopen(options);    auto iter = OpenTransactionLogIter(0);    ExpectRecords(2, iter);  } while (ChangeCompactOptions());}TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {  do {    Options options = OptionsForLogIterTest();    DestroyAndReopen(options);    for (int i = 0; i < 1024; i++) {      Put("key"+ToString(i), DummyString(10));    }    dbfull()->Flush(FlushOptions());    dbfull()->FlushWAL(false);    // Corrupt this log to create a gap    ROCKSDB_NAMESPACE::VectorLogPtr wal_files;    ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));    const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();    if (mem_env_) {      mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2);    } else {      ASSERT_EQ(0, truncate(logfile_path.c_str(),                   wal_files.front()->SizeFileBytes() / 2));    }    // Insert a new entry to a new log file    Put("key1025", DummyString(10));    dbfull()->FlushWAL(false);    // Try to read from the beginning. Should stop before the gap and read less    // than 1025 entries    auto iter = OpenTransactionLogIter(0);    int count;    SequenceNumber last_sequence_read = ReadRecords(iter, count);    ASSERT_LT(last_sequence_read, 1025U);    // Try to read past the gap, should be able to seek to key1025    auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);    ExpectRecords(1, iter2);  } while (ChangeCompactOptions());}TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {  do {    Options options = OptionsForLogIterTest();    DestroyAndReopen(options);    CreateAndReopenWithCF({"pikachu"}, options);    WriteBatch batch;    batch.Put(handles_[1], "key1", DummyString(1024));    batch.Put(handles_[0], "key2", DummyString(1024));    batch.Put(handles_[1], "key3", DummyString(1024));    batch.Delete(handles_[0], "key2");    dbfull()->Write(WriteOptions(), &batch);    Flush(1);    Flush(0);    ReopenWithColumnFamilies({"default", "pikachu"}, options);    Put(1, "key4", DummyString(1024));    auto iter = OpenTransactionLogIter(3);    ExpectRecords(2, iter);  } while (ChangeCompactOptions());}TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {  Options options = OptionsForLogIterTest();  DestroyAndReopen(options);  CreateAndReopenWithCF({"pikachu"}, options);  {    WriteBatch batch;    batch.Put(handles_[1], "key1", DummyString(1024));    batch.Put(handles_[0], "key2", DummyString(1024));    batch.PutLogData(Slice("blob1"));    batch.Put(handles_[1], "key3", DummyString(1024));    batch.PutLogData(Slice("blob2"));    batch.Delete(handles_[0], "key2");    dbfull()->Write(WriteOptions(), &batch);    ReopenWithColumnFamilies({"default", "pikachu"}, options);  }  auto res = OpenTransactionLogIter(0)->GetBatch();  struct Handler : public WriteBatch::Handler {    std::string seen;    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {      seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " +              ToString(value.size()) + ")";      return Status::OK();    }    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {      seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " +              ToString(value.size()) + ")";      return Status::OK();    }    void LogData(const Slice& blob) override {      seen += "LogData(" + blob.ToString() + ")";    }    Status DeleteCF(uint32_t cf, const Slice& key) override {      seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")";      return Status::OK();    }  } handler;  res.writeBatchPtr->Iterate(&handler);  ASSERT_EQ(      "Put(1, key1, 1024)"      "Put(0, key2, 1024)"      "LogData(blob1)"      "Put(1, key3, 1024)"      "LogData(blob2)"      "Delete(0, key2)",      handler.seen);}}  // namespace ROCKSDB_NAMESPACE#endif  // !defined(ROCKSDB_LITE)int main(int argc, char** argv) {#if !defined(ROCKSDB_LITE)  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();  ::testing::InitGoogleTest(&argc, argv);  return RUN_ALL_TESTS();#else  (void) argc;  (void) argv;  return 0;#endif}
 |