| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #ifndef ROCKSDB_LITE
- #include "utilities/transactions/pessimistic_transaction.h"
- #include <map>
- #include <set>
- #include <string>
- #include <vector>
- #include "db/column_family.h"
- #include "db/db_impl/db_impl.h"
- #include "rocksdb/comparator.h"
- #include "rocksdb/db.h"
- #include "rocksdb/snapshot.h"
- #include "rocksdb/status.h"
- #include "rocksdb/utilities/transaction_db.h"
- #include "test_util/sync_point.h"
- #include "util/cast_util.h"
- #include "util/string_util.h"
- #include "utilities/transactions/pessimistic_transaction_db.h"
- #include "utilities/transactions/transaction_util.h"
- namespace ROCKSDB_NAMESPACE {
- struct WriteOptions;
- std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);
- TransactionID PessimisticTransaction::GenTxnID() {
- return txn_id_counter_.fetch_add(1);
- }
- PessimisticTransaction::PessimisticTransaction(
- TransactionDB* txn_db, const WriteOptions& write_options,
- const TransactionOptions& txn_options, const bool init)
- : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
- txn_db_impl_(nullptr),
- expiration_time_(0),
- txn_id_(0),
- waiting_cf_id_(0),
- waiting_key_(nullptr),
- lock_timeout_(0),
- deadlock_detect_(false),
- deadlock_detect_depth_(0),
- skip_concurrency_control_(false) {
- txn_db_impl_ =
- static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
- db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
- if (init) {
- Initialize(txn_options);
- }
- }
- void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
- txn_id_ = GenTxnID();
- txn_state_ = STARTED;
- deadlock_detect_ = txn_options.deadlock_detect;
- deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
- write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
- skip_concurrency_control_ = txn_options.skip_concurrency_control;
- lock_timeout_ = txn_options.lock_timeout * 1000;
- if (lock_timeout_ < 0) {
- // Lock timeout not set, use default
- lock_timeout_ =
- txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
- }
- if (txn_options.expiration >= 0) {
- expiration_time_ = start_time_ + txn_options.expiration * 1000;
- } else {
- expiration_time_ = 0;
- }
- if (txn_options.set_snapshot) {
- SetSnapshot();
- }
- if (expiration_time_ > 0) {
- txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
- }
- use_only_the_last_commit_time_batch_for_recovery_ =
- txn_options.use_only_the_last_commit_time_batch_for_recovery;
- }
- PessimisticTransaction::~PessimisticTransaction() {
- txn_db_impl_->UnLock(this, &GetTrackedKeys());
- if (expiration_time_ > 0) {
- txn_db_impl_->RemoveExpirableTransaction(txn_id_);
- }
- if (!name_.empty() && txn_state_ != COMMITED) {
- txn_db_impl_->UnregisterTransaction(this);
- }
- }
- void PessimisticTransaction::Clear() {
- txn_db_impl_->UnLock(this, &GetTrackedKeys());
- TransactionBaseImpl::Clear();
- }
- void PessimisticTransaction::Reinitialize(
- TransactionDB* txn_db, const WriteOptions& write_options,
- const TransactionOptions& txn_options) {
- if (!name_.empty() && txn_state_ != COMMITED) {
- txn_db_impl_->UnregisterTransaction(this);
- }
- TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
- Initialize(txn_options);
- }
- bool PessimisticTransaction::IsExpired() const {
- if (expiration_time_ > 0) {
- if (db_->GetEnv()->NowMicros() >= expiration_time_) {
- // Transaction is expired.
- return true;
- }
- }
- return false;
- }
- WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
- const WriteOptions& write_options,
- const TransactionOptions& txn_options)
- : PessimisticTransaction(txn_db, write_options, txn_options){};
- Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
- TransactionKeyMap keys_to_unlock;
- Status s = LockBatch(batch, &keys_to_unlock);
- if (!s.ok()) {
- return s;
- }
- bool can_commit = false;
- if (IsExpired()) {
- s = Status::Expired();
- } else if (expiration_time_ > 0) {
- TransactionState expected = STARTED;
- can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
- AWAITING_COMMIT);
- } else if (txn_state_ == STARTED) {
- // lock stealing is not a concern
- can_commit = true;
- }
- if (can_commit) {
- txn_state_.store(AWAITING_COMMIT);
- s = CommitBatchInternal(batch);
- if (s.ok()) {
- txn_state_.store(COMMITED);
- }
- } else if (txn_state_ == LOCKS_STOLEN) {
- s = Status::Expired();
- } else {
- s = Status::InvalidArgument("Transaction is not in state for commit.");
- }
- txn_db_impl_->UnLock(this, &keys_to_unlock);
- return s;
- }
- Status PessimisticTransaction::Prepare() {
- Status s;
- if (name_.empty()) {
- return Status::InvalidArgument(
- "Cannot prepare a transaction that has not been named.");
- }
- if (IsExpired()) {
- return Status::Expired();
- }
- bool can_prepare = false;
- if (expiration_time_ > 0) {
- // must concern ourselves with expiraton and/or lock stealing
- // need to compare/exchange bc locks could be stolen under us here
- TransactionState expected = STARTED;
- can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
- AWAITING_PREPARE);
- } else if (txn_state_ == STARTED) {
- // expiration and lock stealing is not possible
- can_prepare = true;
- }
- if (can_prepare) {
- txn_state_.store(AWAITING_PREPARE);
- // transaction can't expire after preparation
- expiration_time_ = 0;
- assert(log_number_ == 0 ||
- txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
- s = PrepareInternal();
- if (s.ok()) {
- txn_state_.store(PREPARED);
- }
- } else if (txn_state_ == LOCKS_STOLEN) {
- s = Status::Expired();
- } else if (txn_state_ == PREPARED) {
- s = Status::InvalidArgument("Transaction has already been prepared.");
- } else if (txn_state_ == COMMITED) {
- s = Status::InvalidArgument("Transaction has already been committed.");
- } else if (txn_state_ == ROLLEDBACK) {
- s = Status::InvalidArgument("Transaction has already been rolledback.");
- } else {
- s = Status::InvalidArgument("Transaction is not in state for commit.");
- }
- return s;
- }
- Status WriteCommittedTxn::PrepareInternal() {
- WriteOptions write_options = write_options_;
- write_options.disableWAL = false;
- WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
- class MarkLogCallback : public PreReleaseCallback {
- public:
- MarkLogCallback(DBImpl* db, bool two_write_queues)
- : db_(db), two_write_queues_(two_write_queues) {
- (void)two_write_queues_; // to silence unused private field warning
- }
- virtual Status Callback(SequenceNumber, bool is_mem_disabled,
- uint64_t log_number, size_t /*index*/,
- size_t /*total*/) override {
- #ifdef NDEBUG
- (void)is_mem_disabled;
- #endif
- assert(log_number != 0);
- assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue
- db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number);
- return Status::OK();
- }
- private:
- DBImpl* db_;
- bool two_write_queues_;
- } mark_log_callback(db_impl_,
- db_impl_->immutable_db_options().two_write_queues);
- WriteCallback* const kNoWriteCallback = nullptr;
- const uint64_t kRefNoLog = 0;
- const bool kDisableMemtable = true;
- SequenceNumber* const KIgnoreSeqUsed = nullptr;
- const size_t kNoBatchCount = 0;
- Status s = db_impl_->WriteImpl(
- write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback,
- &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
- &mark_log_callback);
- return s;
- }
- Status PessimisticTransaction::Commit() {
- Status s;
- bool commit_without_prepare = false;
- bool commit_prepared = false;
- if (IsExpired()) {
- return Status::Expired();
- }
- if (expiration_time_ > 0) {
- // we must atomicaly compare and exchange the state here because at
- // this state in the transaction it is possible for another thread
- // to change our state out from under us in the even that we expire and have
- // our locks stolen. In this case the only valid state is STARTED because
- // a state of PREPARED would have a cleared expiration_time_.
- TransactionState expected = STARTED;
- commit_without_prepare = std::atomic_compare_exchange_strong(
- &txn_state_, &expected, AWAITING_COMMIT);
- TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
- } else if (txn_state_ == PREPARED) {
- // expiration and lock stealing is not a concern
- commit_prepared = true;
- } else if (txn_state_ == STARTED) {
- // expiration and lock stealing is not a concern
- commit_without_prepare = true;
- // TODO(myabandeh): what if the user mistakenly forgets prepare? We should
- // add an option so that the user explictly express the intention of
- // skipping the prepare phase.
- }
- if (commit_without_prepare) {
- assert(!commit_prepared);
- if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
- s = Status::InvalidArgument(
- "Commit-time batch contains values that will not be committed.");
- } else {
- txn_state_.store(AWAITING_COMMIT);
- if (log_number_ > 0) {
- dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
- log_number_);
- }
- s = CommitWithoutPrepareInternal();
- if (!name_.empty()) {
- txn_db_impl_->UnregisterTransaction(this);
- }
- Clear();
- if (s.ok()) {
- txn_state_.store(COMMITED);
- }
- }
- } else if (commit_prepared) {
- txn_state_.store(AWAITING_COMMIT);
- s = CommitInternal();
- if (!s.ok()) {
- ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
- "Commit write failed");
- return s;
- }
- // FindObsoleteFiles must now look to the memtables
- // to determine what prep logs must be kept around,
- // not the prep section heap.
- assert(log_number_ > 0);
- dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
- log_number_);
- txn_db_impl_->UnregisterTransaction(this);
- Clear();
- txn_state_.store(COMMITED);
- } else if (txn_state_ == LOCKS_STOLEN) {
- s = Status::Expired();
- } else if (txn_state_ == COMMITED) {
- s = Status::InvalidArgument("Transaction has already been committed.");
- } else if (txn_state_ == ROLLEDBACK) {
- s = Status::InvalidArgument("Transaction has already been rolledback.");
- } else {
- s = Status::InvalidArgument("Transaction is not in state for commit.");
- }
- return s;
- }
- Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
- uint64_t seq_used = kMaxSequenceNumber;
- auto s =
- db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),
- /*callback*/ nullptr, /*log_used*/ nullptr,
- /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
- assert(!s.ok() || seq_used != kMaxSequenceNumber);
- if (s.ok()) {
- SetId(seq_used);
- }
- return s;
- }
- Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
- uint64_t seq_used = kMaxSequenceNumber;
- auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr,
- /*log_used*/ nullptr, /*log_ref*/ 0,
- /*disable_memtable*/ false, &seq_used);
- assert(!s.ok() || seq_used != kMaxSequenceNumber);
- if (s.ok()) {
- SetId(seq_used);
- }
- return s;
- }
- Status WriteCommittedTxn::CommitInternal() {
- // We take the commit-time batch and append the Commit marker.
- // The Memtable will ignore the Commit marker in non-recovery mode
- WriteBatch* working_batch = GetCommitTimeWriteBatch();
- WriteBatchInternal::MarkCommit(working_batch, name_);
- // any operations appended to this working_batch will be ignored from WAL
- working_batch->MarkWalTerminationPoint();
- // insert prepared batch into Memtable only skipping WAL.
- // Memtable will ignore BeginPrepare/EndPrepare markers
- // in non recovery mode and simply insert the values
- WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
- uint64_t seq_used = kMaxSequenceNumber;
- auto s =
- db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
- /*log_used*/ nullptr, /*log_ref*/ log_number_,
- /*disable_memtable*/ false, &seq_used);
- assert(!s.ok() || seq_used != kMaxSequenceNumber);
- if (s.ok()) {
- SetId(seq_used);
- }
- return s;
- }
- Status PessimisticTransaction::Rollback() {
- Status s;
- if (txn_state_ == PREPARED) {
- txn_state_.store(AWAITING_ROLLBACK);
- s = RollbackInternal();
- if (s.ok()) {
- // we do not need to keep our prepared section around
- assert(log_number_ > 0);
- dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
- log_number_);
- Clear();
- txn_state_.store(ROLLEDBACK);
- }
- } else if (txn_state_ == STARTED) {
- if (log_number_ > 0) {
- assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
- assert(GetId() > 0);
- s = RollbackInternal();
- if (s.ok()) {
- dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
- log_number_);
- }
- }
- // prepare couldn't have taken place
- Clear();
- } else if (txn_state_ == COMMITED) {
- s = Status::InvalidArgument("This transaction has already been committed.");
- } else {
- s = Status::InvalidArgument(
- "Two phase transaction is not in state for rollback.");
- }
- return s;
- }
- Status WriteCommittedTxn::RollbackInternal() {
- WriteBatch rollback_marker;
- WriteBatchInternal::MarkRollback(&rollback_marker, name_);
- auto s = db_impl_->WriteImpl(write_options_, &rollback_marker);
- return s;
- }
- Status PessimisticTransaction::RollbackToSavePoint() {
- if (txn_state_ != STARTED) {
- return Status::InvalidArgument("Transaction is beyond state for rollback.");
- }
- // Unlock any keys locked since last transaction
- const std::unique_ptr<TransactionKeyMap>& keys =
- GetTrackedKeysSinceSavePoint();
- if (keys) {
- txn_db_impl_->UnLock(this, keys.get());
- }
- return TransactionBaseImpl::RollbackToSavePoint();
- }
- // Lock all keys in this batch.
- // On success, caller should unlock keys_to_unlock
- Status PessimisticTransaction::LockBatch(WriteBatch* batch,
- TransactionKeyMap* keys_to_unlock) {
- class Handler : public WriteBatch::Handler {
- public:
- // Sorted map of column_family_id to sorted set of keys.
- // Since LockBatch() always locks keys in sorted order, it cannot deadlock
- // with itself. We're not using a comparator here since it doesn't matter
- // what the sorting is as long as it's consistent.
- std::map<uint32_t, std::set<std::string>> keys_;
- Handler() {}
- void RecordKey(uint32_t column_family_id, const Slice& key) {
- std::string key_str = key.ToString();
- auto& cfh_keys = keys_[column_family_id];
- auto iter = cfh_keys.find(key_str);
- if (iter == cfh_keys.end()) {
- // key not yet seen, store it.
- cfh_keys.insert({std::move(key_str)});
- }
- }
- Status PutCF(uint32_t column_family_id, const Slice& key,
- const Slice& /* unused */) override {
- RecordKey(column_family_id, key);
- return Status::OK();
- }
- Status MergeCF(uint32_t column_family_id, const Slice& key,
- const Slice& /* unused */) override {
- RecordKey(column_family_id, key);
- return Status::OK();
- }
- Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
- RecordKey(column_family_id, key);
- return Status::OK();
- }
- };
- // Iterating on this handler will add all keys in this batch into keys
- Handler handler;
- batch->Iterate(&handler);
- Status s;
- // Attempt to lock all keys
- for (const auto& cf_iter : handler.keys_) {
- uint32_t cfh_id = cf_iter.first;
- auto& cfh_keys = cf_iter.second;
- for (const auto& key_iter : cfh_keys) {
- const std::string& key = key_iter;
- s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
- if (!s.ok()) {
- break;
- }
- TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
- false, true /* exclusive */);
- }
- if (!s.ok()) {
- break;
- }
- }
- if (!s.ok()) {
- txn_db_impl_->UnLock(this, keys_to_unlock);
- }
- return s;
- }
- // Attempt to lock this key.
- // Returns OK if the key has been successfully locked. Non-ok, otherwise.
- // If check_shapshot is true and this transaction has a snapshot set,
- // this key will only be locked if there have been no writes to this key since
- // the snapshot time.
- Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
- const Slice& key, bool read_only,
- bool exclusive, const bool do_validate,
- const bool assume_tracked) {
- assert(!assume_tracked || !do_validate);
- Status s;
- if (UNLIKELY(skip_concurrency_control_)) {
- return s;
- }
- uint32_t cfh_id = GetColumnFamilyID(column_family);
- std::string key_str = key.ToString();
- bool previously_locked;
- bool lock_upgrade = false;
- // lock this key if this transactions hasn't already locked it
- SequenceNumber tracked_at_seq = kMaxSequenceNumber;
- const auto& tracked_keys = GetTrackedKeys();
- const auto tracked_keys_cf = tracked_keys.find(cfh_id);
- if (tracked_keys_cf == tracked_keys.end()) {
- previously_locked = false;
- } else {
- auto iter = tracked_keys_cf->second.find(key_str);
- if (iter == tracked_keys_cf->second.end()) {
- previously_locked = false;
- } else {
- if (!iter->second.exclusive && exclusive) {
- lock_upgrade = true;
- }
- previously_locked = true;
- tracked_at_seq = iter->second.seq;
- }
- }
- // Lock this key if this transactions hasn't already locked it or we require
- // an upgrade.
- if (!previously_locked || lock_upgrade) {
- s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
- }
- SetSnapshotIfNeeded();
- // Even though we do not care about doing conflict checking for this write,
- // we still need to take a lock to make sure we do not cause a conflict with
- // some other write. However, we do not need to check if there have been
- // any writes since this transaction's snapshot.
- // TODO(agiardullo): could optimize by supporting shared txn locks in the
- // future
- if (!do_validate || snapshot_ == nullptr) {
- if (assume_tracked && !previously_locked) {
- s = Status::InvalidArgument(
- "assume_tracked is set but it is not tracked yet");
- }
- // Need to remember the earliest sequence number that we know that this
- // key has not been modified after. This is useful if this same
- // transaction
- // later tries to lock this key again.
- if (tracked_at_seq == kMaxSequenceNumber) {
- // Since we haven't checked a snapshot, we only know this key has not
- // been modified since after we locked it.
- // Note: when last_seq_same_as_publish_seq_==false this is less than the
- // latest allocated seq but it is ok since i) this is just a heuristic
- // used only as a hint to avoid actual check for conflicts, ii) this would
- // cause a false positive only if the snapthot is taken right after the
- // lock, which would be an unusual sequence.
- tracked_at_seq = db_->GetLatestSequenceNumber();
- }
- } else {
- // If a snapshot is set, we need to make sure the key hasn't been modified
- // since the snapshot. This must be done after we locked the key.
- // If we already have validated an earilier snapshot it must has been
- // reflected in tracked_at_seq and ValidateSnapshot will return OK.
- if (s.ok()) {
- s = ValidateSnapshot(column_family, key, &tracked_at_seq);
- if (!s.ok()) {
- // Failed to validate key
- if (!previously_locked) {
- // Unlock key we just locked
- if (lock_upgrade) {
- s = txn_db_impl_->TryLock(this, cfh_id, key_str,
- false /* exclusive */);
- assert(s.ok());
- } else {
- txn_db_impl_->UnLock(this, cfh_id, key.ToString());
- }
- }
- }
- }
- }
- if (s.ok()) {
- // We must track all the locked keys so that we can unlock them later. If
- // the key is already locked, this func will update some stats on the
- // tracked key. It could also update the tracked_at_seq if it is lower
- // than the existing tracked key seq. These stats are necessary for
- // RollbackToSavePoint to determine whether a key can be safely removed
- // from tracked_keys_. Removal can only be done if a key was only locked
- // during the current savepoint.
- //
- // Recall that if assume_tracked is true, we assume that TrackKey has been
- // called previously since the last savepoint, with the same exclusive
- // setting, and at a lower sequence number, so skipping here should be
- // safe.
- if (!assume_tracked) {
- TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
- } else {
- #ifndef NDEBUG
- assert(tracked_keys_cf->second.count(key_str) > 0);
- const auto& info = tracked_keys_cf->second.find(key_str)->second;
- assert(info.seq <= tracked_at_seq);
- assert(info.exclusive == exclusive);
- #endif
- }
- }
- return s;
- }
- // Return OK() if this key has not been modified more recently than the
- // transaction snapshot_.
- // tracked_at_seq is the global seq at which we either locked the key or already
- // have done ValidateSnapshot.
- Status PessimisticTransaction::ValidateSnapshot(
- ColumnFamilyHandle* column_family, const Slice& key,
- SequenceNumber* tracked_at_seq) {
- assert(snapshot_);
- SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
- if (*tracked_at_seq <= snap_seq) {
- // If the key has been previous validated (or locked) at a sequence number
- // earlier than the current snapshot's sequence number, we already know it
- // has not been modified aftter snap_seq either.
- return Status::OK();
- }
- // Otherwise we have either
- // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
- // 2: snap_seq < tracked_at_seq: last time we lock the key was via
- // do_validate=false which means we had skipped ValidateSnapshot. In both
- // cases we should do ValidateSnapshot now.
- *tracked_at_seq = snap_seq;
- ColumnFamilyHandle* cfh =
- column_family ? column_family : db_impl_->DefaultColumnFamily();
- return TransactionUtil::CheckKeyForConflicts(
- db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
- }
- bool PessimisticTransaction::TryStealingLocks() {
- assert(IsExpired());
- TransactionState expected = STARTED;
- return std::atomic_compare_exchange_strong(&txn_state_, &expected,
- LOCKS_STOLEN);
- }
- void PessimisticTransaction::UnlockGetForUpdate(
- ColumnFamilyHandle* column_family, const Slice& key) {
- txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
- }
- Status PessimisticTransaction::SetName(const TransactionName& name) {
- Status s;
- if (txn_state_ == STARTED) {
- if (name_.length()) {
- s = Status::InvalidArgument("Transaction has already been named.");
- } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {
- s = Status::InvalidArgument("Transaction name must be unique.");
- } else if (name.length() < 1 || name.length() > 512) {
- s = Status::InvalidArgument(
- "Transaction name length must be between 1 and 512 chars.");
- } else {
- name_ = name;
- txn_db_impl_->RegisterTransaction(this);
- }
- } else {
- s = Status::InvalidArgument("Transaction is beyond state for naming.");
- }
- return s;
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif // ROCKSDB_LITE
|