write_unprepared_txn.h

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include <set>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;
// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may already have been flushed to the DB,
// we cannot rely on the contents of WriteBatchWithIndex to determine whether
// a key should be visible or not, so we have to remember to check the DB for
// any uncommitted keys that should be visible to us. First, we will need to
// change the seek-to-snapshot logic to seek to
// max_visible_seq = max(snap_seq, max_unprep_seq). Any key greater than
// max_visible_seq should not be visible, because it cannot have been
// unprepared by the current transaction and it is not in the transaction's
// snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all cases correctly.
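//
// A worked example (hypothetical sequence numbers, not from this file): with
// snap_seq = 10 and unprep_seqs = {12 => 3} (an unprepared batch of 3 writes
// starting at seq 12), the last unprepared seq is 12 + 3 - 1 = 14, so
// max_visible_seq = max(10, 14) = 14; this is exactly what CalcMaxVisibleSeq
// below computes. A key at seq 13 is then visible only if it belongs to the
// current transaction (case 1), while a key at seq 11 committed by another
// transaction is not visible (case 3).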
//
// Other notes:
// Note that max_visible_seq is only calculated once at iterator construction
// time, meaning that if the same transaction is adding more unprep seqs
// through writes during iteration, these newer writes may not be visible.
// This is not a problem for MySQL, though, because it avoids modifying the
// index as it is scanning through it in order to avoid the Halloween Problem.
// Instead, it scans the index once up front, and modifies based on a
// temporary copy.
//
// In DBIter, there is a "reseek" optimization if the iterator skips over too
// many keys. However, this assumes that the reseek seeks exactly to the
// required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible
// key, and special precautions must be taken to avoid performing another
// reseek, which would lead to an infinite loop.
//
class WriteUnpreparedTxnReadCallback : public ReadCallback {
 public:
  WriteUnpreparedTxnReadCallback(
      WritePreparedTxnDB* db, SequenceNumber snapshot,
      SequenceNumber min_uncommitted,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SnapshotBackup backed_by_snapshot)
      // Pass our last uncommitted seq as the snapshot to the parent class to
      // ensure that the parent will not prematurely filter out its own
      // writes. We will do the exact comparison against snapshots in the
      // IsVisibleFullCheck override.
      : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot),
                     min_uncommitted),
        db_(db),
        unprep_seqs_(unprep_seqs),
        wup_snapshot_(snapshot),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }
  virtual ~WriteUnpreparedTxnReadCallback() {
    // If it is not backed by snapshot, the caller must check validity
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  virtual bool IsVisibleFullCheck(SequenceNumber seq) override;

  inline bool valid() {
    valid_checked_ = true;
    return !snap_released_;
  }

  void Refresh(SequenceNumber seq) override {
    max_visible_seq_ = std::max(max_visible_seq_, seq);
    wup_snapshot_ = seq;
  }

  static SequenceNumber CalcMaxVisibleSeq(
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber snapshot_seq) {
    // The last seq of the newest unprepared batch is its first seq plus its
    // batch count, minus one.
    SequenceNumber max_unprepared = 0;
    if (!unprep_seqs.empty()) {
      max_unprepared =
          unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
    }
    return std::max(max_unprepared, snapshot_seq);
  }

 private:
  WritePreparedTxnDB* db_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber wup_snapshot_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses
  bool valid_checked_ = false;
};
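
// A minimal usage sketch (hedged; how a WriteUnprepared transaction is
// expected to wire up the callback for reads, not code from this file). It
// assumes the internal GetFromBatchAndDB overload that accepts a
// ReadCallback*; local names are illustrative:
//
//   WriteUnpreparedTxnReadCallback callback(wupt_db_, snapshot_seq,
//                                           min_uncommitted, unprep_seqs_,
//                                           kBackedByDBSnapshot);
//   Status s = write_batch_.GetFromBatchAndDB(db_, read_options,
//                                             column_family, key, value,
//                                             &callback);
//   if (callback.valid()) { /* s reflects a read at max_visible_seq */ }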

class WriteUnpreparedTxn : public WritePreparedTxn {
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

  virtual ~WriteUnpreparedTxn();

  using TransactionBaseImpl::Put;
  virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value,
                     const bool assume_tracked = false) override;
  virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                     const SliceParts& value,
                     const bool assume_tracked = false) override;

  using TransactionBaseImpl::Merge;
  virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value,
                       const bool assume_tracked = false) override;

  using TransactionBaseImpl::Delete;
  virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                        const bool assume_tracked = false) override;
  virtual Status Delete(ColumnFamilyHandle* column_family,
                        const SliceParts& key,
                        const bool assume_tracked = false) override;

  using TransactionBaseImpl::SingleDelete;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key,
                              const bool assume_tracked = false) override;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key,
                              const bool assume_tracked = false) override;

  // In WriteUnprepared, untracked writes will break snapshot validation
  // logic. Snapshot validation will only check the largest sequence number
  // of a key to see if it was committed or not. However, an untracked
  // unprepared write will hide smaller committed sequence numbers.
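  //
  // For example (hypothetical sequence numbers, not from this file): suppose
  // our snapshot is at seq 10, another transaction commits key k at seq 15,
  // and we then write k untracked in an unprepared batch at seq 20. Snapshot
  // validation only inspects the largest seq for k, which is our own write
  // at 20, so the conflicting commit at 15 goes undetected.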
  //
  // TODO(lth): Investigate whether it is worth having snapshot validation
  // validate all values larger than snap_seq. Otherwise, we should return
  // Status::NotSupported for untracked writes.

  virtual Status RebuildFromWriteBatch(WriteBatch*) override;

  virtual uint64_t GetLastLogNumber() const override {
    return last_log_number_;
  }

  void RemoveActiveIterator(Iterator* iter) {
    active_iterators_.erase(
        std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
        active_iterators_.end());
  }

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;
  Status CommitInternal() override;
  Status RollbackInternal() override;

  void Clear() override;

  void SetSavePoint() override;
  Status RollbackToSavePoint() override;
  Status PopSavePoint() override;

  // Get and GetIterator need to be overridden so that a ReadCallback that
  // handles read-your-own-writes is used.
  using Transaction::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using Transaction::MultiGet;
  virtual void MultiGet(const ReadOptions& options,
                        ColumnFamilyHandle* column_family,
                        const size_t num_keys, const Slice* keys,
                        PinnableSlice* values, Status* statuses,
                        const bool sorted_input = false) override;

  using Transaction::GetIterator;
  virtual Iterator* GetIterator(const ReadOptions& options) override;
  virtual Iterator* GetIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
                                  const Slice& key,
                                  SequenceNumber* tracked_at_seq) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  friend class WriteUnpreparedTxnDB;

  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
  Status WriteRollbackKeys(const TransactionKeyMap& tracked_keys,
                           WriteBatchWithIndex* rollback_batch,
                           ReadCallback* callback,
                           const ReadOptions& roptions);

  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);
  Status FlushWriteBatchToDBInternal(bool prepared);
  Status FlushWriteBatchWithSavePointToDB();
  Status RollbackToSavePointInternal();
  Status HandleWrite(std::function<Status()> do_write);

  // For write unprepared, we check on every write batch append whether
  // write_batch_flush_threshold_ has been exceeded, and then call
  // FlushWriteBatchToDB if so. This logic is encapsulated in
  // MaybeFlushWriteBatchToDB.
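  //
  // A minimal sketch of that check (an assumption about the .cc file, not a
  // quote of it), assuming the threshold is compared against the indexed
  // write batch's data size:
  //
  //   Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  //     if (write_batch_flush_threshold_ > 0 &&
  //         write_batch_.GetWriteBatch()->GetDataSize() >
  //             static_cast<size_t>(write_batch_flush_threshold_)) {
  //       return FlushWriteBatchToDB(/*prepared=*/false);
  //     }
  //     return Status::OK();
  //   }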
  int64_t write_batch_flush_threshold_;
  WriteUnpreparedTxnDB* wupt_db_;

  // Ordered list of unprep_seq sequence numbers that we have already written
  // to DB.
  //
  // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  // written by this transaction.
  //
  // Note that this contains both prepared and unprepared batches, since they
  // are treated similarly in the prepare heap/commit map, so it simplifies
  // the commit callbacks.
  std::map<SequenceNumber, size_t> unprep_seqs_;

  uint64_t last_log_number_;

  // Recovered transactions have tracked_keys_ populated, but are not
  // actually locked for efficiency reasons. For recovered transactions, skip
  // unlocking keys when the transaction ends.
  bool recovered_txn_;

  // Track the largest sequence number at which we performed snapshot
  // validation. If snapshot validation was skipped because no snapshot was
  // set, then this is set to GetLastPublishedSequence. This value is useful
  // because it means that for keys that have unprepared seqnos, we can
  // guarantee that no keys committed by other transactions can exist between
  // largest_validated_seq_ and max_unprep_seq. See
  // WriteUnpreparedTxnDB::NewIterator for an explanation of why this is
  // necessary for iterator Prev().
  //
  // Currently this value only increases during the lifetime of a
  // transaction, but in some cases, we should be able to restore the
  // previously largest value when calling RollbackToSavepoint.
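  //
  // For instance (hypothetical numbers): if we last validated a key at
  // largest_validated_seq_ == 50 and later wrote it unprepared at seq 60,
  // then no other transaction committed that key in (50, 60], which is the
  // gap-free guarantee the iterator relies on when moving backwards.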
  SequenceNumber largest_validated_seq_;

  using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;

  struct SavePoint {
    // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
    // used during RollbackToSavepoint to determine visibility when restoring
    // old values.
    //
    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
    // subsets, this can potentially be deduplicated by just storing the set
    // difference. Investigate whether this is worth it.
    std::map<SequenceNumber, size_t> unprep_seqs_;

    // This snapshot will be used to read keys at this savepoint if we call
    // RollbackToSavePoint.
    std::unique_ptr<ManagedSnapshot> snapshot_;

    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
              ManagedSnapshot* snapshot)
        : unprep_seqs_(seqs), snapshot_(snapshot) {}
  };

  // We have 3 data structures holding savepoint information:
  // 1. TransactionBaseImpl::save_points_
  // 2. WriteUnpreparedTxn::flushed_save_points_
  // 3. WriteUnpreparedTxn::unflushed_save_points_
  //
  // TransactionBaseImpl::save_points_ holds information about all write
  // batches, including the current in-memory write_batch_, as well as
  // unprepared batches that have been written out. Its responsibility is
  // just to track which keys have been modified in every savepoint.
  //
  // WriteUnpreparedTxn::flushed_save_points_ holds information about
  // savepoints set on unprepared batches that have already been flushed. It
  // holds the snapshot and unprep_seqs at that savepoint, so that the
  // rollback process can determine which keys were visible at that point in
  // time.
  //
  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
  // savepoints on the current in-memory write_batch_. It simply records the
  // size of the write batch at every savepoint.
  //
  // TODO(lth): Remove the redundancy between save_point_boundaries_ and
  // write_batch_.save_points_.
  //
  // Based on this information, here are some invariants:
  //   size(unflushed_save_points_) == size(write_batch_.save_points_)
  //   size(flushed_save_points_) + size(unflushed_save_points_)
  //       == size(save_points_)
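  //
  // A hypothetical walkthrough of the invariants (illustrative only): a
  // transaction sets savepoints A and B, then its write batch is flushed, so
  // A and B are tracked in flushed_save_points_; it then sets savepoint C on
  // the new in-memory write_batch_. At this point
  // size(flushed_save_points_) == 2, size(unflushed_save_points_) == 1, and
  // size(save_points_) == 3, as the invariants above require.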
  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
      flushed_save_points_;
  std::unique_ptr<autovector<size_t>> unflushed_save_points_;

  // It is currently unsafe to flush a write batch if there are active
  // iterators created from this transaction. This is because we use
  // WriteBatchWithIndex to do merging reads from the DB and the write batch.
  // If we flush the write batch, it is possible that the delta iterator on
  // the iterator will point to invalid memory.
  std::vector<Iterator*> active_iterators_;

  // Untracked keys that we have to rollback.
  //
  // TODO(lth): Currently we do not record untracked keys per-savepoint. This
  // means that when rolling back to savepoints, we have to check all keys in
  // the current transaction for rollback. Note that this is merely
  // inefficient, but still correct, because we take a snapshot at every
  // savepoint, and we will use that snapshot to construct the rollback
  // batch. The rollback batch will then contain a reissue of the same
  // marker.
  //
  // A more optimal solution would be to only check keys changed since the
  // last savepoint. Also, it may make sense to merge this into tracked_keys_
  // and differentiate between tracked but not locked keys, to avoid having
  // two very similar data structures.
  KeySet untracked_keys_;
};
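
// A hedged usage sketch of the user-facing API (not part of this header):
// write unprepared transactions are selected via the transaction DB's write
// policy, after which the usual Transaction API applies.
//
//   TransactionDBOptions txn_db_options;
//   txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED;
//   // txn_db_options.default_write_batch_flush_threshold could also be set
//   // to bound the in-memory write batch size (assumed option name).
//   TransactionDB* txn_db = nullptr;
//   Status s =
//       TransactionDB::Open(options, txn_db_options, "/tmp/testdb", &txn_db);
//   Transaction* txn = txn_db->BeginTransaction(WriteOptions());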

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE