optimistic_transaction.cc 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/transactions/optimistic_transaction.h"
  6. #include <cstdint>
  7. #include <string>
  8. #include "db/column_family.h"
  9. #include "db/db_impl/db_impl.h"
  10. #include "rocksdb/comparator.h"
  11. #include "rocksdb/db.h"
  12. #include "rocksdb/status.h"
  13. #include "rocksdb/utilities/optimistic_transaction_db.h"
  14. #include "util/cast_util.h"
  15. #include "util/defer.h"
  16. #include "util/string_util.h"
  17. #include "utilities/transactions/lock/point/point_lock_tracker.h"
  18. #include "utilities/transactions/optimistic_transaction_db_impl.h"
  19. #include "utilities/transactions/transaction_util.h"
  20. namespace ROCKSDB_NAMESPACE {
  21. struct WriteOptions;
  22. OptimisticTransaction::OptimisticTransaction(
  23. OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
  24. const OptimisticTransactionOptions& txn_options)
  25. : TransactionBaseImpl(txn_db->GetBaseDB(), write_options,
  26. PointLockTrackerFactory::Get()),
  27. txn_db_(txn_db) {
  28. Initialize(txn_options);
  29. }
  30. void OptimisticTransaction::Initialize(
  31. const OptimisticTransactionOptions& txn_options) {
  32. if (txn_options.set_snapshot) {
  33. SetSnapshot();
  34. }
  35. }
  36. void OptimisticTransaction::Reinitialize(
  37. OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
  38. const OptimisticTransactionOptions& txn_options) {
  39. TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options);
  40. Initialize(txn_options);
  41. }
  42. OptimisticTransaction::~OptimisticTransaction() = default;
  43. void OptimisticTransaction::Clear() { TransactionBaseImpl::Clear(); }
  44. Status OptimisticTransaction::Prepare() {
  45. return Status::InvalidArgument(
  46. "Two phase commit not supported for optimistic transactions.");
  47. }
  48. Status OptimisticTransaction::Commit() {
  49. auto txn_db_impl = static_cast_with_check<OptimisticTransactionDBImpl,
  50. OptimisticTransactionDB>(txn_db_);
  51. assert(txn_db_impl);
  52. switch (txn_db_impl->GetValidatePolicy()) {
  53. case OccValidationPolicy::kValidateParallel:
  54. return CommitWithParallelValidate();
  55. case OccValidationPolicy::kValidateSerial:
  56. return CommitWithSerialValidate();
  57. default:
  58. assert(0);
  59. }
  60. // unreachable, just void compiler complain
  61. return Status::OK();
  62. }
  63. Status OptimisticTransaction::CommitWithSerialValidate() {
  64. // Set up callback which will call CheckTransactionForConflicts() to
  65. // check whether this transaction is safe to be committed.
  66. OptimisticTransactionCallback callback(this);
  67. DBImpl* db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  68. Status s = db_impl->WriteWithCallback(
  69. write_options_, GetWriteBatch()->GetWriteBatch(), &callback);
  70. if (s.ok()) {
  71. Clear();
  72. }
  73. return s;
  74. }
  75. Status OptimisticTransaction::CommitWithParallelValidate() {
  76. auto txn_db_impl = static_cast_with_check<OptimisticTransactionDBImpl,
  77. OptimisticTransactionDB>(txn_db_);
  78. assert(txn_db_impl);
  79. DBImpl* db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  80. assert(db_impl);
  81. std::set<port::Mutex*> lk_ptrs;
  82. std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
  83. tracked_locks_->GetColumnFamilyIterator());
  84. assert(cf_it != nullptr);
  85. while (cf_it->HasNext()) {
  86. ColumnFamilyId cf = cf_it->Next();
  87. // To avoid the same key(s) contending across CFs or DBs, seed the
  88. // hash independently.
  89. uint64_t seed = reinterpret_cast<uintptr_t>(db_impl) +
  90. uint64_t{0xb83c07fbc6ced699} /*random prime*/ * cf;
  91. std::unique_ptr<LockTracker::KeyIterator> key_it(
  92. tracked_locks_->GetKeyIterator(cf));
  93. assert(key_it != nullptr);
  94. while (key_it->HasNext()) {
  95. auto lock_bucket_ptr = &txn_db_impl->GetLockBucket(key_it->Next(), seed);
  96. TEST_SYNC_POINT_CALLBACK(
  97. "OptimisticTransaction::CommitWithParallelValidate::lock_bucket_ptr",
  98. lock_bucket_ptr);
  99. lk_ptrs.insert(lock_bucket_ptr);
  100. }
  101. }
  102. // NOTE: in a single txn, all bucket-locks are taken in ascending order.
  103. // In this way, txns from different threads all obey this rule so that
  104. // deadlock can be avoided.
  105. for (auto v : lk_ptrs) {
  106. // WART: if an exception is thrown during a Lock(), previously locked will
  107. // not be Unlock()ed. But a vector of MutexLock is likely inefficient.
  108. v->Lock();
  109. }
  110. Defer unlocks([&]() {
  111. for (auto v : lk_ptrs) {
  112. v->Unlock();
  113. }
  114. });
  115. Status s = TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_,
  116. true /* cache_only */);
  117. if (!s.ok()) {
  118. return s;
  119. }
  120. s = db_impl->Write(write_options_, GetWriteBatch()->GetWriteBatch());
  121. if (s.ok()) {
  122. Clear();
  123. }
  124. return s;
  125. }
  126. Status OptimisticTransaction::Rollback() {
  127. Clear();
  128. return Status::OK();
  129. }
  130. // Record this key so that we can check it for conflicts at commit time.
  131. //
  132. // 'exclusive' is unused for OptimisticTransaction.
  133. Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
  134. const Slice& key, bool read_only,
  135. bool exclusive, const bool do_validate,
  136. const bool assume_tracked) {
  137. assert(!assume_tracked); // not supported
  138. (void)assume_tracked;
  139. if (!do_validate) {
  140. return Status::OK();
  141. }
  142. uint32_t cfh_id = GetColumnFamilyID(column_family);
  143. SetSnapshotIfNeeded();
  144. SequenceNumber seq;
  145. if (snapshot_) {
  146. seq = snapshot_->GetSequenceNumber();
  147. } else {
  148. seq = db_->GetLatestSequenceNumber();
  149. }
  150. std::string key_str = key.ToString();
  151. TrackKey(cfh_id, key_str, seq, read_only, exclusive);
  152. // Always return OK. Confilct checking will happen at commit time.
  153. return Status::OK();
  154. }
  155. // Returns OK if it is safe to commit this transaction. Returns Status::Busy
  156. // if there are read or write conflicts that would prevent us from committing OR
  157. // if we can not determine whether there would be any such conflicts.
  158. //
  159. // Should only be called on writer thread in order to avoid any race conditions
  160. // in detecting write conflicts.
  161. Status OptimisticTransaction::CheckTransactionForConflicts(DB* db) {
  162. auto db_impl = static_cast_with_check<DBImpl>(db);
  163. // Since we are on the write thread and do not want to block other writers,
  164. // we will do a cache-only conflict check. This can result in TryAgain
  165. // getting returned if there is not sufficient memtable history to check
  166. // for conflicts.
  167. return TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_,
  168. true /* cache_only */);
  169. }
  170. Status OptimisticTransaction::SetName(const TransactionName& /* unused */) {
  171. return Status::InvalidArgument("Optimistic transactions cannot be named.");
  172. }
  173. } // namespace ROCKSDB_NAMESPACE