write_prepared_txn_db.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "utilities/transactions/write_prepared_txn_db.h"

#include <algorithm>
#include <cinttypes>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/db_impl/db_impl.h"
#include "logging/logging.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"

// This function is for testing only. If it returns true, then all entries in
// the commit cache will be evicted. Unit and/or stress tests (db_stress)
// can implement this function and customize how frequently commit cache
// eviction occurs.
// TODO: remove this function once we can configure commit cache to be very
// small so that eviction occurs very frequently. This requires the commit
// cache entry to be able to encode prepare and commit sequence numbers so that
// the commit sequence number does not have to be within a certain range of
// prepare sequence number.
extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void)
    __attribute__((__weak__));
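
// A hypothetical definition that a test binary could provide for the weak
// symbol above (a sketch only; the OneIn(10) rate and the use of
// util/random.h are assumptions, not what db_stress necessarily does):
//
//   extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void) {
//     // Force a full commit-cache eviction roughly 10% of the time.
//     return ROCKSDB_NAMESPACE::Random::GetTLSInstance()->OneIn(10);
//   }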

namespace ROCKSDB_NAMESPACE {

Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (const auto& rtxn : rtxns) {
    // There should be only one batch for WritePrepared policy.
    assert(rtxn.second->batches_.size() == 1);
    const auto& seq = rtxn.second->batches_.begin()->first;
    const auto& batch_info = rtxn.second->batches_.begin()->second;
    auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
    ordered_seq_cnt[seq] = cnt;
  }
  // AddPrepared must be called in order
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}

Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}

Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}

Status WritePreparedTxnDB::Write(const WriteOptions& opts,
                                 WriteBatch* updates) {
  if (txn_db_options_.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
  } else {
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
  }
}

Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
  }
}
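
// Example (sketch) of a caller opting into the optimized Write() path above.
// It assumes the caller has already done its own conflict checking and
// guarantees that the batch contains no duplicate keys; `txn_db` and `batch`
// are hypothetical names:
//
//   TransactionDBWriteOptimizations optimizations;
//   optimizations.skip_concurrency_control = true;
//   optimizations.skip_duplicate_key_check = true;
//   Status s = txn_db->Write(WriteOptions(), optimizations, &batch);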

Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since there is no seq
    // increased for this batch.
    return Status::OK();
  }

  if (write_options_orig.protection_bytes_per_key > 0) {
    auto s = WriteBatchInternal::UpdateProtectionInfo(
        batch, write_options_orig.protection_bytes_per_key);
    if (!s.ok()) {
      return s;
    }
  }

  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    if (!s.ok()) {
      return s;
    }
    batch_cnt = counter.BatchCount();
    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                      static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  // In the absence of Prepare markers, use Noop as a batch separator
  auto s = WriteBatchInternal::InsertNoop(batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  const bool kSeparatePrepareCommitBatches = true;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      this, db_impl_, batch_cnt,
      db_impl_->immutable_db_options().two_write_queues,
      !kSeparatePrepareCommitBatches);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, nullptr,
                          no_log_ref, !DISABLE_MEMTABLE, &seq_used, batch_cnt,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  WriteBatch empty_batch;
  write_options.disableWAL = true;
  write_options.sync = false;
  const size_t ONE_BATCH = 1;  // Just to inc the seq
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          nullptr, no_log_ref, DISABLE_MEMTABLE, &seq_used,
                          ONE_BATCH, &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared is called from within PreReleaseCallback
  return s;
}

Status WritePreparedTxnDB::Get(const ReadOptions& _read_options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value,
                               std::string* timestamp) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kGet) {
    return Status::InvalidArgument(
        "Can only call Get with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  }
  if (timestamp) {
    return Status::NotSupported(
        "Get() that returns timestamp is not implemented");
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kGet;
  }

  return GetImpl(read_options, column_family, key, value);
}

Status WritePreparedTxnDB::GetImpl(const ReadOptions& options,
                                   ColumnFamilyHandle* column_family,
                                   const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  bool* dont_care = nullptr;
  DBImpl::GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  get_impl_options.value_found = dont_care;
  get_impl_options.callback = &callback;
  auto res = db_impl_->GetImpl(options, key, get_impl_options);
  if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
                                                  backed_by_snapshot))) {
    return res;
  } else {
    res.PermitUncheckedError();
    WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
    if (id != 0) {
      (*handle_map)[id] = h;
    } else {
      // The pointer to the default cf handle in the handles will be deleted.
      // Use the pointer maintained by the db instead.
      (*handle_map)[id] = DefaultColumnFamily();
    }
  }
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.get();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto old_handle_map_ptr = handle_map_.get();
  assert(old_handle_map_ptr);
  auto handle_map =
      new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  (*handle_map)[id] = h;
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

void WritePreparedTxnDB::MultiGet(const ReadOptions& _read_options,
                                  const size_t num_keys,
                                  ColumnFamilyHandle** column_families,
                                  const Slice* keys, PinnableSlice* values,
                                  std::string* timestamps, Status* statuses,
                                  const bool /*sorted_input*/) {
  assert(values);
  Status s;
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kMultiGet) {
    s = Status::InvalidArgument(
        "Can only call MultiGet with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
  }

  if (s.ok()) {
    if (timestamps) {
      s = Status::NotSupported(
          "MultiGet() returning timestamps not implemented.");
    }
  }

  if (!s.ok()) {
    for (size_t i = 0; i < num_keys; ++i) {
      statuses[i] = s;
    }
    return;
  }

  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kMultiGet;
  }

  for (size_t i = 0; i < num_keys; ++i) {
    statuses[i] =
        this->GetImpl(read_options, column_families[i], keys[i], &values[i]);
  }
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted)
      : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
        snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete static_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& _read_options,
                                          ColumnFamilyHandle* column_family) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kDBIterator) {
    return NewErrorIterator(Status::InvalidArgument(
        "Can only call NewIterator with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kDBIterator;
  }
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (read_options.snapshot != nullptr) {
    snapshot_seq = read_options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto* cfd = cfh->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
  auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfh, super_version,
                                            snapshot_seq, &state->callback,
                                            expose_blob_index, allow_refresh);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& _read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kDBIterator) {
    return Status::InvalidArgument(
        "Can only call NewIterator with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`");
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kDBIterator;
  }
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (read_options.snapshot != nullptr) {
    snapshot_seq = read_options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    auto* cfd = cfh->cfd();
    auto* state =
        new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
    auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfh, super_version,
                                              snapshot_seq, &state->callback,
                                              expose_blob_index, allow_refresh);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}

void WritePreparedTxnDB::Init(const TransactionDBOptions& txn_db_opts) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
  dummy_max_snapshot_.number_ = kMaxSequenceNumber;
  rollback_deletion_type_callback_ =
      txn_db_opts.rollback_deletion_type_callback;
}
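
// For a sense of scale (illustrative only; the authoritative defaults live in
// write_prepared_txn_db.h): with a commit cache of 2^23 entries, the step
// above works out to roughly 8388608 / 100 ~= 83886 sequence numbers per
// advancement of max_evicted_seq_.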

void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
                                                 bool locked) {
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_, saving an expensive, synchronized
  // lookup from a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  ROCKS_LOG_DETAILS(
      info_log_,
      "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
      prepared_txns_.empty(),
      prepared_txns_.empty() ? 0 : prepared_txns_.top());
  const SequenceNumber prepared_top = prepared_txns_.top();
  const bool empty = prepared_top == kMaxSequenceNumber;
  // Preliminary check to avoid the synchronization cost
  if (!empty && prepared_top <= new_max) {
    if (locked) {
      // Needed to avoid double locking in pop().
      prepared_txns_.push_pop_mutex()->Unlock();
    }
    WriteLock wl(&prepared_mutex_);
    // Need to fetch fresh values of ::top after mutex is acquired
    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
      auto to_be_popped = prepared_txns_.top();
      delayed_prepared_.insert(to_be_popped);
      ROCKS_LOG_WARN(info_log_,
                     "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                     " new_max=%" PRIu64 ")",
                     static_cast<uint64_t>(delayed_prepared_.size()),
                     to_be_popped, new_max);
      delayed_prepared_empty_.store(false, std::memory_order_release);
      // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
      // there will be a point in time that the entry is neither in
      // prepared_txns_ nor in delayed_prepared_, which will not be checked if
      // delayed_prepared_empty_ is false.
      prepared_txns_.pop();
    }
    if (locked) {
      prepared_txns_.push_pop_mutex()->Lock();
    }
  }
}

void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
                    seq, max_evicted_seq_.load());
  TEST_SYNC_POINT("AddPrepared::begin:pause");
  TEST_SYNC_POINT("AddPrepared::begin:resume");
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Lock();
  }
  prepared_txns_.push_pop_mutex()->AssertHeld();
  prepared_txns_.push(seq);
  auto new_max = future_max_evicted_seq_.load();
  if (UNLIKELY(seq <= new_max)) {
    // This should not happen in the normal case
    ROCKS_LOG_ERROR(
        info_log_,
        "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
        " <= %" PRIu64,
        seq, new_max);
    CheckPreparedAgainstMax(new_max, true /*locked*/);
  }
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Unlock();
  }
  TEST_SYNC_POINT("AddPrepared::end");
}

void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
                    prepare_seq, commit_seq);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      auto last = db_impl_->GetLastPublishedSequence();  // could be 0
      SequenceNumber max_evicted_seq;
      if (LIKELY(evicted.commit_seq < last)) {
        assert(last > 0);
        // Inc max in larger steps to avoid frequent updates
        max_evicted_seq =
            std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
      } else {
        // legit when a commit entry in a write batch overwrites the previous
        // one
        max_evicted_seq = evicted.commit_seq;
      }
#ifdef OS_LINUX
      if (rocksdb_write_prepared_TEST_ShouldClearCommitCache &&
          rocksdb_write_prepared_TEST_ShouldClearCommitCache()) {
        max_evicted_seq = last;
      }
#endif  // OS_LINUX
      ROCKS_LOG_DETAILS(info_log_,
                        "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
                        " => %lu",
                        prepare_seq, evicted.prep_seq, evicted.commit_seq,
                        prev_max, max_evicted_seq);
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
      WriteLock wl(&prepared_mutex_);
      auto dp_iter = delayed_prepared_.find(evicted.prep_seq);
      if (dp_iter != delayed_prepared_.end()) {
        // This is a rare case that txn is committed but prepared_txns_ is not
        // cleaned up yet. Refer to delayed_prepared_commits_ definition for
        // why it should be kept updated.
        delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
        ROCKS_LOG_DEBUG(info_log_,
                        "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
                        evicted.prep_seq, evicted.commit_seq);
      }
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
    return;
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}
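
// Worked example for the eviction logic above (numbers are illustrative and
// assume a hypothetical COMMIT_CACHE_SIZE of 8): prepare_seq 3 and prepare_seq
// 11 both map to indexed_seq 3, so committing seq 11 evicts the cached entry
// for seq 3. If the evicted commit_seq exceeds max_evicted_seq_, the max is
// advanced (in INC_STEP_FOR_MAX_EVICTED increments) and live snapshots are
// checked via CheckAgainstSnapshots before the slot is overwritten.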

void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                        const size_t batch_cnt) {
  TEST_SYNC_POINT_CALLBACK(
      "RemovePrepared:Start",
      const_cast<void*>(static_cast<const void*>(&prepare_seq)));
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
  ROCKS_LOG_DETAILS(info_log_,
                    "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
                    prepare_seq, batch_cnt);
  WriteLock wl(&prepared_mutex_);
  for (size_t i = 0; i < batch_cnt; i++) {
    prepared_txns_.erase(prepare_seq + i);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq + i);
      auto it = delayed_prepared_commits_.find(prepare_seq + i);
      if (it != delayed_prepared_commits_.end()) {
        ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
                          prepare_seq + i);
        delayed_prepared_commits_.erase(it);
      }
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
}

bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
      std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b =
      commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
          new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}

void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // Declare the intention before getting snapshot from the DB. This helps a
  // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
  // it has not already. Otherwise a new snapshot could be missed when we ask
  // the DB for snapshots smaller than future max.
  auto updated_future_max = prev_max;
  while (updated_future_max < new_max &&
         !future_max_evicted_seq_.compare_exchange_weak(
             updated_future_max, new_max, std::memory_order_acq_rel,
             std::memory_order_relaxed)) {
  };

  CheckPreparedAgainstMax(new_max, false /*locked*/);

  // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  // We use max as the version of snapshots to identify how fresh the snapshot
  // list is. This works because the snapshots are between 0 and max, so the
  // larger the max, the more complete they are.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it has already been updated
    // with a more recent version by a concurrent thread
    update_snapshots = true;
    // We only care about snapshots lower than max
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
    if (!snapshots.empty()) {
      WriteLock wl(&old_commit_map_mutex_);
      for (auto snap : snapshots) {
        // This allows IsInSnapshot to tell apart reads from invalid snapshots
        // from reads of committed values in valid snapshots.
        old_commit_map_[snap];
      }
      old_commit_map_empty_.store(false, std::memory_order_release);
    }
  }
  auto updated_prev_max = prev_max;
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  };
}

const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  const bool kForWWConflictCheck = true;
  return GetSnapshotInternal(!kForWWConflictCheck);
}

SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
    bool for_ww_conflict_check) {
  // Note: for this optimization setting the last sequence number and obtaining
  // the smallest uncommitted seq should be done atomically. However to avoid
  // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
  // snapshot. Since we always update the list of unprepared seqs (via
  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  // smallest uncommitted seq that we pair with the snapshot is smaller than or
  // equal to the value that would be obtained otherwise atomically. That is ok
  // since this optimization works as long as min_uncommitted is less than or
  // equal to the smallest uncommitted seq when the snapshot was taken.
  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
  assert(snap_impl);
  SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
  // Note: Check against future_max_evicted_seq_ (in contrast with
  // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
  if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
    // There is a very rare case in which the commit entry evicts another
    // commit entry that is not published yet, thus advancing max evicted seq
    // beyond the last published seq. This case is not likely in real-world
    // setup so we handle it with a few retries.
    size_t retry = 0;
    SequenceNumber max;
    while ((max = future_max_evicted_seq_.load()) != 0 &&
           snap_impl->GetSequenceNumber() <= max && retry < 100) {
      ROCKS_LOG_WARN(info_log_,
                     "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
                     " retry %" ROCKSDB_PRIszt,
                     snap_impl->GetSequenceNumber(), max, retry);
      ReleaseSnapshot(snap_impl);
      // Wait for last visible seq to catch up with max, and also go beyond it
      // by one.
      AdvanceSeqByOne();
      snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
      assert(snap_impl);
      retry++;
    }
    assert(snap_impl->GetSequenceNumber() > max);
    if (snap_impl->GetSequenceNumber() <= max) {
      throw std::runtime_error(
          "Snapshot seq " + std::to_string(snap_impl->GetSequenceNumber()) +
          " after " + std::to_string(retry) +
          " retries is still less than future_max_evicted_seq_" +
          std::to_string(max));
    }
  }
  EnhanceSnapshot(snap_impl, min_uncommitted);
  ROCKS_LOG_DETAILS(
      db_impl_->immutable_db_options().info_log,
      "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
      snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
  return snap_impl;
}

void WritePreparedTxnDB::AdvanceSeqByOne() {
  // Inserting an empty value will i) let the max evicted entry be published,
  // i.e., max == last_published, and ii) increase the last published seq to be
  // one beyond max, i.e., max < last_published.
  // TODO: plumb Env::IOActivity, Env::IOPriority
  WriteOptions woptions;
  TransactionOptions txn_options;
  Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
  std::hash<std::thread::id> hasher;
  char name[64];
  snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
  assert(strlen(name) < 64 - 1);
  Status s = txn0->SetName(name);
  assert(s.ok());
  if (s.ok()) {
    // Without prepare it would simply skip the commit
    s = txn0->Prepare();
  }
  assert(s.ok());
  if (s.ok()) {
    s = txn0->Commit();
  }
  assert(s.ok());
  delete txn0;
}

const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  InstrumentedMutexLock dblock(db_impl_->mutex());
  db_impl_->mutex()->AssertHeld();
  return db_impl_->snapshots().GetAll(nullptr, max);
}

void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // TODO(myabandeh): relaxed ordering should be enough since the
  // synchronization is already done by snapshots_mutex_ under which this
  // function is called.
  if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
    // Then this is a rare case that transaction did not finish before max
    // advances. It is expected for a few read-only backup snapshots. For such
    // snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}

void WritePreparedTxnDB::CleanupReleasedSnapshots(
    const std::vector<SequenceNumber>& new_snapshots,
    const std::vector<SequenceNumber>& old_snapshots) {
  auto newi = new_snapshots.begin();
  auto oldi = old_snapshots.begin();
  for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
    assert(*newi >= *oldi);  // cannot have new snapshots with lower seq
    if (*newi == *oldi) {    // still not released
      auto value = *newi;
      while (newi != new_snapshots.end() && *newi == value) {
        newi++;
      }
      while (oldi != old_snapshots.end() && *oldi == value) {
        oldi++;
      }
    } else {
      assert(*newi > *oldi);  // *oldi is released
      ReleaseSnapshotInternal(*oldi);
      oldi++;
    }
  }
  // Everything remaining in old_snapshots has been released and must be
  // cleaned up
  for (; oldi != old_snapshots.end(); oldi++) {
    ReleaseSnapshotInternal(*oldi);
  }
}

void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both new and old lists are sorted and the new list is a subset of the
  // previous list plus some new items. Thus if a snapshot repeats in
  // both new and old lists, it will appear higher in the new list. So if
  // we simply insert the new snapshots in order, an overwritten item that
  // is still valid in the new list is either written to the same place in
  // the array or to a higher place before it gets overwritten by another
  // item. This guarantees that a reader that reads the list bottom-up will
  // eventually see a snapshot that repeats in the update, either before it
  // gets overwritten by the writer or afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); ++it) {
    // Insert them to a vector that is less efficient to access
    // concurrently
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);
  // Note: this must be done after the snapshots data structures are updated
  // with the new list of snapshots.
  CleanupReleasedSnapshots(snapshots, snapshots_all_);
  snapshots_all_ = snapshots;
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}

void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the surviving snapshots are written to a higher
  // place before they get overwritten, a reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  // Will be set to true if the borderline snapshot suggests it.
  bool search_larger_list = false;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    SequenceNumber snapshot_seq =
        snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (ip1 == SNAPSHOT_CACHE_SIZE) {  // borderline snapshot
      // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
      // then later also continue the search to larger snapshots
      search_larger_list = snapshot_seq < evicted.commit_seq;
    }
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
    // Then access the less efficient list of snapshots_
    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
                   "> with %" ROCKSDB_PRIszt " snapshots",
                   evicted.prep_seq, evicted.commit_seq, cnt);
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from the snapshots_ to snapshot_cache_ before
    // acquiring the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      SequenceNumber snapshot_seq =
          snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}

bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed
  // in all snapshots. If commit_seq <= snapshot_seq, it is considered already
  // in the snapshot so we need not keep the entry around for this snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "old_commit_map_mutex_ overhead for %" PRIu64
                   " commit entry: <%" PRIu64 ",%" PRIu64 ">",
                   snapshot_seq, prep_seq, commit_seq);
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Return true to
    // continue the search in case there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}
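
// Worked example for MaybeUpdateOldCommitMap (illustrative numbers): for an
// evicted entry <prep_seq=8, commit_seq=12> and a live snapshot at seq 10, the
// commit is not yet visible to the snapshot (12 > 10) but the prepare is below
// it (8 <= 10), so <10 -> 8> is recorded in old_commit_map_ and the search
// continues in case other snapshots also overlap. A snapshot at seq 15 needs
// no entry since 12 <= 15 means the commit is already covered by it.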

WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs finished before destructing WritePreparedTxnDB.
  if (!db_impl_->shutting_down_) {
    db_impl_->CancelAllBackgroundWork(true /*wait*/);
  }
}

void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}

void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if an element already existed.
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}
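
// Worked example for the sub-batch counting above (keys are hypothetical):
// iterating a write batch containing Put(k1), Put(k2), Put(k1), Put(k3)
// yields a batch count of two, because the second insert of k1 hits an
// existing key in the per-CF set, which increments batches_ and restarts the
// key set for the next sub-batch.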

}  // namespace ROCKSDB_NAMESPACE