//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/db_iter.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"

namespace ROCKSDB_NAMESPACE {

class PessimisticTransactionDB : public TransactionDB {
 public:
  explicit PessimisticTransactionDB(DB* db,
                                    const TransactionDBOptions& txn_db_options);

  explicit PessimisticTransactionDB(StackableDB* db,
                                    const TransactionDBOptions& txn_db_options);

  virtual ~PessimisticTransactionDB();

  virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }

  virtual Status Initialize(
      const std::vector<size_t>& compaction_enabled_cf_indices,
      const std::vector<ColumnFamilyHandle*>& handles);

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override = 0;

  using StackableDB::Put;
  virtual Status Put(const WriteOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& val) override;

  using StackableDB::Delete;
  virtual Status Delete(const WriteOptions& wopts,
                        ColumnFamilyHandle* column_family,
                        const Slice& key) override;

  using StackableDB::SingleDelete;
  virtual Status SingleDelete(const WriteOptions& wopts,
                              ColumnFamilyHandle* column_family,
                              const Slice& key) override;

  using StackableDB::Merge;
  virtual Status Merge(const WriteOptions& options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) override;

  using TransactionDB::Write;
  virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
                                            WriteBatch* updates) {
    // Need to lock all keys in this batch to prevent write conflicts with
    // concurrent transactions.
    Transaction* txn = BeginInternalTransaction(opts);
    txn->DisableIndexing();

    auto txn_impl =
        static_cast_with_check<PessimisticTransaction, Transaction>(txn);

    // Since CommitBatch sorts the keys before locking, concurrent Write()
    // operations will not cause a deadlock.
    // To avoid a deadlock with a concurrent Transaction, Transactions
    // should use a lock timeout.
    Status s = txn_impl->CommitBatch(updates);

    delete txn;

    return s;
  }

  using StackableDB::CreateColumnFamily;
  virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
                                    const std::string& column_family_name,
                                    ColumnFamilyHandle** handle) override;

  using StackableDB::DropColumnFamily;
  virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;

  Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
                 const std::string& key, bool exclusive);

  void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys);
  void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
              const std::string& key);

  void AddColumnFamily(const ColumnFamilyHandle* handle);

  static TransactionDBOptions ValidateTxnDBOptions(
      const TransactionDBOptions& txn_db_options);

  const TransactionDBOptions& GetTxnDBOptions() const {
    return txn_db_options_;
  }

  void InsertExpirableTransaction(TransactionID tx_id,
                                  PessimisticTransaction* tx);
  void RemoveExpirableTransaction(TransactionID tx_id);

  // If the transaction is no longer available, its locks can be stolen.
  // If the transaction is available, try stealing locks directly from it.
  // It is the caller's responsibility to ensure that the referred transaction
  // is expirable (GetExpirationTime() > 0) and that it is expired.
  bool TryStealingExpiredTransactionLocks(TransactionID tx_id);

  Transaction* GetTransactionByName(const TransactionName& name) override;

  void RegisterTransaction(Transaction* txn);
  void UnregisterTransaction(Transaction* txn);

  // Not thread safe. The current use case is during recovery (single thread).
  void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;

  TransactionLockMgr::LockStatusData GetLockStatusData() override;

  std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
  void SetDeadlockInfoBufferSize(uint32_t target_size) override;

  // The default implementation does nothing. The actual implementation is moved
  // to the child classes that actually need this information. This was due to
  // an odd performance drop we observed when the std::atomic member was added
  // to the base class, even when the subclasses do not read it in the fast
  // path.
  virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
  virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}

 protected:
  DBImpl* db_impl_;
  std::shared_ptr<Logger> info_log_;
  const TransactionDBOptions txn_db_options_;

  void ReinitializeTransaction(
      Transaction* txn, const WriteOptions& write_options,
      const TransactionOptions& txn_options = TransactionOptions());

  virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);

 private:
  friend class WritePreparedTxnDB;
  friend class WritePreparedTxnDBMock;
  friend class WriteUnpreparedTxn;
  friend class TransactionTest_DoubleCrashInRecovery_Test;
  friend class TransactionTest_DoubleEmptyWrite_Test;
  friend class TransactionTest_DuplicateKeys_Test;
  friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
  friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
  friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
  friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
  TransactionLockMgr lock_mgr_;

  // Must be held when adding/dropping column families.
  InstrumentedMutex column_family_mutex_;
  Transaction* BeginInternalTransaction(const WriteOptions& options);

  // Used to ensure that no locks are stolen from an expirable transaction
  // that has started a commit. Only transactions with an expiration time
  // should be in this map.
  std::mutex map_mutex_;
  std::unordered_map<TransactionID, PessimisticTransaction*>
      expirable_transactions_map_;

  // Map from name to two-phase transaction instance.
  std::mutex name_map_mutex_;
  std::unordered_map<TransactionName, Transaction*> transactions_;

  // Signal that we are testing a crash scenario. Some asserts could be relaxed
  // in such cases.
  virtual void TEST_Crash() {}
};

// A PessimisticTransactionDB that writes the data to the DB after the commit.
// In this way the DB only contains the committed data.
class WriteCommittedTxnDB : public PessimisticTransactionDB {
 public:
  explicit WriteCommittedTxnDB(DB* db,
                               const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options) {}

  explicit WriteCommittedTxnDB(StackableDB* db,
                               const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options) {}

  virtual ~WriteCommittedTxnDB() {}

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  // Optimized version of ::Write that makes use of the
  // skip_concurrency_control hint.
  using TransactionDB::Write;
  virtual Status Write(const WriteOptions& opts,
                       const TransactionDBWriteOptimizations& optimizations,
                       WriteBatch* updates) override;
  virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
};

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE
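The comment in WriteWithConcurrencyControl says that because CommitBatch sorts the keys before locking, concurrent Write() operations cannot deadlock each other. The following is a minimal, self-contained sketch of that ordered-locking idea only. LockedStore, LockOrder, IncrementAll, and Get are hypothetical names invented for illustration; they are not part of RocksDB, and the real TransactionLockMgr (timeouts, deadlock detection, column families) is considerably more involved.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for a keyed store with one lock per key.
// This is an illustration, not the RocksDB lock manager.
class LockedStore {
 public:
  // All keys are created up front, so the map is never resized afterwards
  // and lookups with at() can safely run from several threads.
  explicit LockedStore(const std::vector<std::string>& keys) {
    for (const auto& key : keys) {
      entries_[key];  // default-constructs the entry for this key
    }
  }

  // Returns the order in which a batch's keys will be locked:
  // sorted and de-duplicated, so every caller agrees on one global order.
  static std::vector<std::string> LockOrder(std::vector<std::string> keys) {
    std::sort(keys.begin(), keys.end());
    keys.erase(std::unique(keys.begin(), keys.end()), keys.end());
    return keys;
  }

  // Locks every key in the batch in sorted order, then increments each
  // distinct key once. Throws std::out_of_range for an unknown key.
  void IncrementAll(const std::vector<std::string>& batch) {
    const std::vector<std::string> order = LockOrder(batch);
    std::vector<std::unique_lock<std::mutex>> held;
    held.reserve(order.size());
    for (const auto& key : order) {
      held.emplace_back(entries_.at(key).mu);  // acquires the key's lock
    }
    for (const auto& key : order) {
      ++entries_.at(key).value;
    }
    // All locks are released when `held` goes out of scope.
  }

  // Reads one key's value under that key's lock.
  int Get(const std::string& key) {
    Entry& entry = entries_.at(key);
    std::lock_guard<std::mutex> guard(entry.mu);
    return entry.value;
  }

 private:
  struct Entry {
    std::mutex mu;
    int value = 0;
  };
  std::map<std::string, Entry> entries_;
};
```

With this ordering, a caller passing {"b", "a"} and another passing {"a", "b"} both acquire the lock for "a" before the lock for "b", so neither can hold one key while waiting on a key the other already holds. As the original comment points out, this only covers batches that are locked up front; a regular transaction that takes locks incrementally can still conflict, which is why a lock timeout is recommended there.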