| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#ifndef ROCKSDB_LITE#include "utilities/transactions/pessimistic_transaction_db.h"#include <cinttypes>#include <string>#include <unordered_set>#include <vector>#include "db/db_impl/db_impl.h"#include "rocksdb/db.h"#include "rocksdb/options.h"#include "rocksdb/utilities/transaction_db.h"#include "test_util/sync_point.h"#include "util/cast_util.h"#include "util/mutexlock.h"#include "utilities/transactions/pessimistic_transaction.h"#include "utilities/transactions/transaction_db_mutex_impl.h"#include "utilities/transactions/write_prepared_txn_db.h"#include "utilities/transactions/write_unprepared_txn_db.h"namespace ROCKSDB_NAMESPACE {PessimisticTransactionDB::PessimisticTransactionDB(    DB* db, const TransactionDBOptions& txn_db_options)    : TransactionDB(db),      db_impl_(static_cast_with_check<DBImpl, DB>(db)),      txn_db_options_(txn_db_options),      lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,                txn_db_options_.max_num_deadlocks,                txn_db_options_.custom_mutex_factory                    ? txn_db_options_.custom_mutex_factory                    : std::shared_ptr<TransactionDBMutexFactory>(                          new TransactionDBMutexFactoryImpl())) {  assert(db_impl_ != nullptr);  info_log_ = db_impl_->GetDBOptions().info_log;}// Support initiliazing PessimisticTransactionDB from a stackable db////    PessimisticTransactionDB//     ^        ^//     |        |//     |        +//     |   StackableDB//     |   ^//     |   |//     +   +//     DBImpl//       ^//       |(inherit)//       +//       DB//PessimisticTransactionDB::PessimisticTransactionDB(    StackableDB* db, const TransactionDBOptions& txn_db_options)    : TransactionDB(db),      db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),      txn_db_options_(txn_db_options),      lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,                txn_db_options_.max_num_deadlocks,                txn_db_options_.custom_mutex_factory                    ? txn_db_options_.custom_mutex_factory                    : std::shared_ptr<TransactionDBMutexFactory>(                          new TransactionDBMutexFactoryImpl())) {  assert(db_impl_ != nullptr);}PessimisticTransactionDB::~PessimisticTransactionDB() {  while (!transactions_.empty()) {    delete transactions_.begin()->second;    // TODO(myabandeh): this seems to be an unsafe approach as it is not quite    // clear whether delete would also remove the entry from transactions_.  }}Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {  return Status::OK();}Status PessimisticTransactionDB::Initialize(    const std::vector<size_t>& compaction_enabled_cf_indices,    const std::vector<ColumnFamilyHandle*>& handles) {  for (auto cf_ptr : handles) {    AddColumnFamily(cf_ptr);  }  // Verify cf options  for (auto handle : handles) {    ColumnFamilyDescriptor cfd;    Status s = handle->GetDescriptor(&cfd);    if (!s.ok()) {      return s;    }    s = VerifyCFOptions(cfd.options);    if (!s.ok()) {      return s;    }  }  // Re-enable compaction for the column families that initially had  // compaction enabled.  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());  for (auto index : compaction_enabled_cf_indices) {    compaction_enabled_cf_handles.push_back(handles[index]);  }  Status s = EnableAutoCompaction(compaction_enabled_cf_handles);  // create 'real' transactions from recovered shell transactions  auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());  assert(dbimpl != nullptr);  auto rtrxs = dbimpl->recovered_transactions();  for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {    auto recovered_trx = it->second;    assert(recovered_trx);    assert(recovered_trx->batches_.size() == 1);    const auto& seq = recovered_trx->batches_.begin()->first;    const auto& batch_info = recovered_trx->batches_.begin()->second;    assert(batch_info.log_number_);    assert(recovered_trx->name_.length());    WriteOptions w_options;    w_options.sync = true;    TransactionOptions t_options;    // This would help avoiding deadlock for keys that although exist in the WAL    // did not go through concurrency control. This includes the merge that    // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if    // there is a conflict between the keys of two transactions that must be    // avoided, it is already avoided by the application, MyRocks, before the    // restart (ii) application, MyRocks, guarntees to rollback/commit the    // recovered transactions before new transactions start.    t_options.skip_concurrency_control = true;    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);    assert(real_trx);    real_trx->SetLogNumber(batch_info.log_number_);    assert(seq != kMaxSequenceNumber);    if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {      real_trx->SetId(seq);    }    s = real_trx->SetName(recovered_trx->name_);    if (!s.ok()) {      break;    }    s = real_trx->RebuildFromWriteBatch(batch_info.batch_);    // WriteCommitted set this to to disable this check that is specific to    // WritePrepared txns    assert(batch_info.batch_cnt_ == 0 ||           real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);    real_trx->SetState(Transaction::PREPARED);    if (!s.ok()) {      break;    }  }  if (s.ok()) {    dbimpl->DeleteAllRecoveredTransactions();  }  return s;}Transaction* WriteCommittedTxnDB::BeginTransaction(    const WriteOptions& write_options, const TransactionOptions& txn_options,    Transaction* old_txn) {  if (old_txn != nullptr) {    ReinitializeTransaction(old_txn, write_options, txn_options);    return old_txn;  } else {    return new WriteCommittedTxn(this, write_options, txn_options);  }}TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(    const TransactionDBOptions& txn_db_options) {  TransactionDBOptions validated = txn_db_options;  if (txn_db_options.num_stripes == 0) {    validated.num_stripes = 1;  }  return validated;}Status TransactionDB::Open(const Options& options,                           const TransactionDBOptions& txn_db_options,                           const std::string& dbname, TransactionDB** dbptr) {  DBOptions db_options(options);  ColumnFamilyOptions cf_options(options);  std::vector<ColumnFamilyDescriptor> column_families;  column_families.push_back(      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));  std::vector<ColumnFamilyHandle*> handles;  Status s = TransactionDB::Open(db_options, txn_db_options, dbname,                                 column_families, &handles, dbptr);  if (s.ok()) {    assert(handles.size() == 1);    // i can delete the handle since DBImpl is always holding a reference to    // default column family    delete handles[0];  }  return s;}Status TransactionDB::Open(    const DBOptions& db_options, const TransactionDBOptions& txn_db_options,    const std::string& dbname,    const std::vector<ColumnFamilyDescriptor>& column_families,    std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {  Status s;  DB* db = nullptr;  if (txn_db_options.write_policy == WRITE_COMMITTED &&      db_options.unordered_write) {    return Status::NotSupported(        "WRITE_COMMITTED is incompatible with unordered_writes");  }  if (txn_db_options.write_policy == WRITE_UNPREPARED &&      db_options.unordered_write) {    // TODO(lth): support it    return Status::NotSupported(        "WRITE_UNPREPARED is currently incompatible with unordered_writes");  }  if (txn_db_options.write_policy == WRITE_PREPARED &&      db_options.unordered_write && !db_options.two_write_queues) {    return Status::NotSupported(        "WRITE_PREPARED is incompatible with unordered_writes if "        "two_write_queues is not enabled.");  }  std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;  std::vector<size_t> compaction_enabled_cf_indices;  DBOptions db_options_2pc = db_options;  PrepareWrap(&db_options_2pc, &column_families_copy,              &compaction_enabled_cf_indices);  const bool use_seq_per_batch =      txn_db_options.write_policy == WRITE_PREPARED ||      txn_db_options.write_policy == WRITE_UNPREPARED;  const bool use_batch_per_txn =      txn_db_options.write_policy == WRITE_COMMITTED ||      txn_db_options.write_policy == WRITE_PREPARED;  s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,                   use_seq_per_batch, use_batch_per_txn);  if (s.ok()) {    ROCKS_LOG_WARN(db->GetDBOptions().info_log,                   "Transaction write_policy is %" PRId32,                   static_cast<int>(txn_db_options.write_policy));    s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,               dbptr);  }  if (!s.ok()) {    // just in case it was not deleted (and not set to nullptr).    delete db;  }  return s;}void TransactionDB::PrepareWrap(    DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,    std::vector<size_t>* compaction_enabled_cf_indices) {  compaction_enabled_cf_indices->clear();  // Enable MemTable History if not already enabled  for (size_t i = 0; i < column_families->size(); i++) {    ColumnFamilyOptions* cf_options = &(*column_families)[i].options;    if (cf_options->max_write_buffer_size_to_maintain == 0 &&        cf_options->max_write_buffer_number_to_maintain == 0) {      // Setting to -1 will set the History size to      // max_write_buffer_number * write_buffer_size.      cf_options->max_write_buffer_size_to_maintain = -1;    }    if (!cf_options->disable_auto_compactions) {      // Disable compactions momentarily to prevent race with DB::Open      cf_options->disable_auto_compactions = true;      compaction_enabled_cf_indices->push_back(i);    }  }  db_options->allow_2pc = true;}Status TransactionDB::WrapDB(    // make sure this db is already opened with memtable history enabled,    // auto compaction distabled and 2 phase commit enabled    DB* db, const TransactionDBOptions& txn_db_options,    const std::vector<size_t>& compaction_enabled_cf_indices,    const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {  assert(db != nullptr);  assert(dbptr != nullptr);  *dbptr = nullptr;  std::unique_ptr<PessimisticTransactionDB> txn_db;  switch (txn_db_options.write_policy) {    case WRITE_UNPREPARED:      txn_db.reset(new WriteUnpreparedTxnDB(          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));      break;    case WRITE_PREPARED:      txn_db.reset(new WritePreparedTxnDB(          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));      break;    case WRITE_COMMITTED:    default:      txn_db.reset(new WriteCommittedTxnDB(          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));  }  txn_db->UpdateCFComparatorMap(handles);  Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);  // In case of a failure at this point, db is deleted via the txn_db destructor  // and set to nullptr.  if (s.ok()) {    *dbptr = txn_db.release();  }  return s;}Status TransactionDB::WrapStackableDB(    // make sure this stackable_db is already opened with memtable history    // enabled, auto compaction distabled and 2 phase commit enabled    StackableDB* db, const TransactionDBOptions& txn_db_options,    const std::vector<size_t>& compaction_enabled_cf_indices,    const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {  assert(db != nullptr);  assert(dbptr != nullptr);  *dbptr = nullptr;  std::unique_ptr<PessimisticTransactionDB> txn_db;  switch (txn_db_options.write_policy) {    case WRITE_UNPREPARED:      txn_db.reset(new WriteUnpreparedTxnDB(          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));      break;    case WRITE_PREPARED:      txn_db.reset(new WritePreparedTxnDB(          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));      break;    case WRITE_COMMITTED:    default:      txn_db.reset(new WriteCommittedTxnDB(          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));  }  txn_db->UpdateCFComparatorMap(handles);  Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);  // In case of a failure at this point, db is deleted via the txn_db destructor  // and set to nullptr.  if (s.ok()) {    *dbptr = txn_db.release();  }  return s;}// Let TransactionLockMgr know that this column family exists so it can// allocate a LockMap for it.void PessimisticTransactionDB::AddColumnFamily(    const ColumnFamilyHandle* handle) {  lock_mgr_.AddColumnFamily(handle->GetID());}Status PessimisticTransactionDB::CreateColumnFamily(    const ColumnFamilyOptions& options, const std::string& column_family_name,    ColumnFamilyHandle** handle) {  InstrumentedMutexLock l(&column_family_mutex_);  Status s = VerifyCFOptions(options);  if (!s.ok()) {    return s;  }  s = db_->CreateColumnFamily(options, column_family_name, handle);  if (s.ok()) {    lock_mgr_.AddColumnFamily((*handle)->GetID());    UpdateCFComparatorMap(*handle);  }  return s;}// Let TransactionLockMgr know that it can deallocate the LockMap for this// column family.Status PessimisticTransactionDB::DropColumnFamily(    ColumnFamilyHandle* column_family) {  InstrumentedMutexLock l(&column_family_mutex_);  Status s = db_->DropColumnFamily(column_family);  if (s.ok()) {    lock_mgr_.RemoveColumnFamily(column_family->GetID());  }  return s;}Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,                                         uint32_t cfh_id,                                         const std::string& key,                                         bool exclusive) {  return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);}void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,                                      const TransactionKeyMap* keys) {  lock_mgr_.UnLock(txn, keys, GetEnv());}void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,                                      uint32_t cfh_id, const std::string& key) {  lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());}// Used when wrapping DB write operations in a transactionTransaction* PessimisticTransactionDB::BeginInternalTransaction(    const WriteOptions& options) {  TransactionOptions txn_options;  Transaction* txn = BeginTransaction(options, txn_options, nullptr);  // Use default timeout for non-transactional writes  txn->SetLockTimeout(txn_db_options_.default_lock_timeout);  return txn;}// All user Put, Merge, Delete, and Write requests must be intercepted to make// sure that they lock all keys that they are writing to avoid causing conflicts// with any concurrent transactions. The easiest way to do this is to wrap all// write operations in a transaction.//// Put(), Merge(), and Delete() only lock a single key per call.  Write() will// sort its keys before locking them.  This guarantees that TransactionDB write// methods cannot deadlock with each other (but still could deadlock with a// Transaction).Status PessimisticTransactionDB::Put(const WriteOptions& options,                                     ColumnFamilyHandle* column_family,                                     const Slice& key, const Slice& val) {  Status s;  Transaction* txn = BeginInternalTransaction(options);  txn->DisableIndexing();  // Since the client didn't create a transaction, they don't care about  // conflict checking for this write.  So we just need to do PutUntracked().  s = txn->PutUntracked(column_family, key, val);  if (s.ok()) {    s = txn->Commit();  }  delete txn;  return s;}Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,                                        ColumnFamilyHandle* column_family,                                        const Slice& key) {  Status s;  Transaction* txn = BeginInternalTransaction(wopts);  txn->DisableIndexing();  // Since the client didn't create a transaction, they don't care about  // conflict checking for this write.  So we just need to do  // DeleteUntracked().  s = txn->DeleteUntracked(column_family, key);  if (s.ok()) {    s = txn->Commit();  }  delete txn;  return s;}Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,                                              ColumnFamilyHandle* column_family,                                              const Slice& key) {  Status s;  Transaction* txn = BeginInternalTransaction(wopts);  txn->DisableIndexing();  // Since the client didn't create a transaction, they don't care about  // conflict checking for this write.  So we just need to do  // SingleDeleteUntracked().  s = txn->SingleDeleteUntracked(column_family, key);  if (s.ok()) {    s = txn->Commit();  }  delete txn;  return s;}Status PessimisticTransactionDB::Merge(const WriteOptions& options,                                       ColumnFamilyHandle* column_family,                                       const Slice& key, const Slice& value) {  Status s;  Transaction* txn = BeginInternalTransaction(options);  txn->DisableIndexing();  // Since the client didn't create a transaction, they don't care about  // conflict checking for this write.  So we just need to do  // MergeUntracked().  s = txn->MergeUntracked(column_family, key, value);  if (s.ok()) {    s = txn->Commit();  }  delete txn;  return s;}Status PessimisticTransactionDB::Write(const WriteOptions& opts,                                       WriteBatch* updates) {  return WriteWithConcurrencyControl(opts, updates);}Status WriteCommittedTxnDB::Write(const WriteOptions& opts,                                  WriteBatch* updates) {  if (txn_db_options_.skip_concurrency_control) {    return db_impl_->Write(opts, updates);  } else {    return WriteWithConcurrencyControl(opts, updates);  }}Status WriteCommittedTxnDB::Write(    const WriteOptions& opts,    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {  if (optimizations.skip_concurrency_control) {    return db_impl_->Write(opts, updates);  } else {    return WriteWithConcurrencyControl(opts, updates);  }}void PessimisticTransactionDB::InsertExpirableTransaction(    TransactionID tx_id, PessimisticTransaction* tx) {  assert(tx->GetExpirationTime() > 0);  std::lock_guard<std::mutex> lock(map_mutex_);  expirable_transactions_map_.insert({tx_id, tx});}void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {  std::lock_guard<std::mutex> lock(map_mutex_);  expirable_transactions_map_.erase(tx_id);}bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(    TransactionID tx_id) {  std::lock_guard<std::mutex> lock(map_mutex_);  auto tx_it = expirable_transactions_map_.find(tx_id);  if (tx_it == expirable_transactions_map_.end()) {    return true;  }  PessimisticTransaction& tx = *(tx_it->second);  return tx.TryStealingLocks();}void PessimisticTransactionDB::ReinitializeTransaction(    Transaction* txn, const WriteOptions& write_options,    const TransactionOptions& txn_options) {  auto txn_impl =      static_cast_with_check<PessimisticTransaction, Transaction>(txn);  txn_impl->Reinitialize(this, write_options, txn_options);}Transaction* PessimisticTransactionDB::GetTransactionByName(    const TransactionName& name) {  std::lock_guard<std::mutex> lock(name_map_mutex_);  auto it = transactions_.find(name);  if (it == transactions_.end()) {    return nullptr;  } else {    return it->second;  }}void PessimisticTransactionDB::GetAllPreparedTransactions(    std::vector<Transaction*>* transv) {  assert(transv);  transv->clear();  std::lock_guard<std::mutex> lock(name_map_mutex_);  for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {    if (it->second->GetState() == Transaction::PREPARED) {      transv->push_back(it->second);    }  }}TransactionLockMgr::LockStatusDataPessimisticTransactionDB::GetLockStatusData() {  return lock_mgr_.GetLockStatusData();}std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {  return lock_mgr_.GetDeadlockInfoBuffer();}void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {  lock_mgr_.Resize(target_size);}void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {  assert(txn);  assert(txn->GetName().length() > 0);  assert(GetTransactionByName(txn->GetName()) == nullptr);  assert(txn->GetState() == Transaction::STARTED);  std::lock_guard<std::mutex> lock(name_map_mutex_);  transactions_[txn->GetName()] = txn;}void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {  assert(txn);  std::lock_guard<std::mutex> lock(name_map_mutex_);  auto it = transactions_.find(txn->GetName());  assert(it != transactions_.end());  transactions_.erase(it);}}  // namespace ROCKSDB_NAMESPACE#endif  // ROCKSDB_LITE
 |