// pessimistic_transaction.h
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <algorithm>
  7. #include <atomic>
  8. #include <mutex>
  9. #include <stack>
  10. #include <string>
  11. #include <unordered_map>
  12. #include <vector>
  13. #include "db/write_callback.h"
  14. #include "rocksdb/db.h"
  15. #include "rocksdb/slice.h"
  16. #include "rocksdb/snapshot.h"
  17. #include "rocksdb/status.h"
  18. #include "rocksdb/types.h"
  19. #include "rocksdb/utilities/transaction.h"
  20. #include "rocksdb/utilities/transaction_db.h"
  21. #include "rocksdb/utilities/write_batch_with_index.h"
  22. #include "util/autovector.h"
  23. #include "utilities/transactions/transaction_base.h"
  24. #include "utilities/transactions/transaction_util.h"
  25. namespace ROCKSDB_NAMESPACE {
  26. class PessimisticTransactionDB;
  27. // A transaction under pessimistic concurrency control. This class implements
  28. // the locking API and interfaces with the lock manager as well as the
  29. // pessimistic transactional db.
  30. class PessimisticTransaction : public TransactionBaseImpl {
  31. public:
  32. PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options,
  33. const TransactionOptions& txn_options,
  34. const bool init = true);
  35. // No copying allowed
  36. PessimisticTransaction(const PessimisticTransaction&) = delete;
  37. void operator=(const PessimisticTransaction&) = delete;
  38. ~PessimisticTransaction() override;
  39. void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options,
  40. const TransactionOptions& txn_options);
  41. Status Prepare() override;
  42. Status Commit() override;
  43. // It is basically Commit without going through Prepare phase. The write batch
  44. // is also directly provided instead of expecting txn to gradually batch the
  45. // transactions writes to an internal write batch.
  46. Status CommitBatch(WriteBatch* batch);
  47. Status Rollback() override;
  48. Status RollbackToSavePoint() override;
  49. Status SetName(const TransactionName& name) override;
  50. // Generate a new unique transaction identifier
  51. static TransactionID GenTxnID();
  52. TransactionID GetID() const override { return txn_id_; }
  53. std::vector<TransactionID> GetWaitingTxns(uint32_t* column_family_id,
  54. std::string* key) const override {
  55. std::lock_guard<std::mutex> lock(wait_mutex_);
  56. std::vector<TransactionID> ids(waiting_txn_ids_.size());
  57. if (timed_out_key_.has_value()) {
  58. if (key) *key = timed_out_key_.value();
  59. } else {
  60. if (key) *key = waiting_key_ ? *waiting_key_ : "";
  61. }
  62. if (column_family_id) *column_family_id = waiting_cf_id_;
  63. std::copy(waiting_txn_ids_.begin(), waiting_txn_ids_.end(), ids.begin());
  64. return ids;
  65. }
  66. void SetWaitingTxn(autovector<TransactionID>& ids, uint32_t column_family_id,
  67. const std::string* key, bool is_timed_out = false) {
  68. std::lock_guard<std::mutex> lock(wait_mutex_);
  69. waiting_txn_ids_ = ids;
  70. waiting_cf_id_ = column_family_id;
  71. if (is_timed_out) {
  72. timed_out_key_ = key ? *key : "";
  73. } else {
  74. waiting_key_ = key;
  75. }
  76. }
  77. void ClearWaitingTxn() {
  78. std::lock_guard<std::mutex> lock(wait_mutex_);
  79. waiting_txn_ids_.clear();
  80. waiting_cf_id_ = 0;
  81. waiting_key_ = nullptr;
  82. }
  83. // Returns the time (in microseconds according to Env->GetMicros())
  84. // that this transaction will be expired. Returns 0 if this transaction does
  85. // not expire.
  86. uint64_t GetExpirationTime() const { return expiration_time_; }
  87. // returns true if this transaction has an expiration_time and has expired.
  88. bool IsExpired() const;
  89. // Returns the number of microseconds a transaction can wait on acquiring a
  90. // lock or -1 if there is no timeout.
  91. int64_t GetLockTimeout() const { return lock_timeout_; }
  92. void SetLockTimeout(int64_t timeout) override {
  93. lock_timeout_ = timeout * 1000;
  94. }
  95. int64_t GetDeadlockTimeout() const { return deadlock_timeout_us_; }
  96. void SetDeadlockTimeout(int64_t timeout_ms) override {
  97. deadlock_timeout_us_ = timeout_ms * 1000;
  98. }
  99. // Returns true if locks were stolen successfully, false otherwise.
  100. bool TryStealingLocks();
  101. bool IsDeadlockDetect() const override { return deadlock_detect_; }
  102. int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; }
  103. Status GetRangeLock(ColumnFamilyHandle* column_family,
  104. const Endpoint& start_key,
  105. const Endpoint& end_key) override;
  106. Status CollapseKey(const ReadOptions& options, const Slice& key,
  107. ColumnFamilyHandle* column_family = nullptr) override;
  108. protected:
  109. virtual Status PrepareInternal() = 0;
  110. virtual Status CommitWithoutPrepareInternal() = 0;
  111. // batch_cnt if non-zero is the number of sub-batches. A sub-batch is a batch
  112. // with no duplicate keys. If zero, then the number of sub-batches is unknown.
  113. virtual Status CommitBatchInternal(WriteBatch* batch,
  114. size_t batch_cnt = 0) = 0;
  115. virtual Status CommitInternal() = 0;
  116. virtual Status RollbackInternal() = 0;
  117. virtual void Initialize(const TransactionOptions& txn_options);
  118. Status LockBatch(WriteBatch* batch, LockTracker* keys_to_unlock);
  119. Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
  120. bool read_only, bool exclusive, const bool do_validate = true,
  121. const bool assume_tracked = false) override;
  122. void Clear() override;
  123. PessimisticTransactionDB* txn_db_impl_;
  124. DBImpl* db_impl_;
  125. // If non-zero, this transaction should not be committed after this time (in
  126. // microseconds according to Env->NowMicros())
  127. uint64_t expiration_time_;
  128. // Timestamp used by the transaction to perform all GetForUpdate.
  129. // Use this timestamp for conflict checking.
  130. // read_timestamp_ == kMaxTxnTimestamp means this transaction has not
  131. // performed any GetForUpdate. It is possible that the transaction has
  132. // performed blind writes or Get, though.
  133. TxnTimestamp read_timestamp_{kMaxTxnTimestamp};
  134. TxnTimestamp commit_timestamp_{kMaxTxnTimestamp};
  135. // Refer to
  136. // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery
  137. bool use_only_the_last_commit_time_batch_for_recovery_ = false;
  138. // Refer to
  139. // TransactionOptions::skip_prepare
  140. bool skip_prepare_ = false;
  141. // Refer to TransactionOptions::commit_bypass_memtable
  142. uint32_t commit_bypass_memtable_threshold_ =
  143. std::numeric_limits<uint32_t>::max();
  144. uint64_t commit_bypass_memtable_byte_threshold_ =
  145. std::numeric_limits<uint64_t>::max();
  146. private:
  147. friend class TransactionTest_ValidateSnapshotTest_Test;
  148. // Used to create unique ids for transactions.
  149. static std::atomic<TransactionID> txn_id_counter_;
  150. // Unique ID for this transaction
  151. TransactionID txn_id_;
  152. // IDs for the transactions that are blocking the current transaction.
  153. //
  154. // empty if current transaction is not waiting or has timed out
  155. autovector<TransactionID> waiting_txn_ids_;
  156. // The following two represents the (cf, key) that a transaction is waiting
  157. // on.
  158. //
  159. // If waiting_key_ is not null, then the pointer should always point to
  160. // a valid string object. The reason is that it is only non-null when the
  161. // transaction is blocked in the PointLockManager::AcquireWithTimeout
  162. // function. At that point, the key string object is one of the function
  163. // parameters.
  164. uint32_t waiting_cf_id_;
  165. const std::string* waiting_key_;
  166. // Waiting key with lifetime of the txn so it can be accessed after timeouts
  167. std::optional<std::string> timed_out_key_;
  168. // Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_.
  169. mutable std::mutex wait_mutex_;
  170. // Timeout in microseconds when locking a key or -1 if there is no timeout.
  171. int64_t lock_timeout_;
  172. // Timeout in microseconds before perform dead lock detection.
  173. // If 0, deadlock detection will be performed immediately.
  174. int64_t deadlock_timeout_us_;
  175. // Whether to perform deadlock detection or not.
  176. bool deadlock_detect_;
  177. // Whether to perform deadlock detection or not.
  178. int64_t deadlock_detect_depth_;
  179. // Refer to TransactionOptions::skip_concurrency_control
  180. bool skip_concurrency_control_;
  181. virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
  182. const Slice& key,
  183. SequenceNumber* tracked_at_seq);
  184. void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
  185. const Slice& key) override;
  186. };
  187. class WriteCommittedTxn : public PessimisticTransaction {
  188. public:
  189. WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options,
  190. const TransactionOptions& txn_options);
  191. // No copying allowed
  192. WriteCommittedTxn(const WriteCommittedTxn&) = delete;
  193. void operator=(const WriteCommittedTxn&) = delete;
  194. ~WriteCommittedTxn() override {}
  195. using TransactionBaseImpl::GetForUpdate;
  196. Status GetForUpdate(const ReadOptions& read_options,
  197. ColumnFamilyHandle* column_family, const Slice& key,
  198. std::string* value, bool exclusive,
  199. const bool do_validate) override;
  200. Status GetForUpdate(const ReadOptions& read_options,
  201. ColumnFamilyHandle* column_family, const Slice& key,
  202. PinnableSlice* pinnable_val, bool exclusive,
  203. const bool do_validate) override;
  204. Status GetEntityForUpdate(const ReadOptions& read_options,
  205. ColumnFamilyHandle* column_family, const Slice& key,
  206. PinnableWideColumns* columns, bool exclusive,
  207. bool do_validate) override;
  208. using TransactionBaseImpl::Put;
  209. // `key` does NOT include timestamp even when it's enabled.
  210. Status Put(ColumnFamilyHandle* column_family, const Slice& key,
  211. const Slice& value, const bool assume_tracked = false) override;
  212. Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
  213. const SliceParts& value,
  214. const bool assume_tracked = false) override;
  215. using TransactionBaseImpl::PutUntracked;
  216. Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
  217. const Slice& value) override;
  218. Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
  219. const SliceParts& value) override;
  220. // `key` does NOT include timestamp even when it's enabled.
  221. Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
  222. const WideColumns& columns,
  223. bool assume_tracked = false) override {
  224. const bool do_validate = !assume_tracked;
  225. return PutEntityImpl(column_family, key, columns, do_validate,
  226. assume_tracked);
  227. }
  228. Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key,
  229. const WideColumns& columns) override {
  230. constexpr bool do_validate = false;
  231. constexpr bool assume_tracked = false;
  232. return PutEntityImpl(column_family, key, columns, do_validate,
  233. assume_tracked);
  234. }
  235. using TransactionBaseImpl::Delete;
  236. // `key` does NOT include timestamp even when it's enabled.
  237. Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
  238. const bool assume_tracked = false) override;
  239. Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
  240. const bool assume_tracked = false) override;
  241. using TransactionBaseImpl::DeleteUntracked;
  242. Status DeleteUntracked(ColumnFamilyHandle* column_family,
  243. const Slice& key) override;
  244. Status DeleteUntracked(ColumnFamilyHandle* column_family,
  245. const SliceParts& key) override;
  246. using TransactionBaseImpl::SingleDelete;
  247. // `key` does NOT include timestamp even when it's enabled.
  248. Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
  249. const bool assume_tracked = false) override;
  250. Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
  251. const bool assume_tracked = false) override;
  252. using TransactionBaseImpl::SingleDeleteUntracked;
  253. Status SingleDeleteUntracked(ColumnFamilyHandle* column_family,
  254. const Slice& key) override;
  255. using TransactionBaseImpl::Merge;
  256. Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
  257. const Slice& value, const bool assume_tracked = false) override;
  258. Status SetReadTimestampForValidation(TxnTimestamp ts) override;
  259. Status SetCommitTimestamp(TxnTimestamp ts) override;
  260. TxnTimestamp GetCommitTimestamp() const override { return commit_timestamp_; }
  261. private:
  262. template <typename TValue>
  263. Status GetForUpdateImpl(const ReadOptions& read_options,
  264. ColumnFamilyHandle* column_family, const Slice& key,
  265. TValue* value, bool exclusive,
  266. const bool do_validate);
  267. Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key,
  268. const WideColumns& columns, bool do_validate,
  269. bool assume_tracked);
  270. template <typename TKey, typename TOperation>
  271. Status Operate(ColumnFamilyHandle* column_family, const TKey& key,
  272. const bool do_validate, const bool assume_tracked,
  273. TOperation&& operation);
  274. Status PrepareInternal() override;
  275. Status CommitWithoutPrepareInternal() override;
  276. Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override;
  277. Status CommitInternal() override;
  278. Status RollbackInternal() override;
  279. // Checks if the combination of `do_validate`, the read timestamp set in
  280. // `read_timestamp_` and the `enable_udt_validation` flag in
  281. // TransactionDBOptions make sense together.
  282. Status SanityCheckReadTimestamp(bool do_validate);
  283. // Column families that enable timestamps and whose data are written when
  284. // indexing_enabled_ is false. If a key is written when indexing_enabled_ is
  285. // true, then the corresponding column family is not added to cfs_with_ts
  286. // even if it enables timestamp.
  287. std::unordered_set<uint32_t> cfs_with_ts_tracked_when_indexing_disabled_;
  288. };
  289. } // namespace ROCKSDB_NAMESPACE