|
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #pragma once
- #ifndef ROCKSDB_LITE
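- #include <algorithm>
- #include <functional>
- #include <map>
- #include <memory>
- #include <set>
- #include <string>
- #include <unordered_map>
- #include <vector>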
- #include "utilities/transactions/write_prepared_txn.h"
- #include "utilities/transactions/write_unprepared_txn_db.h"
- namespace ROCKSDB_NAMESPACE {
- class WriteUnpreparedTxnDB;
- class WriteUnpreparedTxn;
- // WriteUnprepared transactions need to be able to read their own uncommitted
- // writes, and supporting this requires some careful consideration. Because
- // writes in the current transaction may be flushed to DB already, we cannot
- // rely on the contents of WriteBatchWithIndex to determine whether a key should
- // be visible or not, so we have to remember to check the DB for any uncommitted
- // keys that should be visible to us. First, we will need to change the seek to
- // snapshot logic, to seek to max_visible_seq = max(snap_seq, max_unprep_seq).
- // Any key with a sequence number greater than max_visible_seq should not be
- // visible, because it cannot be an unprepared write of the current
- // transaction and it is not in the transaction's snapshot.
- //
- // When we seek to max_visible_seq, one of these cases will happen:
- // 1. We hit an unprepared key from the current transaction.
- // 2. We hit an unprepared key from another transaction.
- // 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
- // 4. We hit a committed key with seq <= snap_seq.
- //
- // IsVisibleFullCheck handles all cases correctly.
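- //
- // Illustrative example (sequence numbers are made up, and the outcomes are
- // the expected ones rather than a trace of the implementation): with
- // snap_seq = 100 and unprep_seqs = {120 => 1}, max_visible_seq = 120.
- //   seq 120, unprepared by this transaction    -> visible (case 1)
- //   seq 115, unprepared by another transaction -> not visible (case 2)
- //   seq 110, committed after the snapshot      -> not visible (case 3)
- //   seq  90, committed before the snapshot     -> visible (case 4)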
- //
- // Other notes:
- // Note that max_visible_seq is only calculated once at iterator construction
- // time, meaning if the same transaction is adding more unprep seqs through
- // writes during iteration, these newer writes may not be visible. This is not a
- // problem for MySQL though because it avoids modifying the index as it is
- // scanning through it to avoid the Halloween Problem. Instead, it scans the
- // index once up front, and modifies based on a temporary copy.
- //
- // In DBIter, there is a "reseek" optimization if the iterator skips over too
- // many keys. However, this assumes that the reseek seeks exactly to the
- // required key. In write unprepared, even after seeking directly to
- // max_visible_seq, some iteration may be required before hitting a visible key,
- // and special precautions must be taken to avoid performing another reseek,
- // leading to an infinite loop.
- //
- class WriteUnpreparedTxnReadCallback : public ReadCallback {
- public:
- WriteUnpreparedTxnReadCallback(
- WritePreparedTxnDB* db, SequenceNumber snapshot,
- SequenceNumber min_uncommitted,
- const std::map<SequenceNumber, size_t>& unprep_seqs,
- SnapshotBackup backed_by_snapshot)
- // Pass our last uncommitted seq as the snapshot to the parent class to
- // ensure that the parent will not prematurely filter out own writes. We
- // will do the exact comparison against snapshots in IsVisibleFullCheck
- // override.
- : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
- db_(db),
- unprep_seqs_(unprep_seqs),
- wup_snapshot_(snapshot),
- backed_by_snapshot_(backed_by_snapshot) {
- (void)backed_by_snapshot_; // to silence unused private field warning
- }
- virtual ~WriteUnpreparedTxnReadCallback() {
- // If it is not backed by snapshot, the caller must check validity
- assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
- }
- virtual bool IsVisibleFullCheck(SequenceNumber seq) override;
- inline bool valid() {
- valid_checked_ = true;
- return !snap_released_;
- }
- void Refresh(SequenceNumber seq) override {
- max_visible_seq_ = std::max(max_visible_seq_, seq);
- wup_snapshot_ = seq;
- }
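- // Worked example for CalcMaxVisibleSeq (illustrative values, assuming each
- // map entry is unprep_seq => number of sequence numbers in that batch):
- // with unprep_seqs = {10 => 2, 20 => 3}, the last batch starts at seq 20 and
- // spans 3 sequence numbers, so max_unprepared = 20 + 3 - 1 = 22.
- // With snapshot_seq = 15 the result is max(22, 15) = 22; with
- // snapshot_seq = 30 it is max(22, 30) = 30.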
- static SequenceNumber CalcMaxVisibleSeq(
- const std::map<SequenceNumber, size_t>& unprep_seqs,
- SequenceNumber snapshot_seq) {
- SequenceNumber max_unprepared = 0;
- if (!unprep_seqs.empty()) {
- max_unprepared =
- unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
- }
- return std::max(max_unprepared, snapshot_seq);
- }
- private:
- WritePreparedTxnDB* db_;
- const std::map<SequenceNumber, size_t>& unprep_seqs_;
- SequenceNumber wup_snapshot_;
- // Whether max_visible_seq_ is backed by a snapshot
- const SnapshotBackup backed_by_snapshot_;
- bool snap_released_ = false;
- // Safety check to ensure that the caller has checked invalid statuses
- bool valid_checked_ = false;
- };
- class WriteUnpreparedTxn : public WritePreparedTxn {
- public:
- WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
- const WriteOptions& write_options,
- const TransactionOptions& txn_options);
- virtual ~WriteUnpreparedTxn();
- using TransactionBaseImpl::Put;
- virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
- const Slice& value,
- const bool assume_tracked = false) override;
- virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
- const SliceParts& value,
- const bool assume_tracked = false) override;
- using TransactionBaseImpl::Merge;
- virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
- const Slice& value,
- const bool assume_tracked = false) override;
- using TransactionBaseImpl::Delete;
- virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
- const bool assume_tracked = false) override;
- virtual Status Delete(ColumnFamilyHandle* column_family,
- const SliceParts& key,
- const bool assume_tracked = false) override;
- using TransactionBaseImpl::SingleDelete;
- virtual Status SingleDelete(ColumnFamilyHandle* column_family,
- const Slice& key,
- const bool assume_tracked = false) override;
- virtual Status SingleDelete(ColumnFamilyHandle* column_family,
- const SliceParts& key,
- const bool assume_tracked = false) override;
- // In WriteUnprepared, untracked writes will break snapshot validation logic.
- // Snapshot validation will only check the largest sequence number of a key to
- // see if it was committed or not. However, an untracked unprepared write will
- // hide smaller committed sequence numbers.
- //
- // TODO(lth): Investigate whether it is worth having snapshot validation
- // validate all values larger than snap_seq. Otherwise, we should return
- // Status::NotSupported for untracked writes.
- virtual Status RebuildFromWriteBatch(WriteBatch*) override;
- virtual uint64_t GetLastLogNumber() const override {
- return last_log_number_;
- }
- void RemoveActiveIterator(Iterator* iter) {
- active_iterators_.erase(
- std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
- active_iterators_.end());
- }
- protected:
- void Initialize(const TransactionOptions& txn_options) override;
- Status PrepareInternal() override;
- Status CommitWithoutPrepareInternal() override;
- Status CommitInternal() override;
- Status RollbackInternal() override;
- void Clear() override;
- void SetSavePoint() override;
- Status RollbackToSavePoint() override;
- Status PopSavePoint() override;
- // Get and GetIterator need to be overridden so that a ReadCallback to
- // handle read-your-own-write is used.
- using Transaction::Get;
- virtual Status Get(const ReadOptions& options,
- ColumnFamilyHandle* column_family, const Slice& key,
- PinnableSlice* value) override;
- using Transaction::MultiGet;
- virtual void MultiGet(const ReadOptions& options,
- ColumnFamilyHandle* column_family,
- const size_t num_keys, const Slice* keys,
- PinnableSlice* values, Status* statuses,
- const bool sorted_input = false) override;
- using Transaction::GetIterator;
- virtual Iterator* GetIterator(const ReadOptions& options) override;
- virtual Iterator* GetIterator(const ReadOptions& options,
- ColumnFamilyHandle* column_family) override;
- virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
- const Slice& key,
- SequenceNumber* tracked_at_seq) override;
- private:
- friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
- friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
- friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
- friend class WriteUnpreparedTxnDB;
- const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
- Status WriteRollbackKeys(const TransactionKeyMap& tracked_keys,
- WriteBatchWithIndex* rollback_batch,
- ReadCallback* callback, const ReadOptions& roptions);
- Status MaybeFlushWriteBatchToDB();
- Status FlushWriteBatchToDB(bool prepared);
- Status FlushWriteBatchToDBInternal(bool prepared);
- Status FlushWriteBatchWithSavePointToDB();
- Status RollbackToSavePointInternal();
- Status HandleWrite(std::function<Status()> do_write);
- // For write unprepared, we check on every writebatch append to see if
- // write_batch_flush_threshold_ has been exceeded, and then call
- // FlushWriteBatchToDB if so. This logic is encapsulated in
- // MaybeFlushWriteBatchToDB.
- int64_t write_batch_flush_threshold_;
- WriteUnpreparedTxnDB* wupt_db_;
- // Ordered list of unprep_seq sequence numbers that we have already written
- // to DB.
- //
- // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
- // written by this transaction.
- //
- // Note that this contains both prepared and unprepared batches, since they
- // are treated similarly in prepare heap/commit map, so it simplifies the
- // commit callbacks.
- std::map<SequenceNumber, size_t> unprep_seqs_;
- uint64_t last_log_number_;
- // Recovered transactions have tracked_keys_ populated, but are not actually
- // locked for efficiency reasons. For recovered transactions, skip unlocking
- // keys when transaction ends.
- bool recovered_txn_;
- // Track the largest sequence number at which we performed snapshot
- // validation. If snapshot validation was skipped because no snapshot was set,
- // then this is set to GetLastPublishedSequence. This value is useful because
- // it means that for keys that have unprepared seqnos, we can guarantee that
- // no committed keys by other transactions can exist between
- // largest_validated_seq_ and max_unprep_seq. See
- // WriteUnpreparedTxnDB::NewIterator for an explanation for why this is
- // necessary for iterator Prev().
- //
- // Currently this value only increases during the lifetime of a transaction,
- // but in some cases, we should be able to restore the previously largest
- // value when calling RollbackToSavePoint.
- SequenceNumber largest_validated_seq_;
- using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
- struct SavePoint {
- // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
- // used during RollbackToSavePoint to determine visibility when restoring
- // old values.
- //
- // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
- // subsets, this can potentially be deduplicated by just storing set
- // difference. Investigate if this is worth it.
- std::map<SequenceNumber, size_t> unprep_seqs_;
- // This snapshot will be used to read keys at this savepoint if we call
- // RollbackToSavePoint.
- std::unique_ptr<ManagedSnapshot> snapshot_;
- SavePoint(const std::map<SequenceNumber, size_t>& seqs,
- ManagedSnapshot* snapshot)
- : unprep_seqs_(seqs), snapshot_(snapshot){};
- };
- // We have 3 data structures holding savepoint information:
- // 1. TransactionBaseImpl::save_points_
- // 2. WriteUnpreparedTxn::flushed_save_points_
- // 3. WriteUnpreparedTxn::unflushed_save_points_
- //
- // TransactionBaseImpl::save_points_ holds information about all write
- // batches, including the current in-memory write_batch_, or unprepared
- // batches that have been written out. Its responsibility is just to track
- // which keys have been modified in every savepoint.
- //
- // WriteUnpreparedTxn::flushed_save_points_ holds information about savepoints
- // set on unprepared batches that have already flushed. It holds the snapshot
- // and unprep_seqs at that savepoint, so that the rollback process can
- // determine which keys were visible at that point in time.
- //
- // WriteUnpreparedTxn::unflushed_save_points_ holds information about
- // savepoints on the current in-memory write_batch_. It simply records the
- // size of the write batch at every savepoint.
- //
- // TODO(lth): Remove the redundancy between save_point_boundaries_ and
- // write_batch_.save_points_.
- //
- // Based on this information, here are some invariants:
- // size(unflushed_save_points_) = size(write_batch_.save_points_)
- // size(flushed_save_points_) + size(unflushed_save_points_)
- // = size(save_points_)
- //
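- // Illustrative example, assuming a flush moves every unflushed savepoint
- // into flushed_save_points_: after two SetSavePoint() calls, one flush, and
- // one more SetSavePoint(), size(flushed_save_points_) = 2,
- // size(unflushed_save_points_) = 1 and size(save_points_) = 3.
- //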
- std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
- flushed_save_points_;
- std::unique_ptr<autovector<size_t>> unflushed_save_points_;
- // It is currently unsafe to flush a write batch if there are active iterators
- // created from this transaction. This is because we use WriteBatchWithIndex
- // to do merging reads from the DB and the write batch. If we flush the write
- // batch, the delta iterator held by such an iterator may end up pointing to
- // invalid memory.
- std::vector<Iterator*> active_iterators_;
- // Untracked keys that we have to rollback.
- //
- // TODO(lth): Currently we do not record untracked keys per-savepoint.
- // This means that when rolling back to savepoints, we have to check all
- // keys in the current transaction for rollback. Note that this is only
- // inefficient, but still correct because we take a snapshot at every
- // savepoint, and we will use that snapshot to construct the rollback batch.
- // The rollback batch will then contain a reissue of the same marker.
- //
- // A more optimal solution would be to only check keys changed since the
- // last savepoint. Also, it may make sense to merge this into tracked_keys_
- // and differentiate between tracked but not locked keys to avoid having two
- // very similar data structures.
- KeySet untracked_keys_;
- };
- } // namespace ROCKSDB_NAMESPACE
- #endif // ROCKSDB_LITE
|