transaction_base.h 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #ifndef ROCKSDB_LITE
#include <cassert>
#include <cstdint>
#include <memory>
#include <stack>
#include <string>
#include <utility>
#include <vector>

#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/autovector.h"
#include "utilities/transactions/transaction_util.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. class TransactionBaseImpl : public Transaction {
  23. public:
  24. TransactionBaseImpl(DB* db, const WriteOptions& write_options);
  25. virtual ~TransactionBaseImpl();
  26. // Remove pending operations queued in this transaction.
  27. virtual void Clear();
  28. void Reinitialize(DB* db, const WriteOptions& write_options);
  29. // Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock
  30. // returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed.
  31. // do_validate will be false if called from PutUntracked, DeleteUntracked,
  32. // MergeUntracked, or GetForUpdate(do_validate=false)
  33. virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
  34. bool read_only, bool exclusive,
  35. const bool do_validate = true,
  36. const bool assume_tracked = false) = 0;
  37. void SetSavePoint() override;
  38. Status RollbackToSavePoint() override;
  39. Status PopSavePoint() override;
  40. using Transaction::Get;
  41. Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
  42. const Slice& key, std::string* value) override;
  43. Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
  44. const Slice& key, PinnableSlice* value) override;
  45. Status Get(const ReadOptions& options, const Slice& key,
  46. std::string* value) override {
  47. return Get(options, db_->DefaultColumnFamily(), key, value);
  48. }
  49. using Transaction::GetForUpdate;
  50. Status GetForUpdate(const ReadOptions& options,
  51. ColumnFamilyHandle* column_family, const Slice& key,
  52. std::string* value, bool exclusive,
  53. const bool do_validate) override;
  54. Status GetForUpdate(const ReadOptions& options,
  55. ColumnFamilyHandle* column_family, const Slice& key,
  56. PinnableSlice* pinnable_val, bool exclusive,
  57. const bool do_validate) override;
  58. Status GetForUpdate(const ReadOptions& options, const Slice& key,
  59. std::string* value, bool exclusive,
  60. const bool do_validate) override {
  61. return GetForUpdate(options, db_->DefaultColumnFamily(), key, value,
  62. exclusive, do_validate);
  63. }
  64. using Transaction::MultiGet;
  65. std::vector<Status> MultiGet(
  66. const ReadOptions& options,
  67. const std::vector<ColumnFamilyHandle*>& column_family,
  68. const std::vector<Slice>& keys,
  69. std::vector<std::string>* values) override;
  70. std::vector<Status> MultiGet(const ReadOptions& options,
  71. const std::vector<Slice>& keys,
  72. std::vector<std::string>* values) override {
  73. return MultiGet(options, std::vector<ColumnFamilyHandle*>(
  74. keys.size(), db_->DefaultColumnFamily()),
  75. keys, values);
  76. }
  77. void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
  78. const size_t num_keys, const Slice* keys, PinnableSlice* values,
  79. Status* statuses, const bool sorted_input = false) override;
  80. using Transaction::MultiGetForUpdate;
  81. std::vector<Status> MultiGetForUpdate(
  82. const ReadOptions& options,
  83. const std::vector<ColumnFamilyHandle*>& column_family,
  84. const std::vector<Slice>& keys,
  85. std::vector<std::string>* values) override;
  86. std::vector<Status> MultiGetForUpdate(
  87. const ReadOptions& options, const std::vector<Slice>& keys,
  88. std::vector<std::string>* values) override {
  89. return MultiGetForUpdate(options,
  90. std::vector<ColumnFamilyHandle*>(
  91. keys.size(), db_->DefaultColumnFamily()),
  92. keys, values);
  93. }
  94. Iterator* GetIterator(const ReadOptions& read_options) override;
  95. Iterator* GetIterator(const ReadOptions& read_options,
  96. ColumnFamilyHandle* column_family) override;
  97. Status Put(ColumnFamilyHandle* column_family, const Slice& key,
  98. const Slice& value, const bool assume_tracked = false) override;
  99. Status Put(const Slice& key, const Slice& value) override {
  100. return Put(nullptr, key, value);
  101. }
  102. Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
  103. const SliceParts& value,
  104. const bool assume_tracked = false) override;
  105. Status Put(const SliceParts& key, const SliceParts& value) override {
  106. return Put(nullptr, key, value);
  107. }
  108. Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
  109. const Slice& value, const bool assume_tracked = false) override;
  110. Status Merge(const Slice& key, const Slice& value) override {
  111. return Merge(nullptr, key, value);
  112. }
  113. Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
  114. const bool assume_tracked = false) override;
  115. Status Delete(const Slice& key) override { return Delete(nullptr, key); }
  116. Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
  117. const bool assume_tracked = false) override;
  118. Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
  119. Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
  120. const bool assume_tracked = false) override;
  121. Status SingleDelete(const Slice& key) override {
  122. return SingleDelete(nullptr, key);
  123. }
  124. Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
  125. const bool assume_tracked = false) override;
  126. Status SingleDelete(const SliceParts& key) override {
  127. return SingleDelete(nullptr, key);
  128. }
  129. Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
  130. const Slice& value) override;
  131. Status PutUntracked(const Slice& key, const Slice& value) override {
  132. return PutUntracked(nullptr, key, value);
  133. }
  134. Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
  135. const SliceParts& value) override;
  136. Status PutUntracked(const SliceParts& key, const SliceParts& value) override {
  137. return PutUntracked(nullptr, key, value);
  138. }
  139. Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key,
  140. const Slice& value) override;
  141. Status MergeUntracked(const Slice& key, const Slice& value) override {
  142. return MergeUntracked(nullptr, key, value);
  143. }
  144. Status DeleteUntracked(ColumnFamilyHandle* column_family,
  145. const Slice& key) override;
  146. Status DeleteUntracked(const Slice& key) override {
  147. return DeleteUntracked(nullptr, key);
  148. }
  149. Status DeleteUntracked(ColumnFamilyHandle* column_family,
  150. const SliceParts& key) override;
  151. Status DeleteUntracked(const SliceParts& key) override {
  152. return DeleteUntracked(nullptr, key);
  153. }
  154. Status SingleDeleteUntracked(ColumnFamilyHandle* column_family,
  155. const Slice& key) override;
  156. Status SingleDeleteUntracked(const Slice& key) override {
  157. return SingleDeleteUntracked(nullptr, key);
  158. }
  159. void PutLogData(const Slice& blob) override;
  160. WriteBatchWithIndex* GetWriteBatch() override;
  161. virtual void SetLockTimeout(int64_t /*timeout*/) override { /* Do nothing */
  162. }
  163. const Snapshot* GetSnapshot() const override {
  164. return snapshot_ ? snapshot_.get() : nullptr;
  165. }
  166. virtual void SetSnapshot() override;
  167. void SetSnapshotOnNextOperation(
  168. std::shared_ptr<TransactionNotifier> notifier = nullptr) override;
  169. void ClearSnapshot() override {
  170. snapshot_.reset();
  171. snapshot_needed_ = false;
  172. snapshot_notifier_ = nullptr;
  173. }
  174. void DisableIndexing() override { indexing_enabled_ = false; }
  175. void EnableIndexing() override { indexing_enabled_ = true; }
  176. uint64_t GetElapsedTime() const override;
  177. uint64_t GetNumPuts() const override;
  178. uint64_t GetNumDeletes() const override;
  179. uint64_t GetNumMerges() const override;
  180. uint64_t GetNumKeys() const override;
  181. void UndoGetForUpdate(ColumnFamilyHandle* column_family,
  182. const Slice& key) override;
  183. void UndoGetForUpdate(const Slice& key) override {
  184. return UndoGetForUpdate(nullptr, key);
  185. };
  186. // Get list of keys in this transaction that must not have any conflicts
  187. // with writes in other transactions.
  188. const TransactionKeyMap& GetTrackedKeys() const { return tracked_keys_; }
  189. WriteOptions* GetWriteOptions() override { return &write_options_; }
  190. void SetWriteOptions(const WriteOptions& write_options) override {
  191. write_options_ = write_options;
  192. }
  193. // Used for memory management for snapshot_
  194. void ReleaseSnapshot(const Snapshot* snapshot, DB* db);
  195. // iterates over the given batch and makes the appropriate inserts.
  196. // used for rebuilding prepared transactions after recovery.
  197. virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
  198. WriteBatch* GetCommitTimeWriteBatch() override;
  199. protected:
  200. // Add a key to the list of tracked keys.
  201. //
  202. // seqno is the earliest seqno this key was involved with this transaction.
  203. // readonly should be set to true if no data was written for this key
  204. void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno,
  205. bool readonly, bool exclusive);
  206. // Helper function to add a key to the given TransactionKeyMap
  207. static void TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
  208. const std::string& key, SequenceNumber seqno,
  209. bool readonly, bool exclusive);
  210. // Called when UndoGetForUpdate determines that this key can be unlocked.
  211. virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
  212. const Slice& key) = 0;
  213. std::unique_ptr<TransactionKeyMap> GetTrackedKeysSinceSavePoint();
  214. // Sets a snapshot if SetSnapshotOnNextOperation() has been called.
  215. void SetSnapshotIfNeeded();
  216. // Initialize write_batch_ for 2PC by inserting Noop.
  217. inline void InitWriteBatch(bool clear = false) {
  218. if (clear) {
  219. write_batch_.Clear();
  220. }
  221. assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader);
  222. WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
  223. }
  224. DB* db_;
  225. DBImpl* dbimpl_;
  226. WriteOptions write_options_;
  227. const Comparator* cmp_;
  228. // Stores that time the txn was constructed, in microseconds.
  229. uint64_t start_time_;
  230. // Stores the current snapshot that was set by SetSnapshot or null if
  231. // no snapshot is currently set.
  232. std::shared_ptr<const Snapshot> snapshot_;
  233. // Count of various operations pending in this transaction
  234. uint64_t num_puts_ = 0;
  235. uint64_t num_deletes_ = 0;
  236. uint64_t num_merges_ = 0;
  237. struct SavePoint {
  238. std::shared_ptr<const Snapshot> snapshot_;
  239. bool snapshot_needed_ = false;
  240. std::shared_ptr<TransactionNotifier> snapshot_notifier_;
  241. uint64_t num_puts_ = 0;
  242. uint64_t num_deletes_ = 0;
  243. uint64_t num_merges_ = 0;
  244. // Record all keys tracked since the last savepoint
  245. TransactionKeyMap new_keys_;
  246. SavePoint(std::shared_ptr<const Snapshot> snapshot, bool snapshot_needed,
  247. std::shared_ptr<TransactionNotifier> snapshot_notifier,
  248. uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges)
  249. : snapshot_(snapshot),
  250. snapshot_needed_(snapshot_needed),
  251. snapshot_notifier_(snapshot_notifier),
  252. num_puts_(num_puts),
  253. num_deletes_(num_deletes),
  254. num_merges_(num_merges) {}
  255. SavePoint() = default;
  256. };
  257. // Records writes pending in this transaction
  258. WriteBatchWithIndex write_batch_;
  259. // Map from column_family_id to map of keys that are involved in this
  260. // transaction.
  261. // For Pessimistic Transactions this is the list of locked keys.
  262. // Optimistic Transactions will wait till commit time to do conflict checking.
  263. TransactionKeyMap tracked_keys_;
  264. // Stack of the Snapshot saved at each save point. Saved snapshots may be
  265. // nullptr if there was no snapshot at the time SetSavePoint() was called.
  266. std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
  267. autovector<TransactionBaseImpl::SavePoint>>>
  268. save_points_;
  269. private:
  270. friend class WritePreparedTxn;
  271. // Extra data to be persisted with the commit. Note this is only used when
  272. // prepare phase is not skipped.
  273. WriteBatch commit_time_batch_;
  274. // If true, future Put/Merge/Deletes will be indexed in the
  275. // WriteBatchWithIndex.
  276. // If false, future Put/Merge/Deletes will be inserted directly into the
  277. // underlying WriteBatch and not indexed in the WriteBatchWithIndex.
  278. bool indexing_enabled_;
  279. // SetSnapshotOnNextOperation() has been called and the snapshot has not yet
  280. // been reset.
  281. bool snapshot_needed_ = false;
  282. // SetSnapshotOnNextOperation() has been called and the caller would like
  283. // a notification through the TransactionNotifier interface
  284. std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;
  285. Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
  286. bool read_only, bool exclusive, const bool do_validate = true,
  287. const bool assume_tracked = false);
  288. WriteBatchBase* GetBatchForWrite();
  289. void SetSnapshotInternal(const Snapshot* snapshot);
  290. };
  291. } // namespace ROCKSDB_NAMESPACE
  292. #endif // ROCKSDB_LITE