pessimistic_transaction_db.h 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #ifndef ROCKSDB_LITE
  7. #include <mutex>
  8. #include <queue>
  9. #include <set>
  10. #include <string>
  11. #include <unordered_map>
  12. #include <vector>
  13. #include "db/db_iter.h"
  14. #include "db/read_callback.h"
  15. #include "db/snapshot_checker.h"
  16. #include "rocksdb/db.h"
  17. #include "rocksdb/options.h"
  18. #include "rocksdb/utilities/transaction_db.h"
  19. #include "util/cast_util.h"
  20. #include "utilities/transactions/pessimistic_transaction.h"
  21. #include "utilities/transactions/transaction_lock_mgr.h"
  22. #include "utilities/transactions/write_prepared_txn.h"
  23. namespace ROCKSDB_NAMESPACE {
  24. class PessimisticTransactionDB : public TransactionDB {
  25. public:
  26. explicit PessimisticTransactionDB(DB* db,
  27. const TransactionDBOptions& txn_db_options);
  28. explicit PessimisticTransactionDB(StackableDB* db,
  29. const TransactionDBOptions& txn_db_options);
  30. virtual ~PessimisticTransactionDB();
  31. virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
  32. virtual Status Initialize(
  33. const std::vector<size_t>& compaction_enabled_cf_indices,
  34. const std::vector<ColumnFamilyHandle*>& handles);
  35. Transaction* BeginTransaction(const WriteOptions& write_options,
  36. const TransactionOptions& txn_options,
  37. Transaction* old_txn) override = 0;
  38. using StackableDB::Put;
  39. virtual Status Put(const WriteOptions& options,
  40. ColumnFamilyHandle* column_family, const Slice& key,
  41. const Slice& val) override;
  42. using StackableDB::Delete;
  43. virtual Status Delete(const WriteOptions& wopts,
  44. ColumnFamilyHandle* column_family,
  45. const Slice& key) override;
  46. using StackableDB::SingleDelete;
  47. virtual Status SingleDelete(const WriteOptions& wopts,
  48. ColumnFamilyHandle* column_family,
  49. const Slice& key) override;
  50. using StackableDB::Merge;
  51. virtual Status Merge(const WriteOptions& options,
  52. ColumnFamilyHandle* column_family, const Slice& key,
  53. const Slice& value) override;
  54. using TransactionDB::Write;
  55. virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  56. inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
  57. WriteBatch* updates) {
  58. // Need to lock all keys in this batch to prevent write conflicts with
  59. // concurrent transactions.
  60. Transaction* txn = BeginInternalTransaction(opts);
  61. txn->DisableIndexing();
  62. auto txn_impl =
  63. static_cast_with_check<PessimisticTransaction, Transaction>(txn);
  64. // Since commitBatch sorts the keys before locking, concurrent Write()
  65. // operations will not cause a deadlock.
  66. // In order to avoid a deadlock with a concurrent Transaction, Transactions
  67. // should use a lock timeout.
  68. Status s = txn_impl->CommitBatch(updates);
  69. delete txn;
  70. return s;
  71. }
  72. using StackableDB::CreateColumnFamily;
  73. virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
  74. const std::string& column_family_name,
  75. ColumnFamilyHandle** handle) override;
  76. using StackableDB::DropColumnFamily;
  77. virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
  78. Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
  79. const std::string& key, bool exclusive);
  80. void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys);
  81. void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
  82. const std::string& key);
  83. void AddColumnFamily(const ColumnFamilyHandle* handle);
  84. static TransactionDBOptions ValidateTxnDBOptions(
  85. const TransactionDBOptions& txn_db_options);
  86. const TransactionDBOptions& GetTxnDBOptions() const {
  87. return txn_db_options_;
  88. }
  89. void InsertExpirableTransaction(TransactionID tx_id,
  90. PessimisticTransaction* tx);
  91. void RemoveExpirableTransaction(TransactionID tx_id);
  92. // If transaction is no longer available, locks can be stolen
  93. // If transaction is available, try stealing locks directly from transaction
  94. // It is the caller's responsibility to ensure that the referred transaction
  95. // is expirable (GetExpirationTime() > 0) and that it is expired.
  96. bool TryStealingExpiredTransactionLocks(TransactionID tx_id);
  97. Transaction* GetTransactionByName(const TransactionName& name) override;
  98. void RegisterTransaction(Transaction* txn);
  99. void UnregisterTransaction(Transaction* txn);
  100. // not thread safe. current use case is during recovery (single thread)
  101. void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
  102. TransactionLockMgr::LockStatusData GetLockStatusData() override;
  103. std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
  104. void SetDeadlockInfoBufferSize(uint32_t target_size) override;
  105. // The default implementation does nothing. The actual implementation is moved
  106. // to the child classes that actually need this information. This was due to
  107. // an odd performance drop we observed when the added std::atomic member to
  108. // the base class even when the subclass do not read it in the fast path.
  109. virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
  110. virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
  111. protected:
  112. DBImpl* db_impl_;
  113. std::shared_ptr<Logger> info_log_;
  114. const TransactionDBOptions txn_db_options_;
  115. void ReinitializeTransaction(
  116. Transaction* txn, const WriteOptions& write_options,
  117. const TransactionOptions& txn_options = TransactionOptions());
  118. virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);
  119. private:
  120. friend class WritePreparedTxnDB;
  121. friend class WritePreparedTxnDBMock;
  122. friend class WriteUnpreparedTxn;
  123. friend class TransactionTest_DoubleCrashInRecovery_Test;
  124. friend class TransactionTest_DoubleEmptyWrite_Test;
  125. friend class TransactionTest_DuplicateKeys_Test;
  126. friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
  127. friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
  128. friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
  129. friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
  130. friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  131. friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
  132. TransactionLockMgr lock_mgr_;
  133. // Must be held when adding/dropping column families.
  134. InstrumentedMutex column_family_mutex_;
  135. Transaction* BeginInternalTransaction(const WriteOptions& options);
  136. // Used to ensure that no locks are stolen from an expirable transaction
  137. // that has started a commit. Only transactions with an expiration time
  138. // should be in this map.
  139. std::mutex map_mutex_;
  140. std::unordered_map<TransactionID, PessimisticTransaction*>
  141. expirable_transactions_map_;
  142. // map from name to two phase transaction instance
  143. std::mutex name_map_mutex_;
  144. std::unordered_map<TransactionName, Transaction*> transactions_;
  145. // Signal that we are testing a crash scenario. Some asserts could be relaxed
  146. // in such cases.
  147. virtual void TEST_Crash() {}
  148. };
  149. // A PessimisticTransactionDB that writes the data to the DB after the commit.
  150. // In this way the DB only contains the committed data.
  151. class WriteCommittedTxnDB : public PessimisticTransactionDB {
  152. public:
  153. explicit WriteCommittedTxnDB(DB* db,
  154. const TransactionDBOptions& txn_db_options)
  155. : PessimisticTransactionDB(db, txn_db_options) {}
  156. explicit WriteCommittedTxnDB(StackableDB* db,
  157. const TransactionDBOptions& txn_db_options)
  158. : PessimisticTransactionDB(db, txn_db_options) {}
  159. virtual ~WriteCommittedTxnDB() {}
  160. Transaction* BeginTransaction(const WriteOptions& write_options,
  161. const TransactionOptions& txn_options,
  162. Transaction* old_txn) override;
  163. // Optimized version of ::Write that makes use of skip_concurrency_control
  164. // hint
  165. using TransactionDB::Write;
  166. virtual Status Write(const WriteOptions& opts,
  167. const TransactionDBWriteOptimizations& optimizations,
  168. WriteBatch* updates) override;
  169. virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  170. };
  171. } // namespace ROCKSDB_NAMESPACE
  172. #endif // ROCKSDB_LITE