write_unprepared_txn.h 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <set>
  7. #include "utilities/transactions/write_prepared_txn.h"
  8. #include "utilities/transactions/write_unprepared_txn_db.h"
  9. namespace ROCKSDB_NAMESPACE {
  10. class WriteUnpreparedTxnDB;
  11. class WriteUnpreparedTxn;
  12. // WriteUnprepared transactions need to be able to read their own uncommitted
  13. // writes, and supporting this requires some careful consideration. Because
  14. // writes in the current transaction may be flushed to DB already, we cannot
  15. // rely on the contents of WriteBatchWithIndex to determine whether a key should
  16. // be visible or not, so we have to remember to check the DB for any uncommitted
  17. // keys that should be visible to us. First, we will need to change the seek to
  18. // snapshot logic, to seek to max_visible_seq = max(snap_seq, max_unprep_seq).
  19. // Any key whose sequence number is greater than max_visible_seq should not
  20. // be visible, because it cannot have been written (unprepared) by the
  21. // current transaction and it is not in the transaction's snapshot.
  22. //
  23. // When we seek to max_visible_seq, one of these cases will happen:
  24. // 1. We hit an unprepared key from the current transaction.
  25. // 2. We hit an unprepared key from another transaction.
  26. // 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
  27. // 4. We hit a committed key with seq <= snap_seq.
  28. //
  29. // IsVisibleFullCheck handles all cases correctly.
  30. //
  31. // Other notes:
  32. // Note that max_visible_seq is only calculated once at iterator construction
  33. // time, meaning if the same transaction is adding more unprep seqs through
  34. // writes during iteration, these newer writes may not be visible. This is not a
  35. // problem for MySQL though because it avoids modifying the index as it is
  36. // scanning through it to avoid the Halloween Problem. Instead, it scans the
  37. // index once up front, and modifies based on a temporary copy.
  38. //
  39. // In DBIter, there is a "reseek" optimization if the iterator skips over too
  40. // many keys. However, this assumes that the reseek seeks exactly to the
  41. // required key. In write unprepared, even after seeking directly to
  42. // max_visible_seq, some iteration may be required before hitting a visible key,
  43. // and special precautions must be taken to avoid performing another reseek,
  44. // leading to an infinite loop.
  45. //
  46. class WriteUnpreparedTxnReadCallback : public ReadCallback {
  47. public:
  48. WriteUnpreparedTxnReadCallback(
  49. WritePreparedTxnDB* db, SequenceNumber snapshot,
  50. SequenceNumber min_uncommitted,
  51. const std::map<SequenceNumber, size_t>& unprep_seqs,
  52. SnapshotBackup backed_by_snapshot)
  53. // Pass our last uncommitted seq as the snapshot to the parent class to
  54. // ensure that the parent will not prematurely filter out own writes. We
  55. // will do the exact comparison against snapshots in IsVisibleFullCheck
  56. // override.
  57. : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
  58. db_(db),
  59. unprep_seqs_(unprep_seqs),
  60. wup_snapshot_(snapshot),
  61. backed_by_snapshot_(backed_by_snapshot) {
  62. (void)backed_by_snapshot_; // to silence unused private field warning
  63. }
  64. virtual ~WriteUnpreparedTxnReadCallback() {
  65. // If it is not backed by snapshot, the caller must check validity
  66. assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  67. }
  68. bool IsVisibleFullCheck(SequenceNumber seq) override;
  69. inline bool valid() {
  70. valid_checked_ = true;
  71. return snap_released_ == false;
  72. }
  73. void Refresh(SequenceNumber seq) override {
  74. max_visible_seq_ = std::max(max_visible_seq_, seq);
  75. wup_snapshot_ = seq;
  76. }
  77. static SequenceNumber CalcMaxVisibleSeq(
  78. const std::map<SequenceNumber, size_t>& unprep_seqs,
  79. SequenceNumber snapshot_seq) {
  80. SequenceNumber max_unprepared = 0;
  81. if (unprep_seqs.size()) {
  82. max_unprepared =
  83. unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
  84. }
  85. return std::max(max_unprepared, snapshot_seq);
  86. }
  87. private:
  88. WritePreparedTxnDB* db_;
  89. const std::map<SequenceNumber, size_t>& unprep_seqs_;
  90. SequenceNumber wup_snapshot_;
  91. // Whether max_visible_seq_ is backed by a snapshot
  92. const SnapshotBackup backed_by_snapshot_;
  93. bool snap_released_ = false;
  94. // Safety check to ensure that the caller has checked invalid statuses
  95. bool valid_checked_ = false;
  96. };
  97. class WriteUnpreparedTxn : public WritePreparedTxn {
  98. public:
  99. WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
  100. const WriteOptions& write_options,
  101. const TransactionOptions& txn_options);
  102. virtual ~WriteUnpreparedTxn();
  103. using TransactionBaseImpl::Put;
  104. Status Put(ColumnFamilyHandle* column_family, const Slice& key,
  105. const Slice& value, const bool assume_tracked = false) override;
  106. Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
  107. const SliceParts& value,
  108. const bool assume_tracked = false) override;
  109. using TransactionBaseImpl::Merge;
  110. Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
  111. const Slice& value, const bool assume_tracked = false) override;
  112. using TransactionBaseImpl::Delete;
  113. Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
  114. const bool assume_tracked = false) override;
  115. Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
  116. const bool assume_tracked = false) override;
  117. using TransactionBaseImpl::SingleDelete;
  118. Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
  119. const bool assume_tracked = false) override;
  120. Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
  121. const bool assume_tracked = false) override;
  122. // In WriteUnprepared, untracked writes will break snapshot validation logic.
  123. // Snapshot validation will only check the largest sequence number of a key to
  124. // see if it was committed or not. However, an untracked unprepared write will
  125. // hide smaller committed sequence numbers.
  126. //
  127. // TODO(lth): Investigate whether it is worth having snapshot validation
  128. // validate all values larger than snap_seq. Otherwise, we should return
  129. // Status::NotSupported for untracked writes.
  130. Status RebuildFromWriteBatch(WriteBatch*) override;
  131. uint64_t GetLastLogNumber() const override { return last_log_number_; }
  132. void RemoveActiveIterator(Iterator* iter) {
  133. active_iterators_.erase(
  134. std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
  135. active_iterators_.end());
  136. }
  137. protected:
  138. void Initialize(const TransactionOptions& txn_options) override;
  139. Status PrepareInternal() override;
  140. Status CommitWithoutPrepareInternal() override;
  141. Status CommitInternal() override;
  142. Status RollbackInternal() override;
  143. void Clear() override;
  144. void SetSavePoint() override;
  145. Status RollbackToSavePoint() override;
  146. Status PopSavePoint() override;
  147. // Get and GetIterator needs to be overridden so that a ReadCallback to
  148. // handle read-your-own-write is used.
  149. using Transaction::Get;
  150. Status Get(const ReadOptions& _read_options,
  151. ColumnFamilyHandle* column_family, const Slice& key,
  152. PinnableSlice* value) override;
  153. using Transaction::MultiGet;
  154. void MultiGet(const ReadOptions& _read_options,
  155. ColumnFamilyHandle* column_family, const size_t num_keys,
  156. const Slice* keys, PinnableSlice* values, Status* statuses,
  157. const bool sorted_input = false) override;
  158. using Transaction::GetIterator;
  159. Iterator* GetIterator(const ReadOptions& options) override;
  160. Iterator* GetIterator(const ReadOptions& options,
  161. ColumnFamilyHandle* column_family) override;
  162. Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
  163. SequenceNumber* tracked_at_seq) override;
  164. private:
  165. friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  166. friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  167. friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  168. friend class WriteUnpreparedTxnDB;
  169. const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
  170. using Transaction::GetImpl;
  171. Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
  172. const Slice& key, PinnableSlice* value) override;
  173. Status WriteRollbackKeys(const LockTracker& tracked_keys,
  174. WriteBatchWithIndex* rollback_batch,
  175. ReadCallback* callback, const ReadOptions& roptions);
  176. Status MaybeFlushWriteBatchToDB();
  177. Status FlushWriteBatchToDB(bool prepared);
  178. Status FlushWriteBatchToDBInternal(bool prepared);
  179. Status FlushWriteBatchWithSavePointToDB();
  180. Status RollbackToSavePointInternal();
  181. Status HandleWrite(std::function<Status()> do_write);
  182. // For write unprepared, we check on every writebatch append to see if
  183. // write_batch_flush_threshold_ has been exceeded, and then call
  184. // FlushWriteBatchToDB if so. This logic is encapsulated in
  185. // MaybeFlushWriteBatchToDB.
  186. int64_t write_batch_flush_threshold_;
  187. WriteUnpreparedTxnDB* wupt_db_;
  188. // Ordered list of unprep_seq sequence numbers that we have already written
  189. // to DB.
  190. //
  191. // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  192. // written by this transaction.
  193. //
  194. // Note that this contains both prepared and unprepared batches, since they
  195. // are treated similarily in prepare heap/commit map, so it simplifies the
  196. // commit callbacks.
  197. std::map<SequenceNumber, size_t> unprep_seqs_;
  198. uint64_t last_log_number_;
  199. // Recovered transactions have tracked_keys_ populated, but are not actually
  200. // locked for efficiency reasons. For recovered transactions, skip unlocking
  201. // keys when transaction ends.
  202. bool recovered_txn_;
  203. // Track the largest sequence number at which we performed snapshot
  204. // validation. If snapshot validation was skipped because no snapshot was set,
  205. // then this is set to GetLastPublishedSequence. This value is useful because
  206. // it means that for keys that have unprepared seqnos, we can guarantee that
  207. // no committed keys by other transactions can exist between
  208. // largest_validated_seq_ and max_unprep_seq. See
  209. // WriteUnpreparedTxnDB::NewIterator for an explanation for why this is
  210. // necessary for iterator Prev().
  211. //
  212. // Currently this value only increases during the lifetime of a transaction,
  213. // but in some cases, we should be able to restore the previously largest
  214. // value when calling RollbackToSavepoint.
  215. SequenceNumber largest_validated_seq_;
  216. struct SavePoint {
  217. // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
  218. // used during RollbackToSavepoint to determine visibility when restoring
  219. // old values.
  220. //
  221. // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
  222. // subsets, this can potentially be deduplicated by just storing set
  223. // difference. Investigate if this is worth it.
  224. std::map<SequenceNumber, size_t> unprep_seqs_;
  225. // This snapshot will be used to read keys at this savepoint if we call
  226. // RollbackToSavePoint.
  227. std::unique_ptr<ManagedSnapshot> snapshot_;
  228. SavePoint(const std::map<SequenceNumber, size_t>& seqs,
  229. ManagedSnapshot* snapshot)
  230. : unprep_seqs_(seqs), snapshot_(snapshot) {}
  231. };
  232. // We have 3 data structures holding savepoint information:
  233. // 1. TransactionBaseImpl::save_points_
  234. // 2. WriteUnpreparedTxn::flushed_save_points_
  235. // 3. WriteUnpreparecTxn::unflushed_save_points_
  236. //
  237. // TransactionBaseImpl::save_points_ holds information about all write
  238. // batches, including the current in-memory write_batch_, or unprepared
  239. // batches that have been written out. Its responsibility is just to track
  240. // which keys have been modified in every savepoint.
  241. //
  242. // WriteUnpreparedTxn::flushed_save_points_ holds information about savepoints
  243. // set on unprepared batches that have already flushed. It holds the snapshot
  244. // and unprep_seqs at that savepoint, so that the rollback process can
  245. // determine which keys were visible at that point in time.
  246. //
  247. // WriteUnpreparecTxn::unflushed_save_points_ holds information about
  248. // savepoints on the current in-memory write_batch_. It simply records the
  249. // size of the write batch at every savepoint.
  250. //
  251. // TODO(lth): Remove the redundancy between save_point_boundaries_ and
  252. // write_batch_.save_points_.
  253. //
  254. // Based on this information, here are some invariants:
  255. // size(unflushed_save_points_) = size(write_batch_.save_points_)
  256. // size(flushed_save_points_) + size(unflushed_save_points_)
  257. // = size(save_points_)
  258. //
  259. std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
  260. flushed_save_points_;
  261. std::unique_ptr<autovector<size_t>> unflushed_save_points_;
  262. // It is currently unsafe to flush a write batch if there are active iterators
  263. // created from this transaction. This is because we use WriteBatchWithIndex
  264. // to do merging reads from the DB and the write batch. If we flush the write
  265. // batch, it is possible that the delta iterator on the iterator will point to
  266. // invalid memory.
  267. std::vector<Iterator*> active_iterators_;
  268. // Untracked keys that we have to rollback.
  269. //
  270. // TODO(lth): Currently we we do not record untracked keys per-savepoint.
  271. // This means that when rolling back to savepoints, we have to check all
  272. // keys in the current transaction for rollback. Note that this is only
  273. // inefficient, but still correct because we take a snapshot at every
  274. // savepoint, and we will use that snapshot to construct the rollback batch.
  275. // The rollback batch will then contain a reissue of the same marker.
  276. //
  277. // A more optimal solution would be to only check keys changed since the
  278. // last savepoint. Also, it may make sense to merge this into tracked_keys_
  279. // and differentiate between tracked but not locked keys to avoid having two
  280. // very similar data structures.
  281. using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
  282. KeySet untracked_keys_;
  283. };
  284. } // namespace ROCKSDB_NAMESPACE