| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#ifndef ROCKSDB_LITE#include "utilities/transactions/pessimistic_transaction.h"#include <map>#include <set>#include <string>#include <vector>#include "db/column_family.h"#include "db/db_impl/db_impl.h"#include "rocksdb/comparator.h"#include "rocksdb/db.h"#include "rocksdb/snapshot.h"#include "rocksdb/status.h"#include "rocksdb/utilities/transaction_db.h"#include "test_util/sync_point.h"#include "util/cast_util.h"#include "util/string_util.h"#include "utilities/transactions/pessimistic_transaction_db.h"#include "utilities/transactions/transaction_util.h"namespace ROCKSDB_NAMESPACE {struct WriteOptions;std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);TransactionID PessimisticTransaction::GenTxnID() {  return txn_id_counter_.fetch_add(1);}PessimisticTransaction::PessimisticTransaction(    TransactionDB* txn_db, const WriteOptions& write_options,    const TransactionOptions& txn_options, const bool init)    : TransactionBaseImpl(txn_db->GetRootDB(), write_options),      txn_db_impl_(nullptr),      expiration_time_(0),      txn_id_(0),      waiting_cf_id_(0),      waiting_key_(nullptr),      lock_timeout_(0),      deadlock_detect_(false),      deadlock_detect_depth_(0),      skip_concurrency_control_(false) {  txn_db_impl_ =      static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);  db_impl_ = static_cast_with_check<DBImpl, DB>(db_);  if (init) {    Initialize(txn_options);  }}void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {  txn_id_ = GenTxnID();  txn_state_ = STARTED;  deadlock_detect_ = txn_options.deadlock_detect;  deadlock_detect_depth_ = txn_options.deadlock_detect_depth;  write_batch_.SetMaxBytes(txn_options.max_write_batch_size);  skip_concurrency_control_ = txn_options.skip_concurrency_control;  lock_timeout_ = txn_options.lock_timeout * 1000;  if (lock_timeout_ < 0) {    // Lock timeout not set, use default    lock_timeout_ =        txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;  }  if (txn_options.expiration >= 0) {    expiration_time_ = start_time_ + txn_options.expiration * 1000;  } else {    expiration_time_ = 0;  }  if (txn_options.set_snapshot) {    SetSnapshot();  }  if (expiration_time_ > 0) {    txn_db_impl_->InsertExpirableTransaction(txn_id_, this);  }  use_only_the_last_commit_time_batch_for_recovery_ =      txn_options.use_only_the_last_commit_time_batch_for_recovery;}PessimisticTransaction::~PessimisticTransaction() {  txn_db_impl_->UnLock(this, &GetTrackedKeys());  if (expiration_time_ > 0) {    txn_db_impl_->RemoveExpirableTransaction(txn_id_);  }  if (!name_.empty() && txn_state_ != COMMITED) {    txn_db_impl_->UnregisterTransaction(this);  }}void PessimisticTransaction::Clear() {  txn_db_impl_->UnLock(this, &GetTrackedKeys());  TransactionBaseImpl::Clear();}void PessimisticTransaction::Reinitialize(    TransactionDB* txn_db, const WriteOptions& write_options,    const TransactionOptions& txn_options) {  if (!name_.empty() && txn_state_ != COMMITED) {    txn_db_impl_->UnregisterTransaction(this);  }  TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);  Initialize(txn_options);}bool PessimisticTransaction::IsExpired() const {  if (expiration_time_ > 0) {    if (db_->GetEnv()->NowMicros() >= expiration_time_) {      // Transaction is expired.      return true;    }  }  return false;}WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,                                     const WriteOptions& write_options,                                     const TransactionOptions& txn_options)    : PessimisticTransaction(txn_db, write_options, txn_options){};Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {  TransactionKeyMap keys_to_unlock;  Status s = LockBatch(batch, &keys_to_unlock);  if (!s.ok()) {    return s;  }  bool can_commit = false;  if (IsExpired()) {    s = Status::Expired();  } else if (expiration_time_ > 0) {    TransactionState expected = STARTED;    can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,                                                     AWAITING_COMMIT);  } else if (txn_state_ == STARTED) {    // lock stealing is not a concern    can_commit = true;  }  if (can_commit) {    txn_state_.store(AWAITING_COMMIT);    s = CommitBatchInternal(batch);    if (s.ok()) {      txn_state_.store(COMMITED);    }  } else if (txn_state_ == LOCKS_STOLEN) {    s = Status::Expired();  } else {    s = Status::InvalidArgument("Transaction is not in state for commit.");  }  txn_db_impl_->UnLock(this, &keys_to_unlock);  return s;}Status PessimisticTransaction::Prepare() {  Status s;  if (name_.empty()) {    return Status::InvalidArgument(        "Cannot prepare a transaction that has not been named.");  }  if (IsExpired()) {    return Status::Expired();  }  bool can_prepare = false;  if (expiration_time_ > 0) {    // must concern ourselves with expiraton and/or lock stealing    // need to compare/exchange bc locks could be stolen under us here    TransactionState expected = STARTED;    can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,                                                      AWAITING_PREPARE);  } else if (txn_state_ == STARTED) {    // expiration and lock stealing is not possible    can_prepare = true;  }  if (can_prepare) {    txn_state_.store(AWAITING_PREPARE);    // transaction can't expire after preparation    expiration_time_ = 0;    assert(log_number_ == 0 ||           txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);    s = PrepareInternal();    if (s.ok()) {      txn_state_.store(PREPARED);    }  } else if (txn_state_ == LOCKS_STOLEN) {    s = Status::Expired();  } else if (txn_state_ == PREPARED) {    s = Status::InvalidArgument("Transaction has already been prepared.");  } else if (txn_state_ == COMMITED) {    s = Status::InvalidArgument("Transaction has already been committed.");  } else if (txn_state_ == ROLLEDBACK) {    s = Status::InvalidArgument("Transaction has already been rolledback.");  } else {    s = Status::InvalidArgument("Transaction is not in state for commit.");  }  return s;}Status WriteCommittedTxn::PrepareInternal() {  WriteOptions write_options = write_options_;  write_options.disableWAL = false;  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);  class MarkLogCallback : public PreReleaseCallback {   public:    MarkLogCallback(DBImpl* db, bool two_write_queues)        : db_(db), two_write_queues_(two_write_queues) {      (void)two_write_queues_;  // to silence unused private field warning    }    virtual Status Callback(SequenceNumber, bool is_mem_disabled,                            uint64_t log_number, size_t /*index*/,                            size_t /*total*/) override {#ifdef NDEBUG      (void)is_mem_disabled;#endif      assert(log_number != 0);      assert(!two_write_queues_ || is_mem_disabled);  // implies the 2nd queue      db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number);      return Status::OK();    }   private:    DBImpl* db_;    bool two_write_queues_;  } mark_log_callback(db_impl_,                      db_impl_->immutable_db_options().two_write_queues);  WriteCallback* const kNoWriteCallback = nullptr;  const uint64_t kRefNoLog = 0;  const bool kDisableMemtable = true;  SequenceNumber* const KIgnoreSeqUsed = nullptr;  const size_t kNoBatchCount = 0;  Status s = db_impl_->WriteImpl(      write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback,      &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,      &mark_log_callback);  return s;}Status PessimisticTransaction::Commit() {  Status s;  bool commit_without_prepare = false;  bool commit_prepared = false;  if (IsExpired()) {    return Status::Expired();  }  if (expiration_time_ > 0) {    // we must atomicaly compare and exchange the state here because at    // this state in the transaction it is possible for another thread    // to change our state out from under us in the even that we expire and have    // our locks stolen. In this case the only valid state is STARTED because    // a state of PREPARED would have a cleared expiration_time_.    TransactionState expected = STARTED;    commit_without_prepare = std::atomic_compare_exchange_strong(        &txn_state_, &expected, AWAITING_COMMIT);    TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");  } else if (txn_state_ == PREPARED) {    // expiration and lock stealing is not a concern    commit_prepared = true;  } else if (txn_state_ == STARTED) {    // expiration and lock stealing is not a concern    commit_without_prepare = true;    // TODO(myabandeh): what if the user mistakenly forgets prepare? We should    // add an option so that the user explictly express the intention of    // skipping the prepare phase.  }  if (commit_without_prepare) {    assert(!commit_prepared);    if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {      s = Status::InvalidArgument(          "Commit-time batch contains values that will not be committed.");    } else {      txn_state_.store(AWAITING_COMMIT);      if (log_number_ > 0) {        dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(            log_number_);      }      s = CommitWithoutPrepareInternal();      if (!name_.empty()) {        txn_db_impl_->UnregisterTransaction(this);      }      Clear();      if (s.ok()) {        txn_state_.store(COMMITED);      }    }  } else if (commit_prepared) {    txn_state_.store(AWAITING_COMMIT);    s = CommitInternal();    if (!s.ok()) {      ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,                     "Commit write failed");      return s;    }    // FindObsoleteFiles must now look to the memtables    // to determine what prep logs must be kept around,    // not the prep section heap.    assert(log_number_ > 0);    dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(        log_number_);    txn_db_impl_->UnregisterTransaction(this);    Clear();    txn_state_.store(COMMITED);  } else if (txn_state_ == LOCKS_STOLEN) {    s = Status::Expired();  } else if (txn_state_ == COMMITED) {    s = Status::InvalidArgument("Transaction has already been committed.");  } else if (txn_state_ == ROLLEDBACK) {    s = Status::InvalidArgument("Transaction has already been rolledback.");  } else {    s = Status::InvalidArgument("Transaction is not in state for commit.");  }  return s;}Status WriteCommittedTxn::CommitWithoutPrepareInternal() {  uint64_t seq_used = kMaxSequenceNumber;  auto s =      db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),                          /*callback*/ nullptr, /*log_used*/ nullptr,                          /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);  assert(!s.ok() || seq_used != kMaxSequenceNumber);  if (s.ok()) {    SetId(seq_used);  }  return s;}Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {  uint64_t seq_used = kMaxSequenceNumber;  auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr,                               /*log_used*/ nullptr, /*log_ref*/ 0,                               /*disable_memtable*/ false, &seq_used);  assert(!s.ok() || seq_used != kMaxSequenceNumber);  if (s.ok()) {    SetId(seq_used);  }  return s;}Status WriteCommittedTxn::CommitInternal() {  // We take the commit-time batch and append the Commit marker.  // The Memtable will ignore the Commit marker in non-recovery mode  WriteBatch* working_batch = GetCommitTimeWriteBatch();  WriteBatchInternal::MarkCommit(working_batch, name_);  // any operations appended to this working_batch will be ignored from WAL  working_batch->MarkWalTerminationPoint();  // insert prepared batch into Memtable only skipping WAL.  // Memtable will ignore BeginPrepare/EndPrepare markers  // in non recovery mode and simply insert the values  WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());  uint64_t seq_used = kMaxSequenceNumber;  auto s =      db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,                          /*log_used*/ nullptr, /*log_ref*/ log_number_,                          /*disable_memtable*/ false, &seq_used);    assert(!s.ok() || seq_used != kMaxSequenceNumber);  if (s.ok()) {    SetId(seq_used);  }  return s;}Status PessimisticTransaction::Rollback() {  Status s;  if (txn_state_ == PREPARED) {    txn_state_.store(AWAITING_ROLLBACK);    s = RollbackInternal();    if (s.ok()) {      // we do not need to keep our prepared section around      assert(log_number_ > 0);      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(          log_number_);      Clear();      txn_state_.store(ROLLEDBACK);    }  } else if (txn_state_ == STARTED) {    if (log_number_ > 0) {      assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);      assert(GetId() > 0);      s = RollbackInternal();      if (s.ok()) {        dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(            log_number_);      }    }    // prepare couldn't have taken place    Clear();  } else if (txn_state_ == COMMITED) {    s = Status::InvalidArgument("This transaction has already been committed.");  } else {    s = Status::InvalidArgument(        "Two phase transaction is not in state for rollback.");  }  return s;}Status WriteCommittedTxn::RollbackInternal() {  WriteBatch rollback_marker;  WriteBatchInternal::MarkRollback(&rollback_marker, name_);  auto s = db_impl_->WriteImpl(write_options_, &rollback_marker);  return s;}Status PessimisticTransaction::RollbackToSavePoint() {  if (txn_state_ != STARTED) {    return Status::InvalidArgument("Transaction is beyond state for rollback.");  }  // Unlock any keys locked since last transaction  const std::unique_ptr<TransactionKeyMap>& keys =      GetTrackedKeysSinceSavePoint();  if (keys) {    txn_db_impl_->UnLock(this, keys.get());  }  return TransactionBaseImpl::RollbackToSavePoint();}// Lock all keys in this batch.// On success, caller should unlock keys_to_unlockStatus PessimisticTransaction::LockBatch(WriteBatch* batch,                                         TransactionKeyMap* keys_to_unlock) {  class Handler : public WriteBatch::Handler {   public:    // Sorted map of column_family_id to sorted set of keys.    // Since LockBatch() always locks keys in sorted order, it cannot deadlock    // with itself.  We're not using a comparator here since it doesn't matter    // what the sorting is as long as it's consistent.    std::map<uint32_t, std::set<std::string>> keys_;    Handler() {}    void RecordKey(uint32_t column_family_id, const Slice& key) {      std::string key_str = key.ToString();      auto& cfh_keys = keys_[column_family_id];      auto iter = cfh_keys.find(key_str);      if (iter == cfh_keys.end()) {        // key not yet seen, store it.        cfh_keys.insert({std::move(key_str)});      }    }    Status PutCF(uint32_t column_family_id, const Slice& key,                 const Slice& /* unused */) override {      RecordKey(column_family_id, key);      return Status::OK();    }    Status MergeCF(uint32_t column_family_id, const Slice& key,                   const Slice& /* unused */) override {      RecordKey(column_family_id, key);      return Status::OK();    }    Status DeleteCF(uint32_t column_family_id, const Slice& key) override {      RecordKey(column_family_id, key);      return Status::OK();    }  };  // Iterating on this handler will add all keys in this batch into keys  Handler handler;  batch->Iterate(&handler);  Status s;  // Attempt to lock all keys  for (const auto& cf_iter : handler.keys_) {    uint32_t cfh_id = cf_iter.first;    auto& cfh_keys = cf_iter.second;    for (const auto& key_iter : cfh_keys) {      const std::string& key = key_iter;      s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);      if (!s.ok()) {        break;      }      TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,               false, true /* exclusive */);    }    if (!s.ok()) {      break;    }  }  if (!s.ok()) {    txn_db_impl_->UnLock(this, keys_to_unlock);  }  return s;}// Attempt to lock this key.// Returns OK if the key has been successfully locked.  Non-ok, otherwise.// If check_shapshot is true and this transaction has a snapshot set,// this key will only be locked if there have been no writes to this key since// the snapshot time.Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,                                       const Slice& key, bool read_only,                                       bool exclusive, const bool do_validate,                                       const bool assume_tracked) {  assert(!assume_tracked || !do_validate);  Status s;  if (UNLIKELY(skip_concurrency_control_)) {    return s;  }  uint32_t cfh_id = GetColumnFamilyID(column_family);  std::string key_str = key.ToString();  bool previously_locked;  bool lock_upgrade = false;  // lock this key if this transactions hasn't already locked it  SequenceNumber tracked_at_seq = kMaxSequenceNumber;  const auto& tracked_keys = GetTrackedKeys();  const auto tracked_keys_cf = tracked_keys.find(cfh_id);  if (tracked_keys_cf == tracked_keys.end()) {    previously_locked = false;  } else {    auto iter = tracked_keys_cf->second.find(key_str);    if (iter == tracked_keys_cf->second.end()) {      previously_locked = false;    } else {      if (!iter->second.exclusive && exclusive) {        lock_upgrade = true;      }      previously_locked = true;      tracked_at_seq = iter->second.seq;    }  }  // Lock this key if this transactions hasn't already locked it or we require  // an upgrade.  if (!previously_locked || lock_upgrade) {    s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);  }  SetSnapshotIfNeeded();  // Even though we do not care about doing conflict checking for this write,  // we still need to take a lock to make sure we do not cause a conflict with  // some other write.  However, we do not need to check if there have been  // any writes since this transaction's snapshot.  // TODO(agiardullo): could optimize by supporting shared txn locks in the  // future  if (!do_validate || snapshot_ == nullptr) {    if (assume_tracked && !previously_locked) {      s = Status::InvalidArgument(          "assume_tracked is set but it is not tracked yet");    }    // Need to remember the earliest sequence number that we know that this    // key has not been modified after.  This is useful if this same    // transaction    // later tries to lock this key again.    if (tracked_at_seq == kMaxSequenceNumber) {      // Since we haven't checked a snapshot, we only know this key has not      // been modified since after we locked it.      // Note: when last_seq_same_as_publish_seq_==false this is less than the      // latest allocated seq but it is ok since i) this is just a heuristic      // used only as a hint to avoid actual check for conflicts, ii) this would      // cause a false positive only if the snapthot is taken right after the      // lock, which would be an unusual sequence.      tracked_at_seq = db_->GetLatestSequenceNumber();    }  } else {    // If a snapshot is set, we need to make sure the key hasn't been modified    // since the snapshot.  This must be done after we locked the key.    // If we already have validated an earilier snapshot it must has been    // reflected in tracked_at_seq and ValidateSnapshot will return OK.    if (s.ok()) {      s = ValidateSnapshot(column_family, key, &tracked_at_seq);      if (!s.ok()) {        // Failed to validate key        if (!previously_locked) {          // Unlock key we just locked          if (lock_upgrade) {            s = txn_db_impl_->TryLock(this, cfh_id, key_str,                                      false /* exclusive */);            assert(s.ok());          } else {            txn_db_impl_->UnLock(this, cfh_id, key.ToString());          }        }      }    }  }  if (s.ok()) {    // We must track all the locked keys so that we can unlock them later. If    // the key is already locked, this func will update some stats on the    // tracked key. It could also update the tracked_at_seq if it is lower    // than the existing tracked key seq. These stats are necessary for    // RollbackToSavePoint to determine whether a key can be safely removed    // from tracked_keys_. Removal can only be done if a key was only locked    // during the current savepoint.    //    // Recall that if assume_tracked is true, we assume that TrackKey has been    // called previously since the last savepoint, with the same exclusive    // setting, and at a lower sequence number, so skipping here should be    // safe.    if (!assume_tracked) {      TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);    } else {#ifndef NDEBUG      assert(tracked_keys_cf->second.count(key_str) > 0);      const auto& info = tracked_keys_cf->second.find(key_str)->second;      assert(info.seq <= tracked_at_seq);      assert(info.exclusive == exclusive);#endif    }  }  return s;}// Return OK() if this key has not been modified more recently than the// transaction snapshot_.// tracked_at_seq is the global seq at which we either locked the key or already// have done ValidateSnapshot.Status PessimisticTransaction::ValidateSnapshot(    ColumnFamilyHandle* column_family, const Slice& key,    SequenceNumber* tracked_at_seq) {  assert(snapshot_);  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();  if (*tracked_at_seq <= snap_seq) {    // If the key has been previous validated (or locked) at a sequence number    // earlier than the current snapshot's sequence number, we already know it    // has not been modified aftter snap_seq either.    return Status::OK();  }  // Otherwise we have either  // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key  // 2: snap_seq < tracked_at_seq: last time we lock the key was via  // do_validate=false which means we had skipped ValidateSnapshot. In both  // cases we should do ValidateSnapshot now.  *tracked_at_seq = snap_seq;  ColumnFamilyHandle* cfh =      column_family ? column_family : db_impl_->DefaultColumnFamily();  return TransactionUtil::CheckKeyForConflicts(      db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);}bool PessimisticTransaction::TryStealingLocks() {  assert(IsExpired());  TransactionState expected = STARTED;  return std::atomic_compare_exchange_strong(&txn_state_, &expected,                                             LOCKS_STOLEN);}void PessimisticTransaction::UnlockGetForUpdate(    ColumnFamilyHandle* column_family, const Slice& key) {  txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());}Status PessimisticTransaction::SetName(const TransactionName& name) {  Status s;  if (txn_state_ == STARTED) {    if (name_.length()) {      s = Status::InvalidArgument("Transaction has already been named.");    } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {      s = Status::InvalidArgument("Transaction name must be unique.");    } else if (name.length() < 1 || name.length() > 512) {      s = Status::InvalidArgument(          "Transaction name length must be between 1 and 512 chars.");    } else {      name_ = name;      txn_db_impl_->RegisterTransaction(this);    }  } else {    s = Status::InvalidArgument("Transaction is beyond state for naming.");  }  return s;}}  // namespace ROCKSDB_NAMESPACE#endif  // ROCKSDB_LITE
 |