// transaction_base.h
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once

#include <cstdint>
#include <memory>
#include <stack>
#include <string>
#include <utility>
#include <vector>

#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/autovector.h"
#include "utilities/transactions/lock/lock_tracker.h"
#include "utilities/transactions/transaction_util.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. class TransactionBaseImpl : public Transaction {
  23. public:
  24. TransactionBaseImpl(DB* db, const WriteOptions& write_options,
  25. const LockTrackerFactory& lock_tracker_factory);
  26. ~TransactionBaseImpl() override;
  27. // Remove pending operations queued in this transaction.
  28. virtual void Clear();
  29. void Reinitialize(DB* db, const WriteOptions& write_options);
  30. // Called before executing Put, PutEntity, Merge, Delete, and GetForUpdate. If
  31. // TryLock returns non-OK, the Put/PutEntity/Merge/Delete/GetForUpdate will be
  32. // failed. do_validate will be false if called from PutUntracked,
  33. // PutEntityUntracked, DeleteUntracked, MergeUntracked, or
  34. // GetForUpdate(do_validate=false)
  35. virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
  36. bool read_only, bool exclusive,
  37. const bool do_validate = true,
  38. const bool assume_tracked = false) = 0;
  39. void SetSavePoint() override;
  40. Status RollbackToSavePoint() override;
  41. Status PopSavePoint() override;
  42. using Transaction::Get;
  43. Status Get(const ReadOptions& _read_options,
  44. ColumnFamilyHandle* column_family, const Slice& key,
  45. std::string* value) override;
  46. Status Get(const ReadOptions& _read_options,
  47. ColumnFamilyHandle* column_family, const Slice& key,
  48. PinnableSlice* value) override;
  49. Status Get(const ReadOptions& options, const Slice& key,
  50. std::string* value) override {
  51. return Get(options, db_->DefaultColumnFamily(), key, value);
  52. }
  53. Status GetEntity(const ReadOptions& options,
  54. ColumnFamilyHandle* column_family, const Slice& key,
  55. PinnableWideColumns* columns) override;
  56. using Transaction::GetForUpdate;
  57. Status GetForUpdate(const ReadOptions& options,
  58. ColumnFamilyHandle* column_family, const Slice& key,
  59. std::string* value, bool exclusive,
  60. const bool do_validate) override;
  61. Status GetForUpdate(const ReadOptions& options,
  62. ColumnFamilyHandle* column_family, const Slice& key,
  63. PinnableSlice* pinnable_val, bool exclusive,
  64. const bool do_validate) override;
  65. Status GetForUpdate(const ReadOptions& options, const Slice& key,
  66. std::string* value, bool exclusive,
  67. const bool do_validate) override {
  68. return GetForUpdate(options, db_->DefaultColumnFamily(), key, value,
  69. exclusive, do_validate);
  70. }
  71. Status GetForUpdate(const ReadOptions& options, const Slice& key,
  72. PinnableSlice* pinnable_val, bool exclusive,
  73. const bool do_validate) override {
  74. return GetForUpdate(options, db_->DefaultColumnFamily(), key, pinnable_val,
  75. exclusive, do_validate);
  76. }
  77. Status GetEntityForUpdate(const ReadOptions& read_options,
  78. ColumnFamilyHandle* column_family, const Slice& key,
  79. PinnableWideColumns* columns, bool exclusive = true,
  80. bool do_validate = true) override;
  81. using Transaction::MultiGet;
  82. std::vector<Status> MultiGet(
  83. const ReadOptions& _read_options,
  84. const std::vector<ColumnFamilyHandle*>& column_family,
  85. const std::vector<Slice>& keys,
  86. std::vector<std::string>* values) override;
  87. std::vector<Status> MultiGet(const ReadOptions& options,
  88. const std::vector<Slice>& keys,
  89. std::vector<std::string>* values) override {
  90. return MultiGet(options,
  91. std::vector<ColumnFamilyHandle*>(
  92. keys.size(), db_->DefaultColumnFamily()),
  93. keys, values);
  94. }
  95. void MultiGet(const ReadOptions& _read_options,
  96. ColumnFamilyHandle* column_family, const size_t num_keys,
  97. const Slice* keys, PinnableSlice* values, Status* statuses,
  98. const bool sorted_input = false) override;
  99. void MultiGetEntity(const ReadOptions& options,
  100. ColumnFamilyHandle* column_family, size_t num_keys,
  101. const Slice* keys, PinnableWideColumns* results,
  102. Status* statuses, bool sorted_input = false) override;
  103. using Transaction::MultiGetForUpdate;
  104. std::vector<Status> MultiGetForUpdate(
  105. const ReadOptions& options,
  106. const std::vector<ColumnFamilyHandle*>& column_family,
  107. const std::vector<Slice>& keys,
  108. std::vector<std::string>* values) override;
  109. std::vector<Status> MultiGetForUpdate(
  110. const ReadOptions& options, const std::vector<Slice>& keys,
  111. std::vector<std::string>* values) override {
  112. return MultiGetForUpdate(options,
  113. std::vector<ColumnFamilyHandle*>(
  114. keys.size(), db_->DefaultColumnFamily()),
  115. keys, values);
  116. }
  117. Iterator* GetIterator(const ReadOptions& read_options) override;
  118. Iterator* GetIterator(const ReadOptions& read_options,
  119. ColumnFamilyHandle* column_family) override;
  120. std::unique_ptr<Iterator> GetCoalescingIterator(
  121. const ReadOptions& read_options,
  122. const std::vector<ColumnFamilyHandle*>& column_families) override;
  123. std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
  124. const ReadOptions& read_options,
  125. const std::vector<ColumnFamilyHandle*>& column_families) override;
  126. Status Put(ColumnFamilyHandle* column_family, const Slice& key,
  127. const Slice& value, const bool assume_tracked = false) override;
  128. Status Put(const Slice& key, const Slice& value) override {
  129. return Put(nullptr, key, value);
  130. }
  131. Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
  132. const SliceParts& value,
  133. const bool assume_tracked = false) override;
  134. Status Put(const SliceParts& key, const SliceParts& value) override {
  135. return Put(nullptr, key, value);
  136. }
  137. Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
  138. const WideColumns& columns,
  139. bool assume_tracked = false) override {
  140. const bool do_validate = !assume_tracked;
  141. return PutEntityImpl(column_family, key, columns, do_validate,
  142. assume_tracked);
  143. }
  144. Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
  145. const Slice& value, const bool assume_tracked = false) override;
  146. Status Merge(const Slice& key, const Slice& value) override {
  147. return Merge(nullptr, key, value);
  148. }
  149. Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
  150. const bool assume_tracked = false) override;
  151. Status Delete(const Slice& key) override { return Delete(nullptr, key); }
  152. Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
  153. const bool assume_tracked = false) override;
  154. Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
  155. Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
  156. const bool assume_tracked = false) override;
  157. Status SingleDelete(const Slice& key) override {
  158. return SingleDelete(nullptr, key);
  159. }
  160. Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
  161. const bool assume_tracked = false) override;
  162. Status SingleDelete(const SliceParts& key) override {
  163. return SingleDelete(nullptr, key);
  164. }
  165. Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
  166. const Slice& value) override;
  167. Status PutUntracked(const Slice& key, const Slice& value) override {
  168. return PutUntracked(nullptr, key, value);
  169. }
  170. Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
  171. const SliceParts& value) override;
  172. Status PutUntracked(const SliceParts& key, const SliceParts& value) override {
  173. return PutUntracked(nullptr, key, value);
  174. }
  175. Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key,
  176. const WideColumns& columns) override {
  177. constexpr bool do_validate = false;
  178. constexpr bool assume_tracked = false;
  179. return PutEntityImpl(column_family, key, columns, do_validate,
  180. assume_tracked);
  181. }
  182. Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key,
  183. const Slice& value) override;
  184. Status MergeUntracked(const Slice& key, const Slice& value) override {
  185. return MergeUntracked(nullptr, key, value);
  186. }
  187. Status DeleteUntracked(ColumnFamilyHandle* column_family,
  188. const Slice& key) override;
  189. Status DeleteUntracked(const Slice& key) override {
  190. return DeleteUntracked(nullptr, key);
  191. }
  192. Status DeleteUntracked(ColumnFamilyHandle* column_family,
  193. const SliceParts& key) override;
  194. Status DeleteUntracked(const SliceParts& key) override {
  195. return DeleteUntracked(nullptr, key);
  196. }
  197. Status SingleDeleteUntracked(ColumnFamilyHandle* column_family,
  198. const Slice& key) override;
  199. Status SingleDeleteUntracked(const Slice& key) override {
  200. return SingleDeleteUntracked(nullptr, key);
  201. }
  202. void PutLogData(const Slice& blob) override;
  203. WriteBatchWithIndex* GetWriteBatch() override;
  204. void SetLockTimeout(int64_t /*timeout*/) override { /* Do nothing */ }
  205. void SetDeadlockTimeout(int64_t /*timeout*/) override { /* Do nothing */ }
  206. const Snapshot* GetSnapshot() const override {
  207. // will return nullptr when there is no snapshot
  208. return snapshot_.get();
  209. }
  210. std::shared_ptr<const Snapshot> GetTimestampedSnapshot() const override {
  211. return snapshot_;
  212. }
  213. void SetSnapshot() override;
  214. void SetSnapshotOnNextOperation(
  215. std::shared_ptr<TransactionNotifier> notifier = nullptr) override;
  216. void ClearSnapshot() override {
  217. snapshot_.reset();
  218. snapshot_needed_ = false;
  219. snapshot_notifier_ = nullptr;
  220. }
  221. void DisableIndexing() override { indexing_enabled_ = false; }
  222. void EnableIndexing() override { indexing_enabled_ = true; }
  223. bool IndexingEnabled() const { return indexing_enabled_; }
  224. uint64_t GetElapsedTime() const override;
  225. uint64_t GetNumPuts() const override;
  226. uint64_t GetNumPutEntities() const override;
  227. uint64_t GetNumDeletes() const override;
  228. uint64_t GetNumMerges() const override;
  229. uint64_t GetNumKeys() const override;
  230. void UndoGetForUpdate(ColumnFamilyHandle* column_family,
  231. const Slice& key) override;
  232. void UndoGetForUpdate(const Slice& key) override {
  233. return UndoGetForUpdate(nullptr, key);
  234. }
  235. WriteOptions* GetWriteOptions() override { return &write_options_; }
  236. void SetWriteOptions(const WriteOptions& write_options) override {
  237. write_options_ = write_options;
  238. }
  239. // Used for memory management for snapshot_
  240. void ReleaseSnapshot(const Snapshot* snapshot, DB* db);
  241. // iterates over the given batch and makes the appropriate inserts.
  242. // used for rebuilding prepared transactions after recovery.
  243. Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
  244. WriteBatch* GetCommitTimeWriteBatch() override;
  245. LockTracker& GetTrackedLocks() { return *tracked_locks_; }
  246. protected:
  247. ColumnFamilyHandle* DefaultColumnFamily() const {
  248. assert(db_);
  249. return db_->DefaultColumnFamily();
  250. }
  251. template <typename IterType, typename ImplType,
  252. typename ErrorIteratorFuncType>
  253. std::unique_ptr<IterType> NewMultiCfIterator(
  254. const ReadOptions& read_options,
  255. const std::vector<ColumnFamilyHandle*>& column_families,
  256. ErrorIteratorFuncType error_iterator_func);
  257. Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
  258. const Slice& key, std::string* value) override;
  259. Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
  260. const Slice& key, PinnableSlice* value) override;
  261. Status GetEntityImpl(const ReadOptions& options,
  262. ColumnFamilyHandle* column_family, const Slice& key,
  263. PinnableWideColumns* columns) {
  264. return write_batch_.GetEntityFromBatchAndDB(db_, options, column_family,
  265. key, columns);
  266. }
  267. void MultiGetEntityImpl(const ReadOptions& options,
  268. ColumnFamilyHandle* column_family, size_t num_keys,
  269. const Slice* keys, PinnableWideColumns* results,
  270. Status* statuses, bool sorted_input) {
  271. write_batch_.MultiGetEntityFromBatchAndDB(db_, options, column_family,
  272. num_keys, keys, results, statuses,
  273. sorted_input);
  274. }
  275. Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key,
  276. const WideColumns& columns, bool do_validate,
  277. bool assume_tracked);
  278. // Add a key to the list of tracked keys.
  279. //
  280. // seqno is the earliest seqno this key was involved with this transaction.
  281. // readonly should be set to true if no data was written for this key
  282. void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno,
  283. bool readonly, bool exclusive);
  284. // Called when UndoGetForUpdate determines that this key can be unlocked.
  285. virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
  286. const Slice& key) = 0;
  287. // Sets a snapshot if SetSnapshotOnNextOperation() has been called.
  288. void SetSnapshotIfNeeded();
  289. // Initialize write_batch_ for 2PC by inserting Noop.
  290. inline void InitWriteBatch(bool clear = false) {
  291. if (clear) {
  292. write_batch_.Clear();
  293. }
  294. assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader);
  295. auto s = WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
  296. assert(s.ok());
  297. }
  298. WriteBatchBase* GetBatchForWrite();
  299. DB* db_;
  300. DBImpl* dbimpl_;
  301. WriteOptions write_options_;
  302. const Comparator* cmp_;
  303. const LockTrackerFactory& lock_tracker_factory_;
  304. // Stores that time the txn was constructed, in microseconds.
  305. uint64_t start_time_;
  306. // Stores the current snapshot that was set by SetSnapshot or null if
  307. // no snapshot is currently set.
  308. std::shared_ptr<const Snapshot> snapshot_;
  309. // Count of various operations pending in this transaction
  310. uint64_t num_puts_ = 0;
  311. uint64_t num_put_entities_ = 0;
  312. uint64_t num_deletes_ = 0;
  313. uint64_t num_merges_ = 0;
  314. struct SavePoint {
  315. std::shared_ptr<const Snapshot> snapshot_;
  316. bool snapshot_needed_ = false;
  317. std::shared_ptr<TransactionNotifier> snapshot_notifier_;
  318. uint64_t num_puts_ = 0;
  319. uint64_t num_put_entities_ = 0;
  320. uint64_t num_deletes_ = 0;
  321. uint64_t num_merges_ = 0;
  322. // Record all locks tracked since the last savepoint
  323. std::shared_ptr<LockTracker> new_locks_;
  324. SavePoint(std::shared_ptr<const Snapshot> snapshot, bool snapshot_needed,
  325. std::shared_ptr<TransactionNotifier> snapshot_notifier,
  326. uint64_t num_puts, uint64_t num_put_entities,
  327. uint64_t num_deletes, uint64_t num_merges,
  328. const LockTrackerFactory& lock_tracker_factory)
  329. : snapshot_(snapshot),
  330. snapshot_needed_(snapshot_needed),
  331. snapshot_notifier_(snapshot_notifier),
  332. num_puts_(num_puts),
  333. num_put_entities_(num_put_entities),
  334. num_deletes_(num_deletes),
  335. num_merges_(num_merges),
  336. new_locks_(lock_tracker_factory.Create()) {}
  337. explicit SavePoint(const LockTrackerFactory& lock_tracker_factory)
  338. : new_locks_(lock_tracker_factory.Create()) {}
  339. };
  340. // Records writes pending in this transaction
  341. WriteBatchWithIndex write_batch_;
  342. // For Pessimistic Transactions this is the set of acquired locks.
  343. // Optimistic Transactions will keep note the requested locks (not actually
  344. // locked), and do conflict checking until commit time based on the tracked
  345. // lock requests.
  346. std::unique_ptr<LockTracker> tracked_locks_;
  347. // Stack of the Snapshot saved at each save point. Saved snapshots may be
  348. // nullptr if there was no snapshot at the time SetSavePoint() was called.
  349. std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
  350. autovector<TransactionBaseImpl::SavePoint>>>
  351. save_points_;
  352. private:
  353. friend class WriteCommittedTxn;
  354. friend class WritePreparedTxn;
  355. // Extra data to be persisted with the commit. Note this is only used when
  356. // prepare phase is not skipped.
  357. WriteBatch commit_time_batch_;
  358. // If true, future Put/PutEntity/Merge/Delete operations will be indexed in
  359. // the WriteBatchWithIndex. If false, future Put/PutEntity/Merge/Delete
  360. // operations will be inserted directly into the underlying WriteBatch and not
  361. // indexed in the WriteBatchWithIndex.
  362. bool indexing_enabled_;
  363. // SetSnapshotOnNextOperation() has been called and the snapshot has not yet
  364. // been reset.
  365. bool snapshot_needed_ = false;
  366. // SetSnapshotOnNextOperation() has been called and the caller would like
  367. // a notification through the TransactionNotifier interface
  368. std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;
  369. Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
  370. bool read_only, bool exclusive, const bool do_validate = true,
  371. const bool assume_tracked = false);
  372. void SetSnapshotInternal(const Snapshot* snapshot);
  373. };
  374. } // namespace ROCKSDB_NAMESPACE