pessimistic_transaction_db.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  #include <cassert>
  #include <memory>
  #include <mutex>
  #include <queue>
  #include <set>
  #include <string>
  #include <unordered_map>
  #include <utility>
  #include <vector>

  #include "db/db_iter.h"
  #include "db/read_callback.h"
  #include "db/snapshot_checker.h"
  #include "rocksdb/db.h"
  #include "rocksdb/options.h"
  #include "rocksdb/utilities/transaction_db.h"
  #include "util/cast_util.h"
  #include "utilities/transactions/lock/lock_manager.h"
  #include "utilities/transactions/lock/range/range_lock_manager.h"
  #include "utilities/transactions/pessimistic_transaction.h"
  #include "utilities/transactions/write_prepared_txn.h"
  23. namespace ROCKSDB_NAMESPACE {
  24. class PessimisticTransactionDB : public TransactionDB {
  25. public:
  26. explicit PessimisticTransactionDB(DB* db,
  27. const TransactionDBOptions& txn_db_options);
  28. explicit PessimisticTransactionDB(StackableDB* db,
  29. const TransactionDBOptions& txn_db_options);
  30. virtual ~PessimisticTransactionDB();
  31. const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
  32. virtual Status Initialize(
  33. const std::vector<size_t>& compaction_enabled_cf_indices,
  34. const std::vector<ColumnFamilyHandle*>& handles);
  35. Transaction* BeginTransaction(const WriteOptions& write_options,
  36. const TransactionOptions& txn_options,
  37. Transaction* old_txn) override = 0;
  38. using StackableDB::Put;
  39. Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
  40. const Slice& key, const Slice& val) override;
  41. Status PutEntity(const WriteOptions& options,
  42. ColumnFamilyHandle* column_family, const Slice& key,
  43. const WideColumns& columns) override;
  44. Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */,
  45. const AttributeGroups& attribute_groups) override {
  46. if (attribute_groups.empty()) {
  47. return Status::InvalidArgument(
  48. "Cannot call this method without attribute groups");
  49. }
  50. return Status::NotSupported(
  51. "PutEntity with AttributeGroups not supported by "
  52. "PessimisticTransactionDB");
  53. }
  54. using StackableDB::Delete;
  55. Status Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family,
  56. const Slice& key) override;
  57. using StackableDB::SingleDelete;
  58. Status SingleDelete(const WriteOptions& wopts,
  59. ColumnFamilyHandle* column_family,
  60. const Slice& key) override;
  61. using StackableDB::Merge;
  62. Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
  63. const Slice& key, const Slice& value) override;
  64. using TransactionDB::Write;
  65. Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  66. inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
  67. WriteBatch* updates) {
  68. Status s;
  69. if (opts.protection_bytes_per_key > 0) {
  70. s = WriteBatchInternal::UpdateProtectionInfo(
  71. updates, opts.protection_bytes_per_key);
  72. }
  73. if (s.ok()) {
  74. // Need to lock all keys in this batch to prevent write conflicts with
  75. // concurrent transactions.
  76. Transaction* txn = BeginInternalTransaction(opts);
  77. txn->DisableIndexing();
  78. auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
  79. // Since commitBatch sorts the keys before locking, concurrent Write()
  80. // operations will not cause a deadlock.
  81. // In order to avoid a deadlock with a concurrent Transaction,
  82. // Transactions should use a lock timeout.
  83. s = txn_impl->CommitBatch(updates);
  84. delete txn;
  85. }
  86. return s;
  87. }
  88. using StackableDB::CreateColumnFamily;
  89. Status CreateColumnFamily(const ColumnFamilyOptions& options,
  90. const std::string& column_family_name,
  91. ColumnFamilyHandle** handle) override;
  92. Status CreateColumnFamilies(
  93. const ColumnFamilyOptions& options,
  94. const std::vector<std::string>& column_family_names,
  95. std::vector<ColumnFamilyHandle*>* handles) override;
  96. Status CreateColumnFamilies(
  97. const std::vector<ColumnFamilyDescriptor>& column_families,
  98. std::vector<ColumnFamilyHandle*>* handles) override;
  99. using StackableDB::CreateColumnFamilyWithImport;
  100. Status CreateColumnFamilyWithImport(
  101. const ColumnFamilyOptions& options, const std::string& column_family_name,
  102. const ImportColumnFamilyOptions& import_options,
  103. const ExportImportFilesMetaData& metadata,
  104. ColumnFamilyHandle** handle) override {
  105. const std::vector<const ExportImportFilesMetaData*>& metadatas{&metadata};
  106. return CreateColumnFamilyWithImport(options, column_family_name,
  107. import_options, metadatas, handle);
  108. }
  109. Status CreateColumnFamilyWithImport(
  110. const ColumnFamilyOptions& options, const std::string& column_family_name,
  111. const ImportColumnFamilyOptions& import_options,
  112. const std::vector<const ExportImportFilesMetaData*>& metadatas,
  113. ColumnFamilyHandle** handle) override;
  114. using StackableDB::DropColumnFamily;
  115. Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
  116. Status DropColumnFamilies(
  117. const std::vector<ColumnFamilyHandle*>& column_families) override;
  118. Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
  119. const std::string& key, bool exclusive);
  120. Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
  121. const Endpoint& start_endp, const Endpoint& end_endp);
  122. void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
  123. void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
  124. const std::string& key);
  125. void AddColumnFamily(const ColumnFamilyHandle* handle);
  126. static TransactionDBOptions ValidateTxnDBOptions(
  127. const TransactionDBOptions& txn_db_options);
  128. const TransactionDBOptions& GetTxnDBOptions() const {
  129. return txn_db_options_;
  130. }
  131. void InsertExpirableTransaction(TransactionID tx_id,
  132. PessimisticTransaction* tx);
  133. void RemoveExpirableTransaction(TransactionID tx_id);
  134. // If transaction is no longer available, locks can be stolen
  135. // If transaction is available, try stealing locks directly from transaction
  136. // It is the caller's responsibility to ensure that the referred transaction
  137. // is expirable (GetExpirationTime() > 0) and that it is expired.
  138. bool TryStealingExpiredTransactionLocks(TransactionID tx_id);
  139. Transaction* GetTransactionByName(const TransactionName& name) override;
  140. Status RegisterTransaction(Transaction* txn);
  141. void UnregisterTransaction(Transaction* txn);
  142. // not thread safe. current use case is during recovery (single thread)
  143. void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
  144. LockManager::PointLockStatus GetLockStatusData() override;
  145. std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
  146. void SetDeadlockInfoBufferSize(uint32_t target_size) override;
  147. // The default implementation does nothing. The actual implementation is moved
  148. // to the child classes that actually need this information. This was due to
  149. // an odd performance drop we observed when the added std::atomic member to
  150. // the base class even when the subclass do not read it in the fast path.
  151. virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
  152. virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
  153. // Use the returned factory to create LockTrackers in transactions.
  154. const LockTrackerFactory& GetLockTrackerFactory() const {
  155. return lock_manager_->GetLockTrackerFactory();
  156. }
  157. std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
  158. TxnTimestamp ts) override;
  159. std::shared_ptr<const Snapshot> GetTimestampedSnapshot(
  160. TxnTimestamp ts) const override;
  161. void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override;
  162. Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub,
  163. std::vector<std::shared_ptr<const Snapshot>>&
  164. timestamped_snapshots) const override;
  165. protected:
  166. DBImpl* db_impl_;
  167. std::shared_ptr<Logger> info_log_;
  168. const TransactionDBOptions txn_db_options_;
  169. static Status FailIfBatchHasTs(const WriteBatch* wb);
  170. static Status FailIfCfEnablesTs(const DB* db,
  171. const ColumnFamilyHandle* column_family);
  172. void ReinitializeTransaction(
  173. Transaction* txn, const WriteOptions& write_options,
  174. const TransactionOptions& txn_options = TransactionOptions());
  175. virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);
  176. private:
  177. friend class WritePreparedTxnDB;
  178. friend class WritePreparedTxnDBMock;
  179. friend class WriteUnpreparedTxn;
  180. friend class TransactionTest_DoubleCrashInRecovery_Test;
  181. friend class TransactionTest_DoubleEmptyWrite_Test;
  182. friend class TransactionTest_DuplicateKeys_Test;
  183. friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
  184. friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
  185. friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
  186. friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
  187. friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  188. friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
  189. Transaction* BeginInternalTransaction(const WriteOptions& options);
  190. Transaction* GetTransactionByNameLocked(const TransactionName& name);
  191. std::shared_ptr<LockManager> lock_manager_;
  192. // Must be held when adding/dropping column families.
  193. InstrumentedMutex column_family_mutex_;
  194. // Used to ensure that no locks are stolen from an expirable transaction
  195. // that has started a commit. Only transactions with an expiration time
  196. // should be in this map.
  197. std::mutex map_mutex_;
  198. std::unordered_map<TransactionID, PessimisticTransaction*>
  199. expirable_transactions_map_;
  200. // map from name to two phase transaction instance
  201. std::mutex name_map_mutex_;
  202. std::unordered_map<TransactionName, Transaction*> transactions_;
  203. // Signal that we are testing a crash scenario. Some asserts could be relaxed
  204. // in such cases.
  205. virtual void TEST_Crash() {}
  206. };
  207. // A PessimisticTransactionDB that writes the data to the DB after the commit.
  208. // In this way the DB only contains the committed data.
  209. class WriteCommittedTxnDB : public PessimisticTransactionDB {
  210. public:
  211. explicit WriteCommittedTxnDB(DB* db,
  212. const TransactionDBOptions& txn_db_options)
  213. : PessimisticTransactionDB(db, txn_db_options) {}
  214. explicit WriteCommittedTxnDB(StackableDB* db,
  215. const TransactionDBOptions& txn_db_options)
  216. : PessimisticTransactionDB(db, txn_db_options) {}
  217. virtual ~WriteCommittedTxnDB() {}
  218. Transaction* BeginTransaction(const WriteOptions& write_options,
  219. const TransactionOptions& txn_options,
  220. Transaction* old_txn) override;
  221. // Optimized version of ::Write that makes use of skip_concurrency_control
  222. // hint
  223. using TransactionDB::Write;
  224. Status Write(const WriteOptions& opts,
  225. const TransactionDBWriteOptimizations& optimizations,
  226. WriteBatch* updates) override;
  227. Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  228. };
  229. inline Status PessimisticTransactionDB::FailIfBatchHasTs(
  230. const WriteBatch* batch) {
  231. if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
  232. return Status::NotSupported(
  233. "Writes with timestamp must go through transaction API instead of "
  234. "TransactionDB.");
  235. }
  236. return Status::OK();
  237. }
  238. inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
  239. const DB* db, const ColumnFamilyHandle* column_family) {
  240. assert(db);
  241. column_family = column_family ? column_family : db->DefaultColumnFamily();
  242. assert(column_family);
  243. const Comparator* const ucmp = column_family->GetComparator();
  244. assert(ucmp);
  245. if (ucmp->timestamp_size() > 0) {
  246. return Status::NotSupported(
  247. "Write operation with user timestamp must go through the transaction "
  248. "API instead of TransactionDB.");
  249. }
  250. return Status::OK();
  251. }
  252. class SnapshotCreationCallback : public PostMemTableCallback {
  253. public:
  254. explicit SnapshotCreationCallback(
  255. DBImpl* dbi, TxnTimestamp commit_ts,
  256. const std::shared_ptr<TransactionNotifier>& notifier,
  257. std::shared_ptr<const Snapshot>& snapshot)
  258. : db_impl_(dbi),
  259. commit_ts_(commit_ts),
  260. snapshot_notifier_(notifier),
  261. snapshot_(snapshot) {
  262. assert(db_impl_);
  263. }
  264. ~SnapshotCreationCallback() override {
  265. snapshot_creation_status_.PermitUncheckedError();
  266. }
  267. Status operator()(SequenceNumber seq, bool disable_memtable) override;
  268. private:
  269. DBImpl* const db_impl_;
  270. const TxnTimestamp commit_ts_;
  271. std::shared_ptr<TransactionNotifier> snapshot_notifier_;
  272. std::shared_ptr<const Snapshot>& snapshot_;
  273. Status snapshot_creation_status_;
  274. };
  275. } // namespace ROCKSDB_NAMESPACE