write_prepared_txn_db.cc 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "utilities/transactions/write_prepared_txn_db.h"
  7. #include <algorithm>
  8. #include <cinttypes>
  9. #include <string>
  10. #include <unordered_set>
  11. #include <vector>
  12. #include "db/arena_wrapped_db_iter.h"
  13. #include "db/db_impl/db_impl.h"
  14. #include "rocksdb/db.h"
  15. #include "rocksdb/options.h"
  16. #include "rocksdb/utilities/transaction_db.h"
  17. #include "test_util/sync_point.h"
  18. #include "util/cast_util.h"
  19. #include "util/mutexlock.h"
  20. #include "util/string_util.h"
  21. #include "utilities/transactions/pessimistic_transaction.h"
  22. #include "utilities/transactions/transaction_db_mutex_impl.h"
  23. namespace ROCKSDB_NAMESPACE {
  24. Status WritePreparedTxnDB::Initialize(
  25. const std::vector<size_t>& compaction_enabled_cf_indices,
  26. const std::vector<ColumnFamilyHandle*>& handles) {
  27. auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());
  28. assert(dbimpl != nullptr);
  29. auto rtxns = dbimpl->recovered_transactions();
  30. std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  31. for (auto rtxn : rtxns) {
  32. // There should only one batch for WritePrepared policy.
  33. assert(rtxn.second->batches_.size() == 1);
  34. const auto& seq = rtxn.second->batches_.begin()->first;
  35. const auto& batch_info = rtxn.second->batches_.begin()->second;
  36. auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
  37. ordered_seq_cnt[seq] = cnt;
  38. }
  39. // AddPrepared must be called in order
  40. for (auto seq_cnt : ordered_seq_cnt) {
  41. auto seq = seq_cnt.first;
  42. auto cnt = seq_cnt.second;
  43. for (size_t i = 0; i < cnt; i++) {
  44. AddPrepared(seq + i);
  45. }
  46. }
  47. SequenceNumber prev_max = max_evicted_seq_;
  48. SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  49. AdvanceMaxEvictedSeq(prev_max, last_seq);
  50. // Create a gap between max and the next snapshot. This simplifies the logic
  51. // in IsInSnapshot by not having to consider the special case of max ==
  52. // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  53. if (last_seq) {
  54. db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
  55. db_impl_->versions_->SetLastSequence(last_seq + 1);
  56. db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  57. }
  58. db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  59. // A callback to commit a single sub-batch
  60. class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
  61. public:
  62. explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
  63. : db_(db) {}
  64. Status Callback(SequenceNumber commit_seq,
  65. bool is_mem_disabled __attribute__((__unused__)), uint64_t,
  66. size_t /*index*/, size_t /*total*/) override {
  67. assert(!is_mem_disabled);
  68. db_->AddCommitted(commit_seq, commit_seq);
  69. return Status::OK();
  70. }
  71. private:
  72. WritePreparedTxnDB* db_;
  73. };
  74. db_impl_->SetRecoverableStatePreReleaseCallback(
  75. new CommitSubBatchPreReleaseCallback(this));
  76. auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
  77. handles);
  78. return s;
  79. }
  80. Status WritePreparedTxnDB::VerifyCFOptions(
  81. const ColumnFamilyOptions& cf_options) {
  82. Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  83. if (!s.ok()) {
  84. return s;
  85. }
  86. if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
  87. return Status::InvalidArgument(
  88. "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
  89. "WritePrpeared transactions");
  90. }
  91. return Status::OK();
  92. }
  93. Transaction* WritePreparedTxnDB::BeginTransaction(
  94. const WriteOptions& write_options, const TransactionOptions& txn_options,
  95. Transaction* old_txn) {
  96. if (old_txn != nullptr) {
  97. ReinitializeTransaction(old_txn, write_options, txn_options);
  98. return old_txn;
  99. } else {
  100. return new WritePreparedTxn(this, write_options, txn_options);
  101. }
  102. }
  103. Status WritePreparedTxnDB::Write(const WriteOptions& opts,
  104. WriteBatch* updates) {
  105. if (txn_db_options_.skip_concurrency_control) {
  106. // Skip locking the rows
  107. const size_t UNKNOWN_BATCH_CNT = 0;
  108. WritePreparedTxn* NO_TXN = nullptr;
  109. return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
  110. } else {
  111. return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
  112. }
  113. }
  114. Status WritePreparedTxnDB::Write(
  115. const WriteOptions& opts,
  116. const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  117. if (optimizations.skip_concurrency_control) {
  118. // Skip locking the rows
  119. const size_t UNKNOWN_BATCH_CNT = 0;
  120. const size_t ONE_BATCH_CNT = 1;
  121. const size_t batch_cnt = optimizations.skip_duplicate_key_check
  122. ? ONE_BATCH_CNT
  123. : UNKNOWN_BATCH_CNT;
  124. WritePreparedTxn* NO_TXN = nullptr;
  125. return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  126. } else {
  127. // TODO(myabandeh): Make use of skip_duplicate_key_check hint
  128. // Fall back to unoptimized version
  129. return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
  130. }
  131. }
  132. Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
  133. WriteBatch* batch, size_t batch_cnt,
  134. WritePreparedTxn* txn) {
  135. ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
  136. "CommitBatchInternal");
  137. if (batch->Count() == 0) {
  138. // Otherwise our 1 seq per batch logic will break since there is no seq
  139. // increased for this batch.
  140. return Status::OK();
  141. }
  142. if (batch_cnt == 0) { // not provided, then compute it
  143. // TODO(myabandeh): add an option to allow user skipping this cost
  144. SubBatchCounter counter(*GetCFComparatorMap());
  145. auto s = batch->Iterate(&counter);
  146. assert(s.ok());
  147. batch_cnt = counter.BatchCount();
  148. WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
  149. ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
  150. static_cast<uint64_t>(batch_cnt));
  151. }
  152. assert(batch_cnt);
  153. bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  154. WriteOptions write_options(write_options_orig);
  155. // In the absence of Prepare markers, use Noop as a batch separator
  156. WriteBatchInternal::InsertNoop(batch);
  157. const bool DISABLE_MEMTABLE = true;
  158. const uint64_t no_log_ref = 0;
  159. uint64_t seq_used = kMaxSequenceNumber;
  160. const size_t ZERO_PREPARES = 0;
  161. const bool kSeperatePrepareCommitBatches = true;
  162. // Since this is not 2pc, there is no need for AddPrepared but having it in
  163. // the PreReleaseCallback enables an optimization. Refer to
  164. // SmallestUnCommittedSeq for more details.
  165. AddPreparedCallback add_prepared_callback(
  166. this, db_impl_, batch_cnt,
  167. db_impl_->immutable_db_options().two_write_queues,
  168. !kSeperatePrepareCommitBatches);
  169. WritePreparedCommitEntryPreReleaseCallback update_commit_map(
  170. this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  171. PreReleaseCallback* pre_release_callback;
  172. if (do_one_write) {
  173. pre_release_callback = &update_commit_map;
  174. } else {
  175. pre_release_callback = &add_prepared_callback;
  176. }
  177. auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
  178. no_log_ref, !DISABLE_MEMTABLE, &seq_used,
  179. batch_cnt, pre_release_callback);
  180. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  181. uint64_t prepare_seq = seq_used;
  182. if (txn != nullptr) {
  183. txn->SetId(prepare_seq);
  184. }
  185. if (!s.ok()) {
  186. return s;
  187. }
  188. if (do_one_write) {
  189. return s;
  190. } // else do the 2nd write for commit
  191. ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
  192. "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
  193. prepare_seq);
  194. // Commit the batch by writing an empty batch to the 2nd queue that will
  195. // release the commit sequence number to readers.
  196. const size_t ZERO_COMMITS = 0;
  197. WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
  198. this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  199. WriteBatch empty_batch;
  200. write_options.disableWAL = true;
  201. write_options.sync = false;
  202. const size_t ONE_BATCH = 1; // Just to inc the seq
  203. s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
  204. no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
  205. &update_commit_map_with_prepare);
  206. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  207. // Note: RemovePrepared is called from within PreReleaseCallback
  208. return s;
  209. }
  210. Status WritePreparedTxnDB::Get(const ReadOptions& options,
  211. ColumnFamilyHandle* column_family,
  212. const Slice& key, PinnableSlice* value) {
  213. SequenceNumber min_uncommitted, snap_seq;
  214. const SnapshotBackup backed_by_snapshot =
  215. AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  216. WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
  217. backed_by_snapshot);
  218. bool* dont_care = nullptr;
  219. DBImpl::GetImplOptions get_impl_options;
  220. get_impl_options.column_family = column_family;
  221. get_impl_options.value = value;
  222. get_impl_options.value_found = dont_care;
  223. get_impl_options.callback = &callback;
  224. auto res = db_impl_->GetImpl(options, key, get_impl_options);
  225. if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
  226. backed_by_snapshot))) {
  227. return res;
  228. } else {
  229. WPRecordTick(TXN_GET_TRY_AGAIN);
  230. return Status::TryAgain();
  231. }
  232. }
  233. void WritePreparedTxnDB::UpdateCFComparatorMap(
  234. const std::vector<ColumnFamilyHandle*>& handles) {
  235. auto cf_map = new std::map<uint32_t, const Comparator*>();
  236. auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  237. for (auto h : handles) {
  238. auto id = h->GetID();
  239. const Comparator* comparator = h->GetComparator();
  240. (*cf_map)[id] = comparator;
  241. if (id != 0) {
  242. (*handle_map)[id] = h;
  243. } else {
  244. // The pointer to the default cf handle in the handles will be deleted.
  245. // Use the pointer maintained by the db instead.
  246. (*handle_map)[id] = DefaultColumnFamily();
  247. }
  248. }
  249. cf_map_.reset(cf_map);
  250. handle_map_.reset(handle_map);
  251. }
  252. void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  253. auto old_cf_map_ptr = cf_map_.get();
  254. assert(old_cf_map_ptr);
  255. auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  256. auto old_handle_map_ptr = handle_map_.get();
  257. assert(old_handle_map_ptr);
  258. auto handle_map =
  259. new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  260. auto id = h->GetID();
  261. const Comparator* comparator = h->GetComparator();
  262. (*cf_map)[id] = comparator;
  263. (*handle_map)[id] = h;
  264. cf_map_.reset(cf_map);
  265. handle_map_.reset(handle_map);
  266. }
  267. std::vector<Status> WritePreparedTxnDB::MultiGet(
  268. const ReadOptions& options,
  269. const std::vector<ColumnFamilyHandle*>& column_family,
  270. const std::vector<Slice>& keys, std::vector<std::string>* values) {
  271. assert(values);
  272. size_t num_keys = keys.size();
  273. values->resize(num_keys);
  274. std::vector<Status> stat_list(num_keys);
  275. for (size_t i = 0; i < num_keys; ++i) {
  276. std::string* value = values ? &(*values)[i] : nullptr;
  277. stat_list[i] = this->Get(options, column_family[i], keys[i], value);
  278. }
  279. return stat_list;
  280. }
  281. // Struct to hold ownership of snapshot and read callback for iterator cleanup.
  282. struct WritePreparedTxnDB::IteratorState {
  283. IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
  284. std::shared_ptr<ManagedSnapshot> s,
  285. SequenceNumber min_uncommitted)
  286. : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
  287. snapshot(s) {}
  288. WritePreparedTxnReadCallback callback;
  289. std::shared_ptr<ManagedSnapshot> snapshot;
  290. };
  291. namespace {
  292. static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  293. delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
  294. }
  295. } // anonymous namespace
  296. Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
  297. ColumnFamilyHandle* column_family) {
  298. constexpr bool ALLOW_BLOB = true;
  299. constexpr bool ALLOW_REFRESH = true;
  300. std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  301. SequenceNumber snapshot_seq = kMaxSequenceNumber;
  302. SequenceNumber min_uncommitted = 0;
  303. if (options.snapshot != nullptr) {
  304. snapshot_seq = options.snapshot->GetSequenceNumber();
  305. min_uncommitted =
  306. static_cast_with_check<const SnapshotImpl, const Snapshot>(
  307. options.snapshot)
  308. ->min_uncommitted_;
  309. } else {
  310. auto* snapshot = GetSnapshot();
  311. // We take a snapshot to make sure that the related data in the commit map
  312. // are not deleted.
  313. snapshot_seq = snapshot->GetSequenceNumber();
  314. min_uncommitted =
  315. static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
  316. ->min_uncommitted_;
  317. own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  318. }
  319. assert(snapshot_seq != kMaxSequenceNumber);
  320. auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  321. auto* state =
  322. new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  323. auto* db_iter =
  324. db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
  325. !ALLOW_BLOB, !ALLOW_REFRESH);
  326. db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  327. return db_iter;
  328. }
  329. Status WritePreparedTxnDB::NewIterators(
  330. const ReadOptions& options,
  331. const std::vector<ColumnFamilyHandle*>& column_families,
  332. std::vector<Iterator*>* iterators) {
  333. constexpr bool ALLOW_BLOB = true;
  334. constexpr bool ALLOW_REFRESH = true;
  335. std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  336. SequenceNumber snapshot_seq = kMaxSequenceNumber;
  337. SequenceNumber min_uncommitted = 0;
  338. if (options.snapshot != nullptr) {
  339. snapshot_seq = options.snapshot->GetSequenceNumber();
  340. min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
  341. options.snapshot)
  342. ->min_uncommitted_;
  343. } else {
  344. auto* snapshot = GetSnapshot();
  345. // We take a snapshot to make sure that the related data in the commit map
  346. // are not deleted.
  347. snapshot_seq = snapshot->GetSequenceNumber();
  348. own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  349. min_uncommitted =
  350. static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
  351. ->min_uncommitted_;
  352. }
  353. iterators->clear();
  354. iterators->reserve(column_families.size());
  355. for (auto* column_family : column_families) {
  356. auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  357. auto* state =
  358. new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  359. auto* db_iter =
  360. db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
  361. !ALLOW_BLOB, !ALLOW_REFRESH);
  362. db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  363. iterators->push_back(db_iter);
  364. }
  365. return Status::OK();
  366. }
  367. void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  368. // Adcance max_evicted_seq_ no more than 100 times before the cache wraps
  369. // around.
  370. INC_STEP_FOR_MAX_EVICTED =
  371. std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  372. snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
  373. new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  374. commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
  375. new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
  376. dummy_max_snapshot_.number_ = kMaxSequenceNumber;
  377. }
  378. void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
  379. bool locked) {
  380. // When max_evicted_seq_ advances, move older entries from prepared_txns_
  381. // to delayed_prepared_. This guarantees that if a seq is lower than max,
  382. // then it is not in prepared_txns_ and save an expensive, synchronized
  383. // lookup from a shared set. delayed_prepared_ is expected to be empty in
  384. // normal cases.
  385. ROCKS_LOG_DETAILS(
  386. info_log_,
  387. "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
  388. prepared_txns_.empty(),
  389. prepared_txns_.empty() ? 0 : prepared_txns_.top());
  390. const SequenceNumber prepared_top = prepared_txns_.top();
  391. const bool empty = prepared_top == kMaxSequenceNumber;
  392. // Preliminary check to avoid the synchronization cost
  393. if (!empty && prepared_top <= new_max) {
  394. if (locked) {
  395. // Needed to avoid double locking in pop().
  396. prepared_txns_.push_pop_mutex()->Unlock();
  397. }
  398. WriteLock wl(&prepared_mutex_);
  399. // Need to fetch fresh values of ::top after mutex is acquired
  400. while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
  401. auto to_be_popped = prepared_txns_.top();
  402. delayed_prepared_.insert(to_be_popped);
  403. ROCKS_LOG_WARN(info_log_,
  404. "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
  405. " new_max=%" PRIu64,
  406. static_cast<uint64_t>(delayed_prepared_.size()),
  407. to_be_popped, new_max);
  408. delayed_prepared_empty_.store(false, std::memory_order_release);
  409. // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
  410. // there will be a point in time that the entry is neither in
  411. // prepared_txns_ nor in delayed_prepared_, which will not be checked if
  412. // delayed_prepared_empty_ is false.
  413. prepared_txns_.pop();
  414. }
  415. if (locked) {
  416. prepared_txns_.push_pop_mutex()->Lock();
  417. }
  418. }
  419. }
  420. void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
  421. ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
  422. seq, max_evicted_seq_.load());
  423. TEST_SYNC_POINT("AddPrepared::begin:pause");
  424. TEST_SYNC_POINT("AddPrepared::begin:resume");
  425. if (!locked) {
  426. prepared_txns_.push_pop_mutex()->Lock();
  427. }
  428. prepared_txns_.push_pop_mutex()->AssertHeld();
  429. prepared_txns_.push(seq);
  430. auto new_max = future_max_evicted_seq_.load();
  431. if (UNLIKELY(seq <= new_max)) {
  432. // This should not happen in normal case
  433. ROCKS_LOG_ERROR(
  434. info_log_,
  435. "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
  436. " <= %" PRIu64,
  437. seq, new_max);
  438. CheckPreparedAgainstMax(new_max, true /*locked*/);
  439. }
  440. if (!locked) {
  441. prepared_txns_.push_pop_mutex()->Unlock();
  442. }
  443. TEST_SYNC_POINT("AddPrepared::end");
  444. }
  445. void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
  446. uint8_t loop_cnt) {
  447. ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
  448. prepare_seq, commit_seq);
  449. TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  450. TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  451. auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  452. CommitEntry64b evicted_64b;
  453. CommitEntry evicted;
  454. bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  455. if (LIKELY(to_be_evicted)) {
  456. assert(evicted.prep_seq != prepare_seq);
  457. auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
  458. ROCKS_LOG_DETAILS(info_log_,
  459. "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
  460. evicted.prep_seq, evicted.commit_seq, prev_max);
  461. if (prev_max < evicted.commit_seq) {
  462. auto last = db_impl_->GetLastPublishedSequence(); // could be 0
  463. SequenceNumber max_evicted_seq;
  464. if (LIKELY(evicted.commit_seq < last)) {
  465. assert(last > 0);
  466. // Inc max in larger steps to avoid frequent updates
  467. max_evicted_seq =
  468. std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
  469. } else {
  470. // legit when a commit entry in a write batch overwrite the previous one
  471. max_evicted_seq = evicted.commit_seq;
  472. }
  473. ROCKS_LOG_DETAILS(info_log_,
  474. "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
  475. " => %lu",
  476. prepare_seq, evicted.prep_seq, evicted.commit_seq,
  477. prev_max, max_evicted_seq);
  478. AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
  479. }
  480. // After each eviction from commit cache, check if the commit entry should
  481. // be kept around because it overlaps with a live snapshot.
  482. CheckAgainstSnapshots(evicted);
  483. if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
  484. WriteLock wl(&prepared_mutex_);
  485. for (auto dp : delayed_prepared_) {
  486. if (dp == evicted.prep_seq) {
  487. // This is a rare case that txn is committed but prepared_txns_ is not
  488. // cleaned up yet. Refer to delayed_prepared_commits_ definition for
  489. // why it should be kept updated.
  490. delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
  491. ROCKS_LOG_DEBUG(info_log_,
  492. "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
  493. evicted.prep_seq, evicted.commit_seq);
  494. break;
  495. }
  496. }
  497. }
  498. }
  499. bool succ =
  500. ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  501. if (UNLIKELY(!succ)) {
  502. ROCKS_LOG_ERROR(info_log_,
  503. "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
  504. ",%" PRIu64 " retrying...",
  505. indexed_seq, prepare_seq, commit_seq);
  506. // A very rare event, in which the commit entry is updated before we do.
  507. // Here we apply a very simple solution of retrying.
  508. if (loop_cnt > 100) {
  509. throw std::runtime_error("Infinite loop in AddCommitted!");
  510. }
  511. AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
  512. return;
  513. }
  514. TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  515. TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
  516. }
  517. void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
  518. const size_t batch_cnt) {
  519. TEST_SYNC_POINT_CALLBACK(
  520. "RemovePrepared:Start",
  521. const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
  522. TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
  523. TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
  524. ROCKS_LOG_DETAILS(info_log_,
  525. "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
  526. prepare_seq, batch_cnt);
  527. WriteLock wl(&prepared_mutex_);
  528. for (size_t i = 0; i < batch_cnt; i++) {
  529. prepared_txns_.erase(prepare_seq + i);
  530. bool was_empty = delayed_prepared_.empty();
  531. if (!was_empty) {
  532. delayed_prepared_.erase(prepare_seq + i);
  533. auto it = delayed_prepared_commits_.find(prepare_seq + i);
  534. if (it != delayed_prepared_commits_.end()) {
  535. ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
  536. prepare_seq + i);
  537. delayed_prepared_commits_.erase(it);
  538. }
  539. bool is_empty = delayed_prepared_.empty();
  540. if (was_empty != is_empty) {
  541. delayed_prepared_empty_.store(is_empty, std::memory_order_release);
  542. }
  543. }
  544. }
  545. }
  546. bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
  547. CommitEntry64b* entry_64b,
  548. CommitEntry* entry) const {
  549. *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(std::memory_order_acquire);
  550. bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  551. return valid;
  552. }
  553. bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
  554. const CommitEntry& new_entry,
  555. CommitEntry* evicted_entry) {
  556. CommitEntry64b new_entry_64b(new_entry, FORMAT);
  557. CommitEntry64b evicted_entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
  558. new_entry_64b, std::memory_order_acq_rel);
  559. bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  560. return valid;
  561. }
  562. bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
  563. CommitEntry64b& expected_entry_64b,
  564. const CommitEntry& new_entry) {
  565. auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  566. CommitEntry64b new_entry_64b(new_entry, FORMAT);
  567. bool succ = atomic_entry.compare_exchange_strong(
  568. expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
  569. std::memory_order_acquire);
  570. return succ;
  571. }
  572. void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
  573. const SequenceNumber& new_max) {
  574. ROCKS_LOG_DETAILS(info_log_,
  575. "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
  576. prev_max, new_max);
  577. // Declare the intention before getting snapshot from the DB. This helps a
  578. // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
  579. // it has not already. Otherwise the new snapshot is when we ask DB for
  580. // snapshots smaller than future max.
  581. auto updated_future_max = prev_max;
  582. while (updated_future_max < new_max &&
  583. !future_max_evicted_seq_.compare_exchange_weak(
  584. updated_future_max, new_max, std::memory_order_acq_rel,
  585. std::memory_order_relaxed)) {
  586. };
  587. CheckPreparedAgainstMax(new_max, false /*locked*/);
  588. // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  589. // We use max as the version of snapshots to identify how fresh are the
  590. // snapshot list. This works because the snapshots are between 0 and
  591. // max, so the larger the max, the more complete they are.
  592. SequenceNumber new_snapshots_version = new_max;
  593. std::vector<SequenceNumber> snapshots;
  594. bool update_snapshots = false;
  595. if (new_snapshots_version > snapshots_version_) {
  596. // This is to avoid updating the snapshots_ if it already updated
  597. // with a more recent vesion by a concrrent thread
  598. update_snapshots = true;
  599. // We only care about snapshots lower then max
  600. snapshots = GetSnapshotListFromDB(new_max);
  601. }
  602. if (update_snapshots) {
  603. UpdateSnapshots(snapshots, new_snapshots_version);
  604. if (!snapshots.empty()) {
  605. WriteLock wl(&old_commit_map_mutex_);
  606. for (auto snap : snapshots) {
  607. // This allows IsInSnapshot to tell apart the reads from in valid
  608. // snapshots from the reads from committed values in valid snapshots.
  609. old_commit_map_[snap];
  610. }
  611. old_commit_map_empty_.store(false, std::memory_order_release);
  612. }
  613. }
  614. auto updated_prev_max = prev_max;
  615. TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
  616. TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
  617. while (updated_prev_max < new_max &&
  618. !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
  619. std::memory_order_acq_rel,
  620. std::memory_order_relaxed)) {
  621. };
  622. }
  623. const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  624. const bool kForWWConflictCheck = true;
  625. return GetSnapshotInternal(!kForWWConflictCheck);
  626. }
  627. SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
  628. bool for_ww_conflict_check) {
  629. // Note: for this optimization setting the last sequence number and obtaining
  630. // the smallest uncommitted seq should be done atomically. However to avoid
  631. // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
  632. // snapshot. Since we always updated the list of unprepared seq (via
  633. // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  634. // smallest uncommitted seq that we pair with the snapshot is smaller or equal
  635. // the value that would be obtained otherwise atomically. That is ok since
  636. // this optimization works as long as min_uncommitted is less than or equal
  637. // than the smallest uncommitted seq when the snapshot was taken.
  638. auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  639. SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
  640. TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
  641. assert(snap_impl);
  642. SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
  643. // Note: Check against future_max_evicted_seq_ (in contrast with
  644. // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
  645. if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
  646. // There is a very rare case in which the commit entry evicts another commit
  647. // entry that is not published yet thus advancing max evicted seq beyond the
  648. // last published seq. This case is not likely in real-world setup so we
  649. // handle it with a few retries.
  650. size_t retry = 0;
  651. SequenceNumber max;
  652. while ((max = future_max_evicted_seq_.load()) != 0 &&
  653. snap_impl->GetSequenceNumber() <= max && retry < 100) {
  654. ROCKS_LOG_WARN(info_log_,
  655. "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
  656. " retry %" ROCKSDB_PRIszt,
  657. snap_impl->GetSequenceNumber(), max, retry);
  658. ReleaseSnapshot(snap_impl);
  659. // Wait for last visible seq to catch up with max, and also go beyond it
  660. // by one.
  661. AdvanceSeqByOne();
  662. snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
  663. assert(snap_impl);
  664. retry++;
  665. }
  666. assert(snap_impl->GetSequenceNumber() > max);
  667. if (snap_impl->GetSequenceNumber() <= max) {
  668. throw std::runtime_error(
  669. "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
  670. " after " + ToString(retry) +
  671. " retries is still less than futre_max_evicted_seq_" + ToString(max));
  672. }
  673. }
  674. EnhanceSnapshot(snap_impl, min_uncommitted);
  675. ROCKS_LOG_DETAILS(
  676. db_impl_->immutable_db_options().info_log,
  677. "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
  678. snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
  679. TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
  680. return snap_impl;
  681. }
  682. void WritePreparedTxnDB::AdvanceSeqByOne() {
  683. // Inserting an empty value will i) let the max evicted entry to be
  684. // published, i.e., max == last_published, increase the last published to
  685. // be one beyond max, i.e., max < last_published.
  686. WriteOptions woptions;
  687. TransactionOptions txn_options;
  688. Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
  689. std::hash<std::thread::id> hasher;
  690. char name[64];
  691. snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
  692. assert(strlen(name) < 64 - 1);
  693. Status s = txn0->SetName(name);
  694. assert(s.ok());
  695. if (s.ok()) {
  696. // Without prepare it would simply skip the commit
  697. s = txn0->Prepare();
  698. }
  699. assert(s.ok());
  700. if (s.ok()) {
  701. s = txn0->Commit();
  702. }
  703. assert(s.ok());
  704. delete txn0;
  705. }
  706. const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
  707. SequenceNumber max) {
  708. ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  709. InstrumentedMutexLock dblock(db_impl_->mutex());
  710. db_impl_->mutex()->AssertHeld();
  711. return db_impl_->snapshots().GetAll(nullptr, max);
  712. }
  713. void WritePreparedTxnDB::ReleaseSnapshotInternal(
  714. const SequenceNumber snap_seq) {
  715. // TODO(myabandeh): relax should enough since the synchronizatin is already
  716. // done by snapshots_mutex_ under which this function is called.
  717. if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
  718. // Then this is a rare case that transaction did not finish before max
  719. // advances. It is expected for a few read-only backup snapshots. For such
  720. // snapshots we might have kept around a couple of entries in the
  721. // old_commit_map_. Check and do garbage collection if that is the case.
  722. bool need_gc = false;
  723. {
  724. WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
  725. ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
  726. snap_seq);
  727. ReadLock rl(&old_commit_map_mutex_);
  728. auto prep_set_entry = old_commit_map_.find(snap_seq);
  729. need_gc = prep_set_entry != old_commit_map_.end();
  730. }
  731. if (need_gc) {
  732. WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
  733. ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
  734. snap_seq);
  735. WriteLock wl(&old_commit_map_mutex_);
  736. old_commit_map_.erase(snap_seq);
  737. old_commit_map_empty_.store(old_commit_map_.empty(),
  738. std::memory_order_release);
  739. }
  740. }
  741. }
  742. void WritePreparedTxnDB::CleanupReleasedSnapshots(
  743. const std::vector<SequenceNumber>& new_snapshots,
  744. const std::vector<SequenceNumber>& old_snapshots) {
  745. auto newi = new_snapshots.begin();
  746. auto oldi = old_snapshots.begin();
  747. for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
  748. assert(*newi >= *oldi); // cannot have new snapshots with lower seq
  749. if (*newi == *oldi) { // still not released
  750. auto value = *newi;
  751. while (newi != new_snapshots.end() && *newi == value) {
  752. newi++;
  753. }
  754. while (oldi != old_snapshots.end() && *oldi == value) {
  755. oldi++;
  756. }
  757. } else {
  758. assert(*newi > *oldi); // *oldi is released
  759. ReleaseSnapshotInternal(*oldi);
  760. oldi++;
  761. }
  762. }
  763. // Everything remained in old_snapshots is released and must be cleaned up
  764. for (; oldi != old_snapshots.end(); oldi++) {
  765. ReleaseSnapshotInternal(*oldi);
  766. }
  767. }
  768. void WritePreparedTxnDB::UpdateSnapshots(
  769. const std::vector<SequenceNumber>& snapshots,
  770. const SequenceNumber& version) {
  771. ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
  772. version);
  773. TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  774. TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
  775. #ifndef NDEBUG
  776. size_t sync_i = 0;
  777. #endif
  778. ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  779. WriteLock wl(&snapshots_mutex_);
  780. snapshots_version_ = version;
  781. // We update the list concurrently with the readers.
  782. // Both new and old lists are sorted and the new list is subset of the
  783. // previous list plus some new items. Thus if a snapshot repeats in
  784. // both new and old lists, it will appear upper in the new list. So if
  785. // we simply insert the new snapshots in order, if an overwritten item
  786. // is still valid in the new list is either written to the same place in
  787. // the array or it is written in a higher palce before it gets
  788. // overwritten by another item. This guarantess a reader that reads the
  789. // list bottom-up will eventaully see a snapshot that repeats in the
  790. // update, either before it gets overwritten by the writer or
  791. // afterwards.
  792. size_t i = 0;
  793. auto it = snapshots.begin();
  794. for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
  795. snapshot_cache_[i].store(*it, std::memory_order_release);
  796. TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
  797. TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  798. }
  799. #ifndef NDEBUG
  800. // Release the remaining sync points since they are useless given that the
  801. // reader would also use lock to access snapshots
  802. for (++sync_i; sync_i <= 10; ++sync_i) {
  803. TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
  804. TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  805. }
  806. #endif
  807. snapshots_.clear();
  808. for (; it != snapshots.end(); ++it) {
  809. // Insert them to a vector that is less efficient to access
  810. // concurrently
  811. snapshots_.push_back(*it);
  812. }
  813. // Update the size at the end. Otherwise a parallel reader might read
  814. // items that are not set yet.
  815. snapshots_total_.store(snapshots.size(), std::memory_order_release);
  816. // Note: this must be done after the snapshots data structures are updated
  817. // with the new list of snapshots.
  818. CleanupReleasedSnapshots(snapshots, snapshots_all_);
  819. snapshots_all_ = snapshots;
  820. TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  821. TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
  822. }
  823. void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  824. TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  825. TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
  826. #ifndef NDEBUG
  827. size_t sync_i = 0;
  828. #endif
  829. // First check the snapshot cache that is efficient for concurrent access
  830. auto cnt = snapshots_total_.load(std::memory_order_acquire);
  831. // The list might get updated concurrently as we are reading from it. The
  832. // reader should be able to read all the snapshots that are still valid
  833. // after the update. Since the survived snapshots are written in a higher
  834. // place before gets overwritten the reader that reads bottom-up will
  835. // eventully see it.
  836. const bool next_is_larger = true;
  837. // We will set to true if the border line snapshot suggests that.
  838. bool search_larger_list = false;
  839. size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  840. for (; 0 < ip1; ip1--) {
  841. SequenceNumber snapshot_seq =
  842. snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
  843. TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
  844. ++sync_i);
  845. TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  846. if (ip1 == SNAPSHOT_CACHE_SIZE) { // border line snapshot
  847. // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
  848. // then later also continue the search to larger snapshots
  849. search_larger_list = snapshot_seq < evicted.commit_seq;
  850. }
  851. if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
  852. snapshot_seq, !next_is_larger)) {
  853. break;
  854. }
  855. }
  856. #ifndef NDEBUG
  857. // Release the remaining sync points before accquiring the lock
  858. for (++sync_i; sync_i <= 10; ++sync_i) {
  859. TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
  860. TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  861. }
  862. #endif
  863. TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  864. TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  865. if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
  866. // Then access the less efficient list of snapshots_
  867. WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
  868. ROCKS_LOG_WARN(info_log_,
  869. "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
  870. "> with %" ROCKSDB_PRIszt " snapshots",
  871. evicted.prep_seq, evicted.commit_seq, cnt);
  872. ReadLock rl(&snapshots_mutex_);
  873. // Items could have moved from the snapshots_ to snapshot_cache_ before
  874. // accquiring the lock. To make sure that we do not miss a valid snapshot,
  875. // read snapshot_cache_ again while holding the lock.
  876. for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
  877. SequenceNumber snapshot_seq =
  878. snapshot_cache_[i].load(std::memory_order_acquire);
  879. if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
  880. snapshot_seq, next_is_larger)) {
  881. break;
  882. }
  883. }
  884. for (auto snapshot_seq_2 : snapshots_) {
  885. if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
  886. snapshot_seq_2, next_is_larger)) {
  887. break;
  888. }
  889. }
  890. }
  891. }
  892. bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
  893. const uint64_t& prep_seq, const uint64_t& commit_seq,
  894. const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  895. // If we do not store an entry in old_commit_map_ we assume it is committed in
  896. // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
  897. // the snapshot so we need not to keep the entry around for this snapshot.
  898. if (commit_seq <= snapshot_seq) {
  899. // continue the search if the next snapshot could be smaller than commit_seq
  900. return !next_is_larger;
  901. }
  902. // then snapshot_seq < commit_seq
  903. if (prep_seq <= snapshot_seq) { // overlapping range
  904. WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
  905. ROCKS_LOG_WARN(info_log_,
  906. "old_commit_map_mutex_ overhead for %" PRIu64
  907. " commit entry: <%" PRIu64 ",%" PRIu64 ">",
  908. snapshot_seq, prep_seq, commit_seq);
  909. WriteLock wl(&old_commit_map_mutex_);
  910. old_commit_map_empty_.store(false, std::memory_order_release);
  911. auto& vec = old_commit_map_[snapshot_seq];
  912. vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
  913. // We need to store it once for each overlapping snapshot. Returning true to
  914. // continue the search if there is more overlapping snapshot.
  915. return true;
  916. }
  917. // continue the search if the next snapshot could be larger than prep_seq
  918. return next_is_larger;
  919. }
  920. WritePreparedTxnDB::~WritePreparedTxnDB() {
  921. // At this point there could be running compaction/flush holding a
  922. // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  923. // Make sure those jobs finished before destructing WritePreparedTxnDB.
  924. if (!db_impl_->shutting_down_) {
  925. db_impl_->CancelAllBackgroundWork(true /*wait*/);
  926. }
  927. }
  928. void SubBatchCounter::InitWithComp(const uint32_t cf) {
  929. auto cmp = comparators_[cf];
  930. keys_[cf] = CFKeys(SetComparator(cmp));
  931. }
  932. void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  933. CFKeys& cf_keys = keys_[cf];
  934. if (cf_keys.size() == 0) { // just inserted
  935. InitWithComp(cf);
  936. }
  937. auto it = cf_keys.insert(key);
  938. if (it.second == false) { // second is false if a element already existed.
  939. batches_++;
  940. keys_.clear();
  941. InitWithComp(cf);
  942. keys_[cf].insert(key);
  943. }
  944. }
  945. } // namespace ROCKSDB_NAMESPACE
  946. #endif // ROCKSDB_LITE