| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #pragma once
- #include <mutex>
- #include <queue>
- #include <set>
- #include <string>
- #include <unordered_map>
- #include <vector>
- #include "db/db_iter.h"
- #include "db/read_callback.h"
- #include "db/snapshot_checker.h"
- #include "rocksdb/db.h"
- #include "rocksdb/options.h"
- #include "rocksdb/utilities/transaction_db.h"
- #include "util/cast_util.h"
- #include "utilities/transactions/lock/lock_manager.h"
- #include "utilities/transactions/lock/range/range_lock_manager.h"
- #include "utilities/transactions/pessimistic_transaction.h"
- #include "utilities/transactions/write_prepared_txn.h"
- namespace ROCKSDB_NAMESPACE {
- class PessimisticTransactionDB : public TransactionDB {
- public:
- explicit PessimisticTransactionDB(DB* db,
- const TransactionDBOptions& txn_db_options);
- explicit PessimisticTransactionDB(StackableDB* db,
- const TransactionDBOptions& txn_db_options);
- virtual ~PessimisticTransactionDB();
- const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
- virtual Status Initialize(
- const std::vector<size_t>& compaction_enabled_cf_indices,
- const std::vector<ColumnFamilyHandle*>& handles);
- Transaction* BeginTransaction(const WriteOptions& write_options,
- const TransactionOptions& txn_options,
- Transaction* old_txn) override = 0;
- using StackableDB::Put;
- Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
- const Slice& key, const Slice& val) override;
- Status PutEntity(const WriteOptions& options,
- ColumnFamilyHandle* column_family, const Slice& key,
- const WideColumns& columns) override;
- Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */,
- const AttributeGroups& attribute_groups) override {
- if (attribute_groups.empty()) {
- return Status::InvalidArgument(
- "Cannot call this method without attribute groups");
- }
- return Status::NotSupported(
- "PutEntity with AttributeGroups not supported by "
- "PessimisticTransactionDB");
- }
- using StackableDB::Delete;
- Status Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family,
- const Slice& key) override;
- using StackableDB::SingleDelete;
- Status SingleDelete(const WriteOptions& wopts,
- ColumnFamilyHandle* column_family,
- const Slice& key) override;
- using StackableDB::Merge;
- Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
- const Slice& key, const Slice& value) override;
- using TransactionDB::Write;
- Status Write(const WriteOptions& opts, WriteBatch* updates) override;
- inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
- WriteBatch* updates) {
- Status s;
- if (opts.protection_bytes_per_key > 0) {
- s = WriteBatchInternal::UpdateProtectionInfo(
- updates, opts.protection_bytes_per_key);
- }
- if (s.ok()) {
- // Need to lock all keys in this batch to prevent write conflicts with
- // concurrent transactions.
- Transaction* txn = BeginInternalTransaction(opts);
- txn->DisableIndexing();
- auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
- // Since commitBatch sorts the keys before locking, concurrent Write()
- // operations will not cause a deadlock.
- // In order to avoid a deadlock with a concurrent Transaction,
- // Transactions should use a lock timeout.
- s = txn_impl->CommitBatch(updates);
- delete txn;
- }
- return s;
- }
- using StackableDB::CreateColumnFamily;
- Status CreateColumnFamily(const ColumnFamilyOptions& options,
- const std::string& column_family_name,
- ColumnFamilyHandle** handle) override;
- Status CreateColumnFamilies(
- const ColumnFamilyOptions& options,
- const std::vector<std::string>& column_family_names,
- std::vector<ColumnFamilyHandle*>* handles) override;
- Status CreateColumnFamilies(
- const std::vector<ColumnFamilyDescriptor>& column_families,
- std::vector<ColumnFamilyHandle*>* handles) override;
- using StackableDB::CreateColumnFamilyWithImport;
- Status CreateColumnFamilyWithImport(
- const ColumnFamilyOptions& options, const std::string& column_family_name,
- const ImportColumnFamilyOptions& import_options,
- const ExportImportFilesMetaData& metadata,
- ColumnFamilyHandle** handle) override {
- const std::vector<const ExportImportFilesMetaData*>& metadatas{&metadata};
- return CreateColumnFamilyWithImport(options, column_family_name,
- import_options, metadatas, handle);
- }
- Status CreateColumnFamilyWithImport(
- const ColumnFamilyOptions& options, const std::string& column_family_name,
- const ImportColumnFamilyOptions& import_options,
- const std::vector<const ExportImportFilesMetaData*>& metadatas,
- ColumnFamilyHandle** handle) override;
- using StackableDB::DropColumnFamily;
- Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
- Status DropColumnFamilies(
- const std::vector<ColumnFamilyHandle*>& column_families) override;
- Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
- const std::string& key, bool exclusive);
- Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
- const Endpoint& start_endp, const Endpoint& end_endp);
- void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
- void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
- const std::string& key);
- void AddColumnFamily(const ColumnFamilyHandle* handle);
- static TransactionDBOptions ValidateTxnDBOptions(
- const TransactionDBOptions& txn_db_options);
- const TransactionDBOptions& GetTxnDBOptions() const {
- return txn_db_options_;
- }
- void InsertExpirableTransaction(TransactionID tx_id,
- PessimisticTransaction* tx);
- void RemoveExpirableTransaction(TransactionID tx_id);
- // If transaction is no longer available, locks can be stolen
- // If transaction is available, try stealing locks directly from transaction
- // It is the caller's responsibility to ensure that the referred transaction
- // is expirable (GetExpirationTime() > 0) and that it is expired.
- bool TryStealingExpiredTransactionLocks(TransactionID tx_id);
- Transaction* GetTransactionByName(const TransactionName& name) override;
- Status RegisterTransaction(Transaction* txn);
- void UnregisterTransaction(Transaction* txn);
- // not thread safe. current use case is during recovery (single thread)
- void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
- LockManager::PointLockStatus GetLockStatusData() override;
- std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
- void SetDeadlockInfoBufferSize(uint32_t target_size) override;
- // The default implementation does nothing. The actual implementation is moved
- // to the child classes that actually need this information. This was due to
- // an odd performance drop we observed when the added std::atomic member to
- // the base class even when the subclass do not read it in the fast path.
- virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
- virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
- // Use the returned factory to create LockTrackers in transactions.
- const LockTrackerFactory& GetLockTrackerFactory() const {
- return lock_manager_->GetLockTrackerFactory();
- }
- std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
- TxnTimestamp ts) override;
- std::shared_ptr<const Snapshot> GetTimestampedSnapshot(
- TxnTimestamp ts) const override;
- void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override;
- Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub,
- std::vector<std::shared_ptr<const Snapshot>>&
- timestamped_snapshots) const override;
- protected:
- DBImpl* db_impl_;
- std::shared_ptr<Logger> info_log_;
- const TransactionDBOptions txn_db_options_;
- static Status FailIfBatchHasTs(const WriteBatch* wb);
- static Status FailIfCfEnablesTs(const DB* db,
- const ColumnFamilyHandle* column_family);
- void ReinitializeTransaction(
- Transaction* txn, const WriteOptions& write_options,
- const TransactionOptions& txn_options = TransactionOptions());
- virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);
- private:
- friend class WritePreparedTxnDB;
- friend class WritePreparedTxnDBMock;
- friend class WriteUnpreparedTxn;
- friend class TransactionTest_DoubleCrashInRecovery_Test;
- friend class TransactionTest_DoubleEmptyWrite_Test;
- friend class TransactionTest_DuplicateKeys_Test;
- friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
- friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
- friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
- friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
- friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
- friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
- Transaction* BeginInternalTransaction(const WriteOptions& options);
- Transaction* GetTransactionByNameLocked(const TransactionName& name);
- std::shared_ptr<LockManager> lock_manager_;
- // Must be held when adding/dropping column families.
- InstrumentedMutex column_family_mutex_;
- // Used to ensure that no locks are stolen from an expirable transaction
- // that has started a commit. Only transactions with an expiration time
- // should be in this map.
- std::mutex map_mutex_;
- std::unordered_map<TransactionID, PessimisticTransaction*>
- expirable_transactions_map_;
- // map from name to two phase transaction instance
- std::mutex name_map_mutex_;
- std::unordered_map<TransactionName, Transaction*> transactions_;
- // Signal that we are testing a crash scenario. Some asserts could be relaxed
- // in such cases.
- virtual void TEST_Crash() {}
- };
- // A PessimisticTransactionDB that writes the data to the DB after the commit.
- // In this way the DB only contains the committed data.
- class WriteCommittedTxnDB : public PessimisticTransactionDB {
- public:
- explicit WriteCommittedTxnDB(DB* db,
- const TransactionDBOptions& txn_db_options)
- : PessimisticTransactionDB(db, txn_db_options) {}
- explicit WriteCommittedTxnDB(StackableDB* db,
- const TransactionDBOptions& txn_db_options)
- : PessimisticTransactionDB(db, txn_db_options) {}
- virtual ~WriteCommittedTxnDB() {}
- Transaction* BeginTransaction(const WriteOptions& write_options,
- const TransactionOptions& txn_options,
- Transaction* old_txn) override;
- // Optimized version of ::Write that makes use of skip_concurrency_control
- // hint
- using TransactionDB::Write;
- Status Write(const WriteOptions& opts,
- const TransactionDBWriteOptimizations& optimizations,
- WriteBatch* updates) override;
- Status Write(const WriteOptions& opts, WriteBatch* updates) override;
- };
- inline Status PessimisticTransactionDB::FailIfBatchHasTs(
- const WriteBatch* batch) {
- if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
- return Status::NotSupported(
- "Writes with timestamp must go through transaction API instead of "
- "TransactionDB.");
- }
- return Status::OK();
- }
- inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
- const DB* db, const ColumnFamilyHandle* column_family) {
- assert(db);
- column_family = column_family ? column_family : db->DefaultColumnFamily();
- assert(column_family);
- const Comparator* const ucmp = column_family->GetComparator();
- assert(ucmp);
- if (ucmp->timestamp_size() > 0) {
- return Status::NotSupported(
- "Write operation with user timestamp must go through the transaction "
- "API instead of TransactionDB.");
- }
- return Status::OK();
- }
- class SnapshotCreationCallback : public PostMemTableCallback {
- public:
- explicit SnapshotCreationCallback(
- DBImpl* dbi, TxnTimestamp commit_ts,
- const std::shared_ptr<TransactionNotifier>& notifier,
- std::shared_ptr<const Snapshot>& snapshot)
- : db_impl_(dbi),
- commit_ts_(commit_ts),
- snapshot_notifier_(notifier),
- snapshot_(snapshot) {
- assert(db_impl_);
- }
- ~SnapshotCreationCallback() override {
- snapshot_creation_status_.PermitUncheckedError();
- }
- Status operator()(SequenceNumber seq, bool disable_memtable) override;
- private:
- DBImpl* const db_impl_;
- const TxnTimestamp commit_ts_;
- std::shared_ptr<TransactionNotifier> snapshot_notifier_;
- std::shared_ptr<const Snapshot>& snapshot_;
- Status snapshot_creation_status_;
- };
- } // namespace ROCKSDB_NAMESPACE
|