| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- // Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include <cassert>
- #include "util/cast_util.h"
- #include "utilities/transactions/transaction_test.h"
- namespace ROCKSDB_NAMESPACE {
- constexpr std::array TimestampedSnapshotWithTsSanityCheck_Params = {
- std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
- std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
- std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite)};
- INSTANTIATE_TEST_CASE_P(
- Unsupported, TimestampedSnapshotWithTsSanityCheck,
- ::testing::ValuesIn(WRAP_PARAM_WITH_PER_KEY_POINT_LOCK_MANAGER_PARAMS(
- WRAP_PARAM(bool, bool, TxnDBWritePolicy, WriteOrdering),
- TimestampedSnapshotWithTsSanityCheck_Params)));
- INSTANTIATE_TEST_CASE_P(
- WriteCommitted, TransactionTest,
- ::testing::Combine(/*use_stackable_db=*/::testing::Bool(),
- /*two_write_queue=*/::testing::Bool(),
- ::testing::Values(WRITE_COMMITTED),
- ::testing::Values(kOrderedWrite),
- /*use_per_key_point_lock_mgr=*/::testing::Bool(),
- /*deadlock_timeout_us=*/::testing::Values(0, 1000)));
- namespace {
- // Not thread-safe. Caller needs to provide external synchronization.
- class TsCheckingTxnNotifier : public TransactionNotifier {
- public:
- explicit TsCheckingTxnNotifier() = default;
- ~TsCheckingTxnNotifier() override = default;
- void SnapshotCreated(const Snapshot* new_snapshot) override {
- assert(new_snapshot);
- if (prev_snapshot_seq_ != kMaxSequenceNumber) {
- assert(prev_snapshot_seq_ <= new_snapshot->GetSequenceNumber());
- }
- prev_snapshot_seq_ = new_snapshot->GetSequenceNumber();
- if (prev_snapshot_ts_ != kMaxTxnTimestamp) {
- assert(prev_snapshot_ts_ <= new_snapshot->GetTimestamp());
- }
- prev_snapshot_ts_ = new_snapshot->GetTimestamp();
- }
- TxnTimestamp prev_snapshot_ts() const { return prev_snapshot_ts_; }
- private:
- SequenceNumber prev_snapshot_seq_ = kMaxSequenceNumber;
- TxnTimestamp prev_snapshot_ts_ = kMaxTxnTimestamp;
- };
- } // anonymous namespace
- TEST_P(TimestampedSnapshotWithTsSanityCheck, WithoutCommitTs) {
- std::unique_ptr<Transaction> txn(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- assert(txn);
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Put("a", "v"));
- ASSERT_OK(txn->Prepare());
- Status s = txn->CommitAndTryCreateSnapshot();
- ASSERT_TRUE(s.IsInvalidArgument());
- ASSERT_OK(txn->Rollback());
- txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions()));
- assert(txn);
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Put("a", "v"));
- s = txn->CommitAndTryCreateSnapshot();
- ASSERT_TRUE(s.IsInvalidArgument());
- }
- TEST_P(TimestampedSnapshotWithTsSanityCheck, SetCommitTs) {
- std::unique_ptr<Transaction> txn(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- assert(txn);
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Put("a", "v"));
- ASSERT_OK(txn->Prepare());
- std::shared_ptr<const Snapshot> snapshot;
- Status s = txn->CommitAndTryCreateSnapshot(nullptr, 10, &snapshot);
- ASSERT_TRUE(s.IsNotSupported());
- ASSERT_OK(txn->Rollback());
- txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions()));
- assert(txn);
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Put("a", "v"));
- s = txn->CommitAndTryCreateSnapshot(nullptr, 10, &snapshot);
- ASSERT_TRUE(s.IsNotSupported());
- }
- TEST_P(TransactionTest, WithoutCommitTs) {
- std::unique_ptr<Transaction> txn(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- assert(txn);
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Put("a", "v"));
- ASSERT_OK(txn->Prepare());
- Status s = txn->CommitAndTryCreateSnapshot();
- ASSERT_TRUE(s.IsInvalidArgument());
- ASSERT_OK(txn->Rollback());
- txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions()));
- assert(txn);
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Put("a", "v"));
- s = txn->CommitAndTryCreateSnapshot();
- ASSERT_TRUE(s.IsInvalidArgument());
- }
- TEST_P(TransactionTest, ReuseExistingTxn) {
- Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
- assert(txn);
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Put("a", "v1"));
- ASSERT_OK(txn->Prepare());
- auto notifier = std::make_shared<TsCheckingTxnNotifier>();
- std::shared_ptr<const Snapshot> snapshot1;
- Status s =
- txn->CommitAndTryCreateSnapshot(notifier, /*commit_ts=*/100, &snapshot1);
- ASSERT_OK(s);
- ASSERT_EQ(100, snapshot1->GetTimestamp());
- Transaction* txn1 =
- db->BeginTransaction(WriteOptions(), TransactionOptions(), txn);
- assert(txn1 == txn);
- ASSERT_OK(txn1->SetName("txn1"));
- ASSERT_OK(txn->Put("a", "v2"));
- ASSERT_OK(txn->Prepare());
- std::shared_ptr<const Snapshot> snapshot2;
- s = txn->CommitAndTryCreateSnapshot(notifier, /*commit_ts=*/110, &snapshot2);
- ASSERT_OK(s);
- ASSERT_EQ(110, snapshot2->GetTimestamp());
- delete txn;
- {
- std::string value;
- ReadOptions read_opts;
- read_opts.snapshot = snapshot1.get();
- ASSERT_OK(db->Get(read_opts, "a", &value));
- ASSERT_EQ("v1", value);
- read_opts.snapshot = snapshot2.get();
- ASSERT_OK(db->Get(read_opts, "a", &value));
- ASSERT_EQ("v2", value);
- }
- }
- TEST_P(TransactionTest, CreateSnapshotWhenCommit) {
- std::unique_ptr<Transaction> txn(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- assert(txn);
- constexpr int batch_size = 10;
- for (int i = 0; i < batch_size; ++i) {
- ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i), "v0"));
- }
- const SequenceNumber seq0 = db->GetLatestSequenceNumber();
- ASSERT_EQ(static_cast<SequenceNumber>(batch_size), seq0);
- txn->SetSnapshot();
- {
- const Snapshot* const snapshot = txn->GetSnapshot();
- assert(snapshot);
- ASSERT_EQ(seq0, snapshot->GetSequenceNumber());
- }
- for (int i = 0; i < batch_size; ++i) {
- ASSERT_OK(txn->Put("k" + std::to_string(i), "v1"));
- }
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Prepare());
- std::shared_ptr<const Snapshot> snapshot;
- constexpr TxnTimestamp timestamp = 1;
- auto notifier = std::make_shared<TsCheckingTxnNotifier>();
- Status s = txn->CommitAndTryCreateSnapshot(notifier, timestamp, &snapshot);
- ASSERT_OK(s);
- ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp);
- assert(snapshot);
- ASSERT_EQ(timestamp, snapshot->GetTimestamp());
- ASSERT_EQ(seq0 + batch_size, snapshot->GetSequenceNumber());
- const Snapshot* const raw_snapshot_ptr = txn->GetSnapshot();
- ASSERT_EQ(raw_snapshot_ptr, snapshot.get());
- ASSERT_EQ(snapshot, txn->GetTimestampedSnapshot());
- {
- std::shared_ptr<const Snapshot> snapshot1 =
- db->GetLatestTimestampedSnapshot();
- ASSERT_EQ(snapshot, snapshot1);
- }
- {
- std::shared_ptr<const Snapshot> snapshot1 =
- db->GetTimestampedSnapshot(timestamp);
- ASSERT_EQ(snapshot, snapshot1);
- }
- {
- std::vector<std::shared_ptr<const Snapshot> > snapshots;
- s = db->GetAllTimestampedSnapshots(snapshots);
- ASSERT_OK(s);
- ASSERT_EQ(std::vector<std::shared_ptr<const Snapshot> >{snapshot},
- snapshots);
- }
- }
- TEST_P(TransactionTest, CreateSnapshot) {
- // First create a non-timestamped snapshot
- ManagedSnapshot snapshot_guard(db);
- for (int i = 0; i < 10; ++i) {
- ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i),
- "v0_" + std::to_string(i)));
- }
- {
- auto ret = db->CreateTimestampedSnapshot(kMaxTxnTimestamp);
- ASSERT_TRUE(ret.first.IsInvalidArgument());
- auto snapshot = ret.second;
- ASSERT_EQ(nullptr, snapshot.get());
- }
- constexpr TxnTimestamp timestamp = 100;
- Status s;
- std::shared_ptr<const Snapshot> ts_snap0;
- std::tie(s, ts_snap0) = db->CreateTimestampedSnapshot(timestamp);
- ASSERT_OK(s);
- assert(ts_snap0);
- ASSERT_EQ(timestamp, ts_snap0->GetTimestamp());
- for (int i = 0; i < 10; ++i) {
- ASSERT_OK(db->Delete(WriteOptions(), "k" + std::to_string(i)));
- }
- {
- ReadOptions read_opts;
- read_opts.snapshot = ts_snap0.get();
- for (int i = 0; i < 10; ++i) {
- std::string value;
- s = db->Get(read_opts, "k" + std::to_string(i), &value);
- ASSERT_OK(s);
- ASSERT_EQ("v0_" + std::to_string(i), value);
- }
- }
- {
- std::shared_ptr<const Snapshot> snapshot =
- db->GetLatestTimestampedSnapshot();
- ASSERT_EQ(ts_snap0, snapshot);
- }
- {
- std::shared_ptr<const Snapshot> snapshot =
- db->GetTimestampedSnapshot(timestamp);
- ASSERT_OK(s);
- ASSERT_EQ(ts_snap0, snapshot);
- }
- {
- std::vector<std::shared_ptr<const Snapshot> > snapshots;
- s = db->GetAllTimestampedSnapshots(snapshots);
- ASSERT_OK(s);
- ASSERT_EQ(std::vector<std::shared_ptr<const Snapshot> >{ts_snap0},
- snapshots);
- }
- }
- TEST_P(TransactionTest, SequenceAndTsOrder) {
- Status s;
- std::shared_ptr<const Snapshot> snapshot;
- std::tie(s, snapshot) = db->CreateTimestampedSnapshot(100);
- ASSERT_OK(s);
- assert(snapshot);
- {
- // Cannot request smaller timestamp for the new timestamped snapshot.
- std::shared_ptr<const Snapshot> tmp_snapshot;
- std::tie(s, tmp_snapshot) = db->CreateTimestampedSnapshot(50);
- ASSERT_TRUE(s.IsInvalidArgument());
- ASSERT_EQ(nullptr, tmp_snapshot.get());
- }
- // If requesting a new timestamped snapshot with the same timestamp and
- // sequence number, we avoid creating new snapshot object but reuse
- // exisisting one.
- std::shared_ptr<const Snapshot> snapshot1;
- std::tie(s, snapshot1) = db->CreateTimestampedSnapshot(100);
- ASSERT_OK(s);
- ASSERT_EQ(snapshot.get(), snapshot1.get());
- // If there is no write, but we request a larger timestamp, we still create
- // a new snapshot object.
- std::shared_ptr<const Snapshot> snapshot2;
- std::tie(s, snapshot2) = db->CreateTimestampedSnapshot(200);
- ASSERT_OK(s);
- assert(snapshot2);
- ASSERT_NE(snapshot.get(), snapshot2.get());
- ASSERT_EQ(snapshot2->GetSequenceNumber(), snapshot->GetSequenceNumber());
- ASSERT_EQ(200, snapshot2->GetTimestamp());
- // Increase sequence number.
- ASSERT_OK(db->Put(WriteOptions(), "foo", "v0"));
- {
- // We are requesting the same timestamp for a larger sequence number, thus
- // we cannot create timestamped snapshot.
- std::shared_ptr<const Snapshot> tmp_snapshot;
- std::tie(s, tmp_snapshot) = db->CreateTimestampedSnapshot(200);
- ASSERT_TRUE(s.IsInvalidArgument());
- ASSERT_EQ(nullptr, tmp_snapshot.get());
- }
- {
- std::unique_ptr<Transaction> txn1(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- ASSERT_OK(txn1->Put("bar", "v0"));
- std::shared_ptr<const Snapshot> ss;
- ASSERT_OK(txn1->CommitAndTryCreateSnapshot(nullptr, 200, &ss));
- // Cannot create snapshot because requested timestamp is the same as the
- // latest timestamped snapshot while sequence number is strictly higher.
- ASSERT_EQ(nullptr, ss);
- }
- {
- std::unique_ptr<Transaction> txn2(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- ASSERT_OK(txn2->Put("bar", "v0"));
- std::shared_ptr<const Snapshot> ss;
- // Application should never do this. This is just to demonstrate error
- // handling.
- ASSERT_OK(txn2->CommitAndTryCreateSnapshot(nullptr, 100, &ss));
- // Cannot create snapshot because requested timestamp is smaller than
- // latest timestamped snapshot.
- ASSERT_EQ(nullptr, ss);
- }
- }
- TEST_P(TransactionTest, CloseDbWithSnapshots) {
- std::unique_ptr<Transaction> txn(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- ASSERT_OK(txn->SetName("txn0"));
- ASSERT_OK(txn->Put("foo", "v"));
- ASSERT_OK(txn->Prepare());
- std::shared_ptr<const Snapshot> snapshot;
- constexpr TxnTimestamp timestamp = 121;
- auto notifier = std::make_shared<TsCheckingTxnNotifier>();
- ASSERT_OK(txn->CommitAndTryCreateSnapshot(notifier, timestamp, &snapshot));
- assert(snapshot);
- ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp);
- ASSERT_EQ(timestamp, snapshot->GetTimestamp());
- ASSERT_TRUE(db->Close().IsAborted());
- }
- TEST_P(TransactionTest, MultipleTimestampedSnapshots) {
- auto* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
- assert(dbimpl);
- const bool seq_per_batch = dbimpl->seq_per_batch();
- // TODO: remove the following assert(!seq_per_batch) once timestamped snapshot
- // is supported in write-prepared/write-unprepared transactions.
- assert(!seq_per_batch);
- constexpr size_t txn_size = 10;
- constexpr TxnTimestamp ts_delta = 10;
- constexpr size_t num_txns = 100;
- std::vector<std::shared_ptr<const Snapshot> > snapshots(num_txns);
- constexpr TxnTimestamp start_ts = 10000;
- auto notifier = std::make_shared<TsCheckingTxnNotifier>();
- for (size_t i = 0; i < num_txns; ++i) {
- std::unique_ptr<Transaction> txn(
- db->BeginTransaction(WriteOptions(), TransactionOptions()));
- ASSERT_OK(txn->SetName("txn" + std::to_string(i)));
- for (size_t j = 0; j < txn_size; ++j) {
- ASSERT_OK(txn->Put("k" + std::to_string(j),
- "v" + std::to_string(j) + "_" + std::to_string(i)));
- }
- if (0 == (i % 2)) {
- ASSERT_OK(txn->Prepare());
- }
- ASSERT_OK(txn->CommitAndTryCreateSnapshot(notifier, start_ts + i * ts_delta,
- &snapshots[i]));
- assert(snapshots[i]);
- ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp);
- ASSERT_EQ(start_ts + i * ts_delta, snapshots[i]->GetTimestamp());
- }
- {
- auto snapshot = db->GetTimestampedSnapshot(start_ts + 1);
- ASSERT_EQ(nullptr, snapshot);
- }
- constexpr TxnTimestamp max_ts = start_ts + num_txns * ts_delta;
- for (size_t i = 0; i < num_txns; ++i) {
- auto snapshot = db->GetTimestampedSnapshot(start_ts + i * ts_delta);
- ASSERT_EQ(snapshots[i], snapshot);
- std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots;
- Status s = db->GetTimestampedSnapshots(max_ts, start_ts + i * ts_delta,
- tmp_snapshots);
- ASSERT_TRUE(s.IsInvalidArgument());
- ASSERT_TRUE(tmp_snapshots.empty());
- for (size_t j = i; j < num_txns; ++j) {
- std::vector<std::shared_ptr<const Snapshot> > expected_snapshots(
- snapshots.begin() + i, snapshots.begin() + j);
- tmp_snapshots.clear();
- s = db->GetTimestampedSnapshots(start_ts + i * ts_delta,
- start_ts + j * ts_delta, tmp_snapshots);
- if (i < j) {
- ASSERT_OK(s);
- } else {
- ASSERT_TRUE(s.IsInvalidArgument());
- }
- ASSERT_EQ(expected_snapshots, tmp_snapshots);
- }
- }
- {
- std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots;
- const Status s = db->GetAllTimestampedSnapshots(tmp_snapshots);
- ASSERT_OK(s);
- ASSERT_EQ(snapshots, tmp_snapshots);
- const std::shared_ptr<const Snapshot> latest_snapshot =
- db->GetLatestTimestampedSnapshot();
- ASSERT_EQ(snapshots.back(), latest_snapshot);
- }
- for (size_t i = 0; i <= num_txns; ++i) {
- std::vector<std::shared_ptr<const Snapshot> > snapshots1(
- snapshots.begin() + i, snapshots.end());
- if (i > 0) {
- auto snapshot1 =
- db->GetTimestampedSnapshot(start_ts + (i - 1) * ts_delta);
- assert(snapshot1);
- ASSERT_EQ(start_ts + (i - 1) * ts_delta, snapshot1->GetTimestamp());
- }
- db->ReleaseTimestampedSnapshotsOlderThan(start_ts + i * ts_delta);
- if (i > 0) {
- auto snapshot1 =
- db->GetTimestampedSnapshot(start_ts + (i - 1) * ts_delta);
- ASSERT_EQ(nullptr, snapshot1);
- }
- std::vector<std::shared_ptr<const Snapshot> > tmp_snapshots;
- const Status s = db->GetAllTimestampedSnapshots(tmp_snapshots);
- ASSERT_OK(s);
- ASSERT_EQ(snapshots1, tmp_snapshots);
- }
- // Even after released by db, the applications still hold reference to shared
- // snapshots.
- for (size_t i = 0; i < num_txns; ++i) {
- assert(snapshots[i]);
- ASSERT_EQ(start_ts + i * ts_delta, snapshots[i]->GetTimestamp());
- }
- snapshots.clear();
- ASSERT_OK(db->Close());
- delete db;
- db = nullptr;
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|