| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#ifndef ROCKSDB_LITE#include "utilities/transactions/transaction_util.h"#include <cinttypes>#include <string>#include <vector>#include "db/db_impl/db_impl.h"#include "rocksdb/status.h"#include "rocksdb/utilities/write_batch_with_index.h"#include "util/string_util.h"namespace ROCKSDB_NAMESPACE {Status TransactionUtil::CheckKeyForConflicts(    DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key,    SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker,    SequenceNumber min_uncommitted) {  Status result;  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);  auto cfd = cfh->cfd();  SuperVersion* sv = db_impl->GetAndRefSuperVersion(cfd);  if (sv == nullptr) {    result = Status::InvalidArgument("Could not access column family " +                                     cfh->GetName());  }  if (result.ok()) {    SequenceNumber earliest_seq =        db_impl->GetEarliestMemTableSequenceNumber(sv, true);    result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only,                      snap_checker, min_uncommitted);    db_impl->ReturnAndCleanupSuperVersion(cfd, sv);  }  return result;}Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,                                 SequenceNumber earliest_seq,                                 SequenceNumber snap_seq,                                 const std::string& key, bool cache_only,                                 ReadCallback* snap_checker,                                 SequenceNumber min_uncommitted) {  // When `min_uncommitted` is provided, keys are not always committed  // in sequence number order, and `snap_checker` is used to check whether  // specific sequence number is in the database is visible to the transaction.  // So `snap_checker` must be provided.  assert(min_uncommitted == kMaxSequenceNumber || snap_checker != nullptr);  Status result;  bool need_to_read_sst = false;  // Since it would be too slow to check the SST files, we will only use  // the memtables to check whether there have been any recent writes  // to this key after it was accessed in this transaction.  But if the  // Memtables do not contain a long enough history, we must fail the  // transaction.  if (earliest_seq == kMaxSequenceNumber) {    // The age of this memtable is unknown.  Cannot rely on it to check    // for recent writes.  This error shouldn't happen often in practice as    // the Memtable should have a valid earliest sequence number except in some    // corner cases (such as error cases during recovery).    need_to_read_sst = true;    if (cache_only) {      result = Status::TryAgain(          "Transaction could not check for conflicts as the MemTable does not "          "contain a long enough history to check write at SequenceNumber: ",          ToString(snap_seq));    }  } else if (snap_seq < earliest_seq || min_uncommitted <= earliest_seq) {    // Use <= for min_uncommitted since earliest_seq is actually the largest sec    // before this memtable was created    need_to_read_sst = true;    if (cache_only) {      // The age of this memtable is too new to use to check for recent      // writes.      char msg[300];      snprintf(msg, sizeof(msg),               "Transaction could not check for conflicts for operation at "               "SequenceNumber %" PRIu64               " as the MemTable only contains changes newer than "               "SequenceNumber %" PRIu64               ".  Increasing the value of the "               "max_write_buffer_size_to_maintain option could reduce the "               "frequency "               "of this error.",               snap_seq, earliest_seq);      result = Status::TryAgain(msg);    }  }  if (result.ok()) {    SequenceNumber seq = kMaxSequenceNumber;    bool found_record_for_key = false;    // When min_uncommitted == kMaxSequenceNumber, writes are committed in    // sequence number order, so only keys larger than `snap_seq` can cause    // conflict.    // When min_uncommitted != kMaxSequenceNumber, keys lower than    // min_uncommitted will not triggered conflicts, while keys larger than    // min_uncommitted might create conflicts, so we need  to read them out    // from the DB, and call callback to snap_checker to determine. So only    // keys lower than min_uncommitted can be skipped.    SequenceNumber lower_bound_seq =        (min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted;    Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst,                                                lower_bound_seq, &seq,                                                &found_record_for_key);    if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {      result = s;    } else if (found_record_for_key) {      bool write_conflict = snap_checker == nullptr                                ? snap_seq < seq                                : !snap_checker->IsVisible(seq);      if (write_conflict) {        result = Status::Busy();      }    }  }  return result;}Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,                                              const TransactionKeyMap& key_map,                                              bool cache_only) {  Status result;  for (auto& key_map_iter : key_map) {    uint32_t cf_id = key_map_iter.first;    const auto& keys = key_map_iter.second;    SuperVersion* sv = db_impl->GetAndRefSuperVersion(cf_id);    if (sv == nullptr) {      result = Status::InvalidArgument("Could not access column family " +                                       ToString(cf_id));      break;    }    SequenceNumber earliest_seq =        db_impl->GetEarliestMemTableSequenceNumber(sv, true);    // For each of the keys in this transaction, check to see if someone has    // written to this key since the start of the transaction.    for (const auto& key_iter : keys) {      const auto& key = key_iter.first;      const SequenceNumber key_seq = key_iter.second.seq;      result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);      if (!result.ok()) {        break;      }    }    db_impl->ReturnAndCleanupSuperVersion(cf_id, sv);    if (!result.ok()) {      break;    }  }  return result;}}  // namespace ROCKSDB_NAMESPACE#endif  // ROCKSDB_LITE
 |