write_prepared_txn_db.h 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <cinttypes>
  7. #include <mutex>
  8. #include <queue>
  9. #include <set>
  10. #include <string>
  11. #include <unordered_map>
  12. #include <vector>
  13. #include "db/attribute_group_iterator_impl.h"
  14. #include "db/db_iter.h"
  15. #include "db/pre_release_callback.h"
  16. #include "db/read_callback.h"
  17. #include "db/snapshot_checker.h"
  18. #include "logging/logging.h"
  19. #include "rocksdb/db.h"
  20. #include "rocksdb/options.h"
  21. #include "rocksdb/utilities/transaction_db.h"
  22. #include "util/cast_util.h"
  23. #include "util/set_comparator.h"
  24. #include "util/string_util.h"
  25. #include "utilities/transactions/pessimistic_transaction.h"
  26. #include "utilities/transactions/pessimistic_transaction_db.h"
  27. #include "utilities/transactions/write_prepared_txn.h"
  28. namespace ROCKSDB_NAMESPACE {
  // Tells whether a sequence number used as a read bound is backed by an
  // actual DB Snapshot object (kBackedByDBSnapshot) or not
  // (kUnbackedByDBSnapshot). Used by AssignMinMaxSeqs / ValidateSnapshot.
  enum SnapshotBackup : bool {
    kUnbackedByDBSnapshot = false,
    kBackedByDBSnapshot = true,
  };
  30. // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
  31. // In this way some data in the DB might not be committed. The DB provides
  32. // mechanisms to tell such data apart from committed data.
  33. class WritePreparedTxnDB : public PessimisticTransactionDB {
  34. public:
  // Wraps `db` as a WritePrepared transaction DB.
  // The snapshot cache holds 2^wp_snapshot_cache_bits entries and the commit
  // cache holds 2^wp_commit_cache_bits entries. FORMAT is constructed from
  // COMMIT_CACHE_BITS. The remaining setup is done by Init().
  // NOTE: SNAPSHOT_CACHE_SIZE / COMMIT_CACHE_SIZE / FORMAT read the *_BITS
  // members initialized just before them, so this relies on those members
  // being declared in this order in the class.
  explicit WritePreparedTxnDB(DB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }
  // Same as the DB* overload above, but wraps a StackableDB. The cache sizing
  // and the declaration-order dependency are identical.
  explicit WritePreparedTxnDB(StackableDB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }
  virtual ~WritePreparedTxnDB();
  Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices,
                    const std::vector<ColumnFamilyHandle*>& handles) override;
  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;
  // Each `using Base::Name;` below keeps the base-class overloads of Name
  // visible next to the override declared here. Without it, the override
  // would hide them.
  using TransactionDB::Write;
  Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  // Optimized version of ::Write that accepts additional optimization requests,
  // such as skip_concurrency_control.
  using PessimisticTransactionDB::Write;
  Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
               WriteBatch* updates) override;
  // Writes the batch to the underlying DB and marks it as committed. Used both
  // directly by the TxnDB and through a transaction (txn).
  // NOTE(review): batch_cnt presumably is the number of sub-batches in `batch`
  // (i.e. the number of sequence numbers it consumes) — confirm in the .cc.
  Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
                       size_t batch_cnt, WritePreparedTxn* txn);
  using DB::Get;
  Status Get(const ReadOptions& _read_options,
             ColumnFamilyHandle* column_family, const Slice& key,
             PinnableSlice* value, std::string* timestamp) override;
  using DB::MultiGet;
  void MultiGet(const ReadOptions& _read_options, const size_t num_keys,
                ColumnFamilyHandle** column_families, const Slice* keys,
                PinnableSlice* values, std::string* timestamps,
                Status* statuses, const bool sorted_input) override;
  using DB::NewIterator;
  Iterator* NewIterator(const ReadOptions& _read_options,
                        ColumnFamilyHandle* column_family) override;
  using DB::NewIterators;
  Status NewIterators(const ReadOptions& _read_options,
                      const std::vector<ColumnFamilyHandle*>& column_families,
                      std::vector<Iterator*>* iterators) override;
  88. using DB::NewCoalescingIterator;
  89. std::unique_ptr<Iterator> NewCoalescingIterator(
  90. const ReadOptions& /*options*/,
  91. const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
  92. return std::unique_ptr<Iterator>(
  93. NewErrorIterator(Status::NotSupported("Not supported yet")));
  94. }
  95. using DB::NewAttributeGroupIterator;
  96. std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
  97. const ReadOptions& /*options*/,
  98. const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
  99. return NewAttributeGroupErrorIterator(
  100. Status::NotSupported("Not supported yet"));
  101. }
  102. // Check whether the transaction that wrote the value with sequence number seq
  103. // is visible to the snapshot with sequence number snapshot_seq.
  104. // Returns true if commit_seq <= snapshot_seq
  105. // If the snapshot_seq is already released and snapshot_seq <= max, sets
  106. // *snap_released to true and returns true as well.
  107. inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
  108. uint64_t min_uncommitted = kMinUnCommittedSeq,
  109. bool* snap_released = nullptr) const {
  110. ROCKS_LOG_DETAILS(info_log_,
  111. "IsInSnapshot %" PRIu64 " in %" PRIu64
  112. " min_uncommitted %" PRIu64,
  113. prep_seq, snapshot_seq, min_uncommitted);
  114. assert(min_uncommitted >= kMinUnCommittedSeq);
  115. // Caller is responsible to initialize snap_released.
  116. assert(snap_released == nullptr || *snap_released == false);
  117. // Here we try to infer the return value without looking into prepare list.
  118. // This would help avoiding synchronization over a shared map.
  119. // TODO(myabandeh): optimize this. This sequence of checks must be correct
  120. // but not necessary efficient
  121. if (prep_seq == 0) {
  122. // Compaction will output keys to bottom-level with sequence number 0 if
  123. // it is visible to the earliest snapshot.
  124. ROCKS_LOG_DETAILS(
  125. info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  126. prep_seq, snapshot_seq, 1);
  127. return true;
  128. }
  129. if (snapshot_seq < prep_seq) {
  130. // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
  131. ROCKS_LOG_DETAILS(
  132. info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  133. prep_seq, snapshot_seq, 0);
  134. return false;
  135. }
  136. if (prep_seq < min_uncommitted) {
  137. ROCKS_LOG_DETAILS(info_log_,
  138. "IsInSnapshot %" PRIu64 " in %" PRIu64
  139. " returns %" PRId32
  140. " because of min_uncommitted %" PRIu64,
  141. prep_seq, snapshot_seq, 1, min_uncommitted);
  142. return true;
  143. }
  144. // Commit of delayed prepared has two non-atomic steps: add to commit cache,
  145. // remove from delayed prepared. Our reads from these two is also
  146. // non-atomic. By looking into commit cache first thus we might not find the
  147. // prep_seq neither in commit cache not in delayed_prepared_. To fix that i)
  148. // we check if there was any delayed prepared BEFORE looking into commit
  149. // cache, ii) if there was, we complete the search steps to be these: i)
  150. // commit cache, ii) delayed prepared, commit cache again. In this way if
  151. // the first query to commit cache missed the commit, the 2nd will catch it.
  152. bool was_empty;
  153. SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
  154. CommitEntry64b dont_care;
  155. auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
  156. size_t repeats = 0;
  157. do {
  158. repeats++;
  159. assert(repeats < 100);
  160. if (UNLIKELY(repeats >= 100)) {
  161. throw std::runtime_error(
  162. "The read was intrupted 100 times by update to max_evicted_seq_. "
  163. "This is unexpected in all setups");
  164. }
  165. max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
  166. TEST_SYNC_POINT(
  167. "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
  168. TEST_SYNC_POINT(
  169. "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
  170. was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
  171. TEST_SYNC_POINT(
  172. "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
  173. TEST_SYNC_POINT(
  174. "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
  175. CommitEntry cached;
  176. bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
  177. TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
  178. TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
  179. if (exist && prep_seq == cached.prep_seq) {
  180. // It is committed and also not evicted from commit cache
  181. ROCKS_LOG_DETAILS(
  182. info_log_,
  183. "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  184. prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
  185. return cached.commit_seq <= snapshot_seq;
  186. }
  187. // else it could be committed but not inserted in the map which could
  188. // happen after recovery, or it could be committed and evicted by another
  189. // commit, or never committed.
  190. // At this point we don't know if it was committed or it is still prepared
  191. max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
  192. if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
  193. continue;
  194. }
  195. // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
  196. if (max_evicted_seq_ub < prep_seq) {
  197. // Not evicted from cache and also not present, so must be still
  198. // prepared
  199. ROCKS_LOG_DETAILS(info_log_,
  200. "IsInSnapshot %" PRIu64 " in %" PRIu64
  201. " returns %" PRId32,
  202. prep_seq, snapshot_seq, 0);
  203. return false;
  204. }
  205. TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
  206. TEST_SYNC_POINT(
  207. "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
  208. if (!was_empty) {
  209. // We should not normally reach here
  210. WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
  211. ReadLock rl(&prepared_mutex_);
  212. ROCKS_LOG_WARN(
  213. info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
  214. static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
  215. if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
  216. // This is the order: 1) delayed_prepared_commits_ update, 2) publish
  217. // 3) delayed_prepared_ clean up. So check if it is the case of a late
  218. // clenaup.
  219. auto it = delayed_prepared_commits_.find(prep_seq);
  220. if (it == delayed_prepared_commits_.end()) {
  221. // Then it is not committed yet
  222. ROCKS_LOG_DETAILS(info_log_,
  223. "IsInSnapshot %" PRIu64 " in %" PRIu64
  224. " returns %" PRId32,
  225. prep_seq, snapshot_seq, 0);
  226. return false;
  227. } else {
  228. ROCKS_LOG_DETAILS(info_log_,
  229. "IsInSnapshot %" PRIu64 " in %" PRIu64
  230. " commit: %" PRIu64 " returns %" PRId32,
  231. prep_seq, snapshot_seq, it->second,
  232. snapshot_seq <= it->second);
  233. return it->second <= snapshot_seq;
  234. }
  235. } else {
  236. // 2nd query to commit cache. Refer to was_empty comment above.
  237. exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
  238. if (exist && prep_seq == cached.prep_seq) {
  239. ROCKS_LOG_DETAILS(
  240. info_log_,
  241. "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  242. prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
  243. return cached.commit_seq <= snapshot_seq;
  244. }
  245. max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
  246. }
  247. }
  248. } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
  249. // When advancing max_evicted_seq_, we move older entires from prepared to
  250. // delayed_prepared_. Also we move evicted entries from commit cache to
  251. // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
  252. // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
  253. // old_commit_map_, iii) committed with no conflict with any snapshot. Case
  254. // (i) delayed_prepared_ is checked above
  255. if (max_evicted_seq_ub < snapshot_seq) { // then (ii) cannot be the case
  256. // only (iii) is the case: committed
  257. // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
  258. // snapshot_seq
  259. ROCKS_LOG_DETAILS(
  260. info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  261. prep_seq, snapshot_seq, 1);
  262. return true;
  263. }
  264. // else (ii) might be the case: check the commit data saved for this
  265. // snapshot. If there was no overlapping commit entry, then it is committed
  266. // with a commit_seq lower than any live snapshot, including snapshot_seq.
  267. if (old_commit_map_empty_.load(std::memory_order_acquire)) {
  268. ROCKS_LOG_DETAILS(info_log_,
  269. "IsInSnapshot %" PRIu64 " in %" PRIu64
  270. " returns %" PRId32 " released=1",
  271. prep_seq, snapshot_seq, 0);
  272. assert(snap_released);
  273. // This snapshot is not valid anymore. We cannot tell if prep_seq is
  274. // committed before or after the snapshot. Return true but also set
  275. // snap_released to true.
  276. *snap_released = true;
  277. return true;
  278. }
  279. {
  280. // We should not normally reach here unless sapshot_seq is old. This is a
  281. // rare case and it is ok to pay the cost of mutex ReadLock for such old,
  282. // reading transactions.
  283. WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
  284. ReadLock rl(&old_commit_map_mutex_);
  285. auto prep_set_entry = old_commit_map_.find(snapshot_seq);
  286. bool found = prep_set_entry != old_commit_map_.end();
  287. if (found) {
  288. auto& vec = prep_set_entry->second;
  289. found = std::binary_search(vec.begin(), vec.end(), prep_seq);
  290. } else {
  291. // coming from compaction
  292. ROCKS_LOG_DETAILS(info_log_,
  293. "IsInSnapshot %" PRIu64 " in %" PRIu64
  294. " returns %" PRId32 " released=1",
  295. prep_seq, snapshot_seq, 0);
  296. // This snapshot is not valid anymore. We cannot tell if prep_seq is
  297. // committed before or after the snapshot. Return true but also set
  298. // snap_released to true.
  299. assert(snap_released);
  300. *snap_released = true;
  301. return true;
  302. }
  303. if (!found) {
  304. ROCKS_LOG_DETAILS(info_log_,
  305. "IsInSnapshot %" PRIu64 " in %" PRIu64
  306. " returns %" PRId32,
  307. prep_seq, snapshot_seq, 1);
  308. return true;
  309. }
  310. }
  311. // (ii) it the case: it is committed but after the snapshot_seq
  312. ROCKS_LOG_DETAILS(
  313. info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  314. prep_seq, snapshot_seq, 0);
  315. return false;
  316. }
  // Add the transaction with prepare sequence seq to the prepared list.
  // Note: must be called serially with increasing seq on each call.
  // locked is true if prepared_mutex_ is already locked.
  void AddPrepared(uint64_t seq, bool locked = false);
  // Check if any of the prepared txns are less than new max_evicted_seq_. Must
  // be called with prepared_mutex_ write locked.
  // NOTE(review): the `locked` flag suggests the function takes
  // prepared_mutex_ itself when locked == false, which seems to contradict
  // the sentence above — confirm against the .cc.
  void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
  // Remove the transaction with prepare sequence seq from the prepared list.
  // NOTE(review): batch_cnt presumably covers a transaction whose prepare
  // consumed batch_cnt consecutive sequence numbers starting at seq — confirm.
  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
  // Add the transaction with prepare sequence prepare_seq and commit sequence
  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
  // Note: must be called serially.
  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                    uint8_t loop_cnt = 0);
  // A decoded commit-cache entry: the prepare sequence number of a
  // transaction and the sequence number at which it committed. A
  // default-constructed entry is all zeros.
  struct CommitEntry {
    uint64_t prep_seq = 0;
    uint64_t commit_seq = 0;

    CommitEntry() = default;
    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}

    // Two entries are equal when both sequence numbers match.
    bool operator==(const CommitEntry& rhs) const {
      const bool same_prepare = prep_seq == rhs.prep_seq;
      const bool same_commit = commit_seq == rhs.commit_seq;
      return same_prepare && same_commit;
    }
  };
  // Bit-layout parameters for CommitEntry64b, derived from the number of
  // index bits (the log2 of the commit cache size). Every derived field is
  // computed from the fields declared before it. Declaration order is
  // therefore significant: default member initializers run in that order.
  struct CommitEntry64bFormat {
    explicit CommitEntry64bFormat(size_t index_bits) : INDEX_BITS(index_bits) {}

    // Number of higher bits of a sequence number that is not used. They are
    // used to encode the value type, ...
    const size_t PAD_BITS = static_cast<size_t>(8);
    // Number of lower bits from prepare seq that can be skipped as they are
    // implied by the index of the entry in the array
    const size_t INDEX_BITS;
    // Number of bits we use to encode the prepare seq
    const size_t PREP_BITS = static_cast<size_t>(64 - PAD_BITS - INDEX_BITS);
    // Number of bits we use to encode the commit seq (as a delta); this
    // equals PAD_BITS + INDEX_BITS.
    const size_t COMMIT_BITS = static_cast<size_t>(64 - PREP_BITS);
    // Mask selecting the low COMMIT_BITS bits, used to encode/decode the delta
    const uint64_t COMMIT_FILTER =
        static_cast<uint64_t>((1ull << COMMIT_BITS) - 1);
    // The value of commit_seq - prepare_seq + 1 must be less than this bound
    const uint64_t DELTA_UPPERBOUND = static_cast<uint64_t>(1ull << COMMIT_BITS);
  };
  362. // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ...
  363. // INDEX Delta Seq (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ...
  364. // DELTA DELTA Encoded Value = PREP PREP .... PREP PREP DELTA DELTA
  365. // ... DELTA DELTA PAD: first bits of a seq that is reserved for tagging and
  366. // hence ignored PREP/INDEX: the used bits in a prepare seq number INDEX: the
  367. // bits that do not have to be encoded (will be provided externally) DELTA:
  368. // prep seq - commit seq + 1 Number of DELTA bits should be equal to number of
  369. // index bits + PADs
  370. struct CommitEntry64b {
  371. constexpr CommitEntry64b() noexcept : rep_(0) {}
  372. CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
  373. : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
  374. CommitEntry64b(const uint64_t ps, const uint64_t cs,
  375. const CommitEntry64bFormat& format) {
  376. assert(ps < static_cast<uint64_t>(
  377. (1ull << (format.PREP_BITS + format.INDEX_BITS))));
  378. assert(ps <= cs);
  379. uint64_t delta = cs - ps + 1; // make initialized delta always >= 1
  380. // zero is reserved for uninitialized entries
  381. assert(0 < delta);
  382. assert(delta < format.DELTA_UPPERBOUND);
  383. if (delta >= format.DELTA_UPPERBOUND) {
  384. throw std::runtime_error(
  385. "commit_seq >> prepare_seq. The allowed distance is " +
  386. std::to_string(format.DELTA_UPPERBOUND) + " commit_seq is " +
  387. std::to_string(cs) + " prepare_seq is " + std::to_string(ps));
  388. }
  389. rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
  390. rep_ = rep_ | delta;
  391. }
  392. // Return false if the entry is empty
  393. bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
  394. const CommitEntry64bFormat& format) {
  395. uint64_t delta = rep_ & format.COMMIT_FILTER;
  396. // zero is reserved for uninitialized entries
  397. assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
  398. if (delta == 0) {
  399. return false; // initialized entry would have non-zero delta
  400. }
  401. assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
  402. uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
  403. prep_up >>= format.PAD_BITS;
  404. const uint64_t& prep_low = indexed_seq;
  405. entry->prep_seq = prep_up | prep_low;
  406. entry->commit_seq = entry->prep_seq + delta - 1;
  407. return true;
  408. }
  409. private:
  410. uint64_t rep_;
  411. };
  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;
  // Returns a copy of the shared_ptr to the comparator map (keyed presumably
  // by column family id). The copy keeps the map alive for as long as the
  // caller holds it.
  std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
    return cf_map_;
  }
  // Same as GetCFComparatorMap, but for the column family handle map.
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
    return handle_map_;
  }
  void UpdateCFComparatorMap(
      const std::vector<ColumnFamilyHandle*>& handles) override;
  void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
  const Snapshot* GetSnapshot() override;
  // NOTE(review): for_ww_conflict_check presumably marks snapshots taken for
  // write-write conflict checking rather than for user reads — confirm in .cc.
  SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
 protected:
  Status VerifyCFOptions(const ColumnFamilyOptions& cf_options) override;
  // Assign the min and max sequence numbers for reading from the db. A seq >
  // max is not valid, and a seq < min is valid, and a min <= seq < max requires
  // further checking. Normally max is defined by the snapshot and min is by
  // minimum uncommitted seq.
  // NOTE(review): seq == max is not covered by the sentence above; presumably
  // it also needs further checking — confirm.
  // The returned SnapshotBackup tells whether *max is backed by an actual
  // snapshot object (presumably kUnbackedByDBSnapshot when snapshot is null).
  inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
                                         SequenceNumber* min,
                                         SequenceNumber* max);
  // Validate whether a snapshot sequence number is still valid, based on the
  // latest db status. backed_by_snapshot specifies whether the number is
  // backed by an actual snapshot object. order specifies the memory order with
  // which we load the atomic variables: relaxed is enough for the default,
  // since we care about the last value seen by the same thread.
  inline bool ValidateSnapshot(
      const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
      std::memory_order order = std::memory_order_relaxed);
  // Get a dummy snapshot that refers to kMaxSequenceNumber
  Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
  444. bool ShouldRollbackWithSingleDelete(ColumnFamilyHandle* column_family,
  445. const Slice& key) {
  446. return rollback_deletion_type_callback_
  447. ? rollback_deletion_type_callback_(this, column_family, key)
  448. : false;
  449. }
  450. std::function<bool(TransactionDB*, ColumnFamilyHandle*, const Slice&)>
  451. rollback_deletion_type_callback_;
 private:
  // Collaborating classes and test fixtures that need access to the private
  // state below. The *_Test names presumably are gtest-generated classes for
  // TEST_F/TEST_P cases — confirm in the test sources.
  friend class AddPreparedCallback;
  friend class PreparedHeap_BasicsTest_Test;
  friend class PreparedHeap_Concurrent_Test;
  friend class PreparedHeap_EmptyAtTheEnd_Test;
  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
  friend class WritePreparedCommitEntryPreReleaseCallback;
  friend class WritePreparedTransactionTestBase;
  friend class WritePreparedTxn;
  friend class WritePreparedTxnDBMock;
  friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
  friend class
      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
  friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
  friend class WritePreparedTransactionTest_BasicRecovery_Test;
  friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
  friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
  friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
  friend class WritePreparedTransactionTest_CommitMap_Test;
  friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
  friend class WritePreparedTransactionTest_IsInSnapshot_Test;
  friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
  friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
  friend class WritePreparedTransactionTest_Rollback_Test;
  friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
  friend class WriteUnpreparedTxn;
  friend class WriteUnpreparedTxnDB;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class MultiOpsTxnsStressTest;
  491. void Init(const TransactionDBOptions& txn_db_opts);
  492. void WPRecordTick(uint32_t ticker_type) const {
  493. RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
  494. }
  495. Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
  496. const Slice& key, std::string* value) {
  497. assert(value != nullptr);
  498. PinnableSlice pinnable_val(value);
  499. assert(!pinnable_val.IsPinned());
  500. auto s = GetImpl(options, column_family, key, &pinnable_val);
  501. if (s.ok() && pinnable_val.IsPinned()) {
  502. value->assign(pinnable_val.data(), pinnable_val.size());
  503. } // else value is already assigned
  504. return s;
  505. }
  506. Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
  507. const Slice& key, PinnableSlice* value);
  // A heap with the amortized O(1) complexity for erase. It uses one extra heap
  // to keep track of erased entries that are not yet on top of the main heap.
  // The main "heap" is a deque kept in insertion order: values are expected to
  // be pushed in increasing order, so front() is the minimum.
  class PreparedHeap {
    // The mutex is required for push and pop from PreparedHeap. ::erase will
    // use external synchronization via prepared_mutex_.
    port::Mutex push_pop_mutex_;
    std::deque<uint64_t> heap_;
    // Min-heap of values erased while they were not at the front of heap_.
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        erased_heap_;
    // Cached heap_.front(), or kMaxSequenceNumber when heap_ is empty. It is
    // atomic so that top()/empty() can be read without push_pop_mutex_.
    std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
    // True when testing crash recovery
    bool TEST_CRASH_ = false;
    friend class WritePreparedTxnDB;

   public:
    // Unless a crash is being simulated, both heaps must be drained by now.
    ~PreparedHeap() {
      if (!TEST_CRASH_) {
        assert(heap_.empty());
        assert(erased_heap_.empty());
      }
    }
    port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }

    inline bool empty() { return top() == kMaxSequenceNumber; }
    // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
    inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
    // Appends v. The caller must hold push_pop_mutex_. v is expected to be
    // larger than anything already pushed, since heap_ is not re-sorted. The
    // assert only checks v against the current top.
    inline void push(uint64_t v) {
      push_pop_mutex_.AssertHeld();
      if (heap_.empty()) {
        heap_top_.store(v, std::memory_order_release);
      } else {
        assert(heap_top_.load() < v);
      }
      heap_.push_back(v);
    }
    // Removes the current top, then also drops any following entries that
    // were erased earlier (tracked in erased_heap_), and republishes
    // heap_top_. Takes push_pop_mutex_ unless locked is true, in which case
    // the caller must already hold it.
    // Precondition: heap_ is non-empty (pop_front on an empty deque is UB).
    void pop(bool locked = false) {
      if (!locked) {
        push_pop_mutex()->Lock();
      }
      push_pop_mutex_.AssertHeld();
      heap_.pop_front();
      while (!heap_.empty() && !erased_heap_.empty() &&
             // heap_.front() > erased_heap_.top() could happen if we have
             // erased a non-existent entry. Ideally the user should not do
             // that but we should be resilient against it.
             heap_.front() >= erased_heap_.top()) {
        if (heap_.front() == erased_heap_.top()) {
          heap_.pop_front();
        }
        uint64_t erased __attribute__((__unused__));
        erased = erased_heap_.top();
        erased_heap_.pop();
        // No duplicate prepare sequence numbers
        assert(erased_heap_.empty() || erased_heap_.top() != erased);
      }
      // Once heap_ is empty, any remaining erased entries are stale.
      while (heap_.empty() && !erased_heap_.empty()) {
        erased_heap_.pop();
      }
      heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
                      std::memory_order_release);
      if (!locked) {
        push_pop_mutex()->Unlock();
      }
    }
    // Concurrent calls need external synchronization. It is safe to be called
    // concurrently with push and pop though.
    // Removes seq: immediately if it is the top, lazily (via erased_heap_)
    // if it is further down, and not at all if it is below the top (already
    // popped).
    void erase(uint64_t seq) {
      if (!empty()) {
        auto top_seq = top();
        if (seq < top_seq) {
          // Already popped, ignore it.
        } else if (top_seq == seq) {
          pop();
#ifndef NDEBUG
          MutexLock ml(push_pop_mutex());
          assert(heap_.empty() || heap_.front() != seq);
#endif
        } else {  // seq > top()
          // Down the heap, remember to pop it later
          erased_heap_.push(seq);
        }
      }
    }
  };
  590. void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }
  // Get the commit entry with index indexed_seq from the commit table. It
  // returns true if such an entry exists. The raw 64-bit form is written to
  // entry_64b and the decoded form to entry.
  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
                      CommitEntry* entry) const;
  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry (i.e. <prep_seq, commit_seq>). If the rewrite
  // results in an eviction, it sets evicted_entry and returns true.
  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                      CommitEntry* evicted_entry);
  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry only if the existing entry matches
  // expected_entry. Returns false otherwise.
  bool ExchangeCommitEntry(const uint64_t indexed_seq,
                           CommitEntry64b& expected_entry,
                           const CommitEntry& new_entry);
  // Increase max_evicted_seq_ from the previous value prev_max to the new
  // value. This also involves taking care of prepared txns that are not
  // committed before new_max, as well as updating the list of live snapshots
  // at the time of updating the max. Thread-safety: this function can be
  // called concurrently. Concurrent invocations of this function are
  // equivalent to a serial invocation in which the last invocation is the one
  // with the largest new_max value.
  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                            const SequenceNumber& new_max);
  // Returns the smallest sequence number that may still be uncommitted:
  //  - the smallest entry of delayed_prepared_ if it is non-empty;
  //  - otherwise the smaller of prepared_txns_.top() and the next sequence
  //    number (GetLatestSequenceNumber() + 1);
  //  - or just the next sequence number when prepared_txns_ is empty.
  inline SequenceNumber SmallestUnCommittedSeq() {
    // Note: We have two lists to look into, but for performance reasons they
    // are not read atomically. CheckPreparedAgainstMax copies an entry to
    // delayed_prepared_ before removing it from prepared_txns_. To ensure that
    // a prepared entry will not be missed, we look into them in the opposite
    // order: first read prepared_txns_ and then delayed_prepared_.
    // GetLatestSequenceNumber() must be read before calling ::top. This is
    // because a concurrent thread would call ::RemovePrepared before updating
    // GetLatestSequenceNumber(). Reading them in the opposite order here
    // guarantees that the ::top we read is no larger than the ::top we would
    // have seen had the two been updated/read atomically.
    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
    auto min_prepare = prepared_txns_.top();
    // Since we always update prepared_txns_ from the main write queue via
    // PreReleaseCallback, prepared_txns_.top() indicates the smallest
    // prepared data in 2pc transactions. For non-2pc transactions that are
    // written in two steps, we also update prepared_txns_ at the first step
    // (via the same mechanism) so that their uncommitted data is reflected in
    // SmallestUnCommittedSeq.
    // The atomic flag lets the common case (no delayed prepared entries) skip
    // taking prepared_mutex_.
    if (!delayed_prepared_empty_.load()) {
      ReadLock rl(&prepared_mutex_);
      if (!delayed_prepared_.empty()) {
        return *delayed_prepared_.begin();
      }
    }
    bool empty = min_prepare == kMaxSequenceNumber;
    if (empty) {
      // Since GetLatestSequenceNumber is updated
      // after prepared_txns_ is, the value of GetLatestSequenceNumber would
      // reflect any uncommitted data that is not added to prepared_txns_ yet.
      // Otherwise, if there is no concurrent txn, this value simply reflects
      // the latest value in the memtable.
      return next_prepare;
    } else {
      return std::min(min_prepare, next_prepare);
    }
  }
  652. // Enhance the snapshot object by recording in it the smallest uncommitted seq
  653. inline void EnhanceSnapshot(SnapshotImpl* snapshot,
  654. SequenceNumber min_uncommitted) {
  655. assert(snapshot);
  656. assert(min_uncommitted <= snapshot->number_ + 1);
  657. snapshot->min_uncommitted_ = min_uncommitted;
  658. }
  // Returns a list of snapshot sequence numbers from the DB; presumably those
  // not larger than max — confirm against the definition.
  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);
  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB.
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);
  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  // Concurrent invocations of this function are equivalent to a serial
  // invocation in which the last invocation is the one with the largest
  // version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);
  // Check the new list of snapshots against the old one to see if any of the
  // snapshots were released, and do the cleanup for the released snapshots.
  void CleanupReleasedSnapshots(
      const std::vector<SequenceNumber>& new_snapshots,
      const std::vector<SequenceNumber>& old_snapshots);
  // Check an evicted entry against live snapshots to see if it should be kept
  // around or can be safely discarded (and hence assumed committed for all
  // snapshots). Thread-safety: this function can be called concurrently. If it
  // is called concurrently with multiple UpdateSnapshots, the result is the
  // same as checking the intersection of the snapshot list before updates
  // with the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);
  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the condition.
  // next_is_larger: the next snapshot will be a larger value
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);
  // A trick to increase the last visible sequence number by one and also wait
  // for the in-flight commits to be visible.
  void AdvanceSeqByOne();
  690. const bool next_is_larger);
  691. // A trick to increase the last visible sequence number by one and also wait
  692. // for the in-flight commits to be visible.
  693. void AdvanceSeqByOne();
  694. // The list of live snapshots at the last time that max_evicted_seq_ advanced.
  695. // The list stored into two data structures: in snapshot_cache_ that is
  696. // efficient for concurrent reads, and in snapshots_ if the data does not fit
  697. // into snapshot_cache_. The total number of snapshots in the two lists
  698. std::atomic<size_t> snapshots_total_ = {};
  699. // The list sorted in ascending order. Thread-safety for writes is provided
  700. // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
  701. // each entry. In x86_64 architecture such reads are compiled to simple read
  702. // instructions.
  703. const size_t SNAPSHOT_CACHE_BITS;
  704. const size_t SNAPSHOT_CACHE_SIZE;
  705. std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  706. // 2nd list for storing snapshots. The list sorted in ascending order.
  707. // Thread-safety is provided with snapshots_mutex_.
  708. std::vector<SequenceNumber> snapshots_;
  709. // The list of all snapshots: snapshots_ + snapshot_cache_. This list although
  710. // redundant but simplifies CleanupOldSnapshots implementation.
  711. // Thread-safety is provided with snapshots_mutex_.
  712. std::vector<SequenceNumber> snapshots_all_;
  713. // The version of the latest list of snapshots. This can be used to avoid
  714. // rewriting a list that is concurrently updated with a more recent version.
  715. SequenceNumber snapshots_version_ = 0;
  716. // A heap of prepared transactions. Thread-safety is provided with
  717. // prepared_mutex_.
  718. PreparedHeap prepared_txns_;
  719. const size_t COMMIT_CACHE_BITS;
  720. const size_t COMMIT_CACHE_SIZE;
  721. const CommitEntry64bFormat FORMAT;
  722. // commit_cache_ must be initialized to zero to tell apart an empty index from
  723. // a filled one. Thread-safety is provided with commit_cache_mutex_.
  724. std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  725. // The largest evicted *commit* sequence number from the commit_cache_. If a
  726. // seq is smaller than max_evicted_seq_ is might or might not be present in
  727. // commit_cache_. So commit_cache_ must first be checked before consulting
  728. // with max_evicted_seq_.
  729. std::atomic<uint64_t> max_evicted_seq_ = {};
  730. // Order: 1) update future_max_evicted_seq_ = new_max, 2)
  731. // GetSnapshotListFromDB(new_max), max_evicted_seq_ = new_max. Since
  732. // GetSnapshotInternal guarantess that the snapshot seq is larger than
  733. // future_max_evicted_seq_, this guarantes that if a snapshot is not larger
  734. // than max has already being looked at via a GetSnapshotListFromDB(new_max).
  735. std::atomic<uint64_t> future_max_evicted_seq_ = {};
  736. // Advance max_evicted_seq_ by this value each time it needs an update. The
  737. // larger the value, the less frequent advances we would have. We do not want
  738. // it to be too large either as it would cause stalls by doing too much
  739. // maintenance work under the lock.
  740. size_t INC_STEP_FOR_MAX_EVICTED = 1;
  741. // A map from old snapshots (expected to be used by a few read-only txns) to
  742. // prepared sequence number of the evicted entries from commit_cache_ that
  743. // overlaps with such snapshot. These are the prepared sequence numbers that
  744. // the snapshot, to which they are mapped, cannot assume to be committed just
  745. // because it is no longer in the commit_cache_. The vector must be sorted
  746. // after each update.
  747. // Thread-safety is provided with old_commit_map_mutex_.
  748. std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  749. // A set of long-running prepared transactions that are not finished by the
  750. // time max_evicted_seq_ advances their sequence number. This is expected to
  751. // be empty normally. Thread-safety is provided with prepared_mutex_.
  752. std::set<uint64_t> delayed_prepared_;
  753. // Commit of a delayed prepared: 1) update commit cache, 2) update
  754. // delayed_prepared_commits_, 3) publish seq, 3) clean up delayed_prepared_.
  755. // delayed_prepared_commits_ will help us tell apart the unprepared txns from
  756. // the ones that are committed but not cleaned up yet.
  757. std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
  758. // Update when delayed_prepared_.empty() changes. Expected to be true
  759. // normally.
  760. std::atomic<bool> delayed_prepared_empty_ = {true};
  761. // Update when old_commit_map_.empty() changes. Expected to be true normally.
  762. std::atomic<bool> old_commit_map_empty_ = {true};
  763. mutable port::RWMutex prepared_mutex_;
  764. mutable port::RWMutex old_commit_map_mutex_;
  765. mutable port::RWMutex commit_cache_mutex_;
  766. mutable port::RWMutex snapshots_mutex_;
  767. // A cache of the cf comparators
  768. // Thread safety: since it is a const it is safe to read it concurrently
  769. std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  770. // A cache of the cf handles
  771. // Thread safety: since the handle is read-only object it is a const it is
  772. // safe to read it concurrently
  773. std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
  774. // A dummy snapshot object that refers to kMaxSequenceNumber
  775. SnapshotImpl dummy_max_snapshot_;
  776. };
  777. class WritePreparedTxnReadCallback : public ReadCallback {
  778. public:
  779. WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
  780. : ReadCallback(snapshot),
  781. db_(db),
  782. backed_by_snapshot_(kBackedByDBSnapshot) {}
  783. WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
  784. SequenceNumber min_uncommitted,
  785. SnapshotBackup backed_by_snapshot)
  786. : ReadCallback(snapshot, min_uncommitted),
  787. db_(db),
  788. backed_by_snapshot_(backed_by_snapshot) {
  789. (void)backed_by_snapshot_; // to silence unused private field warning
  790. }
  791. virtual ~WritePreparedTxnReadCallback() {
  792. // If it is not backed by snapshot, the caller must check validity
  793. assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  794. }
  795. // Will be called to see if the seq number visible; if not it moves on to
  796. // the next seq number.
  797. inline bool IsVisibleFullCheck(SequenceNumber seq) override {
  798. auto snapshot = max_visible_seq_;
  799. bool snap_released = false;
  800. auto ret =
  801. db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
  802. assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
  803. snap_released_ |= snap_released;
  804. return ret;
  805. }
  806. inline bool valid() {
  807. valid_checked_ = true;
  808. return snap_released_ == false;
  809. }
  810. // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
  811. private:
  812. WritePreparedTxnDB* db_;
  813. // Whether max_visible_seq_ is backed by a snapshot
  814. const SnapshotBackup backed_by_snapshot_;
  815. bool snap_released_ = false;
  816. // Safety check to ensure that the caller has checked invalid statuses
  817. bool valid_checked_ = false;
  818. };
  // PreReleaseCallback that registers a prepare batch with WritePreparedTxnDB:
  // sub_batch_cnt_ consecutive sequence numbers starting at prepare_seq are
  // passed to AddPrepared. For the first prepare batch it also marks the log
  // (log_number) as containing a prepare section.
  class AddPreparedCallback : public PreReleaseCallback {
   public:
    AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
                        size_t sub_batch_cnt, bool two_write_queues,
                        bool first_prepare_batch)
        : db_(db),
          db_impl_(db_impl),
          sub_batch_cnt_(sub_batch_cnt),
          two_write_queues_(two_write_queues),
          first_prepare_batch_(first_prepare_batch) {
      // NOTE(review): two_write_queues_ is read in Callback(), so this cast
      // looks like a harmless leftover.
      (void)two_write_queues_;  // to silence unused private field warning
    }
    // index/total: position of this invocation among the callbacks of one
    // write (asserted index < total).
    Status Callback(SequenceNumber prepare_seq,
                    bool is_mem_disabled __attribute__((__unused__)),
                    uint64_t log_number, size_t index, size_t total) override {
      assert(index < total);
      // To reduce the cost of lock acquisition competing with the concurrent
      // prepare requests, lock on the first callback and unlock on the last.
      // With two_write_queues the mutex therefore stays held between the
      // index == 0 and index + 1 == total invocations (presumably invoked back
      // to back by the same thread — confirm); otherwise every invocation
      // locks and unlocks.
      const bool do_lock = !two_write_queues_ || index == 0;
      const bool do_unlock = !two_write_queues_ || index + 1 == total;
      // Always Prepare from the main queue
      assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
      TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
      TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
      if (do_lock) {
        db_->prepared_txns_.push_pop_mutex()->Lock();
      }
      // Tells AddPrepared that push_pop_mutex is already held by us.
      const bool kLocked = true;
      for (size_t i = 0; i < sub_batch_cnt_; i++) {
        db_->AddPrepared(prepare_seq + i, kLocked);
      }
      if (do_unlock) {
        db_->prepared_txns_.push_pop_mutex()->Unlock();
      }
      TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
      if (first_prepare_batch_) {
        assert(log_number != 0);
        db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
            log_number);
      }
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
    DBImpl* db_impl_;
    // Number of consecutive prepare seqs (sub-batches) to register.
    size_t sub_batch_cnt_;
    bool two_write_queues_;
    // It is 2PC and this is the first prepare batch. Always the case in 2PC
    // unless it is WriteUnPrepared.
    bool first_prepare_batch_;
  };
  // PreReleaseCallback run for a commit write. It records the commit of the
  // prepared batch(es), any auxiliary batch(es), and the data written with the
  // commit marker, all with the same last_commit_seq. With two_write_queues it
  // also publishes last_commit_seq and only then removes the corresponding
  // entries from the prepared set.
  class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
   public:
    // data_batch_cnt > 0 indicates that the commit also writes a non-empty
    // CommitTimeWriteBatch to memtable, which needs to be committed separately.
    // prep_seq == kMaxSequenceNumber (with prep_batch_cnt == 0) means there was
    // no prepare phase. Likewise aux_seq == kMaxSequenceNumber with
    // aux_batch_cnt == 0 means there is no auxiliary batch (see the asserts).
    WritePreparedCommitEntryPreReleaseCallback(
        WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
        size_t prep_batch_cnt, size_t data_batch_cnt = 0,
        SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
        : db_(db),
          db_impl_(db_impl),
          prep_seq_(prep_seq),
          prep_batch_cnt_(prep_batch_cnt),
          data_batch_cnt_(data_batch_cnt),
          // Reads the member; this is safe because data_batch_cnt_ is declared
          // (and hence initialized) before includes_data_.
          includes_data_(data_batch_cnt_ > 0),
          aux_seq_(aux_seq),
          aux_batch_cnt_(aux_batch_cnt),
          includes_aux_batch_(aux_batch_cnt > 0) {
      assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
      assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
      assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
    }
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      // Always commit from the 2nd queue
      assert(!db_impl_->immutable_db_options().two_write_queues ||
             is_mem_disabled);
      assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
      // The data batch is what accompanies the commit marker. Each of its
      // sub-batches uses its own seq (commit_seq + i below), so the last one
      // determines the commit seq of everything committed here.
      const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                           ? commit_seq
                                           : commit_seq + data_batch_cnt_ - 1;
      if (prep_seq_ != kMaxSequenceNumber) {
        for (size_t i = 0; i < prep_batch_cnt_; i++) {
          db_->AddCommitted(prep_seq_ + i, last_commit_seq);
        }
      }  // else there was no prepare phase
      if (includes_aux_batch_) {
        for (size_t i = 0; i < aux_batch_cnt_; i++) {
          db_->AddCommitted(aux_seq_ + i, last_commit_seq);
        }
      }
      if (includes_data_) {
        assert(data_batch_cnt_);
        // Commit the data that is accompanied with the commit request
        for (size_t i = 0; i < data_batch_cnt_; i++) {
          // For commit seq of each batch use the commit seq of the last batch.
          // This would make debugging easier by having all the batches having
          // the same sequence number.
          db_->AddCommitted(commit_seq + i, last_commit_seq);
        }
      }
      if (db_impl_->immutable_db_options().two_write_queues) {
        assert(is_mem_disabled);  // implies the 2nd queue
        // Publish the sequence number. We can do that here assuming the
        // callback is invoked only from one write queue, which would guarantee
        // that the publish sequence numbers will be in order, i.e., once a seq
        // is published all the seq prior to that are also publishable.
        db_impl_->SetLastPublishedSequence(last_commit_seq);
        // Note RemovePrepared should be called after publishing the seq.
        // Otherwise SmallestUnCommittedSeq optimization breaks.
        if (prep_seq_ != kMaxSequenceNumber) {
          db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
        }  // else there was no prepare phase
        if (includes_aux_batch_) {
          db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
        }
      }
      // else SequenceNumber that is updated as part of the write already does
      // the publishing
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
    DBImpl* db_impl_;
    // kMaxSequenceNumber if there was no prepare phase
    SequenceNumber prep_seq_;
    size_t prep_batch_cnt_;
    size_t data_batch_cnt_;
    // Data here is the batch that is written with the commit marker, either
    // because it is commit without prepare or commit has a CommitTimeWriteBatch.
    bool includes_data_;
    // Auxiliary batch (if there is any) is a batch that is written before, but
    // gets the same commit seq as prepare batch or data batch. This is used in
    // two write queues where the CommitTimeWriteBatch becomes the aux batch and
    // we do a separate write to actually commit everything.
    SequenceNumber aux_seq_;
    size_t aux_batch_cnt_;
    bool includes_aux_batch_;
  };
  961. // For two_write_queues commit both the aborted batch and the cleanup batch and
  962. // then published the seq
  963. class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
  964. public:
  965. WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
  966. DBImpl* db_impl,
  967. SequenceNumber prep_seq,
  968. SequenceNumber rollback_seq,
  969. size_t prep_batch_cnt)
  970. : db_(db),
  971. db_impl_(db_impl),
  972. prep_seq_(prep_seq),
  973. rollback_seq_(rollback_seq),
  974. prep_batch_cnt_(prep_batch_cnt) {
  975. assert(prep_seq != kMaxSequenceNumber);
  976. assert(rollback_seq != kMaxSequenceNumber);
  977. assert(prep_batch_cnt_ > 0);
  978. }
  979. Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
  980. size_t /*index*/, size_t /*total*/) override {
  981. // Always commit from the 2nd queue
  982. assert(is_mem_disabled); // implies the 2nd queue
  983. assert(db_impl_->immutable_db_options().two_write_queues);
  984. #ifdef NDEBUG
  985. (void)is_mem_disabled;
  986. #endif
  987. const uint64_t last_commit_seq = commit_seq;
  988. db_->AddCommitted(rollback_seq_, last_commit_seq);
  989. for (size_t i = 0; i < prep_batch_cnt_; i++) {
  990. db_->AddCommitted(prep_seq_ + i, last_commit_seq);
  991. }
  992. db_impl_->SetLastPublishedSequence(last_commit_seq);
  993. return Status::OK();
  994. }
  995. private:
  996. WritePreparedTxnDB* db_;
  997. DBImpl* db_impl_;
  998. SequenceNumber prep_seq_;
  999. SequenceNumber rollback_seq_;
  1000. size_t prep_batch_cnt_;
  1001. };
  1002. // Count the number of sub-batches inside a batch. A sub-batch does not have
  1003. // duplicate keys.
  1004. struct SubBatchCounter : public WriteBatch::Handler {
  1005. explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
  1006. : comparators_(comparators), batches_(1) {}
  1007. std::map<uint32_t, const Comparator*>& comparators_;
  1008. using CFKeys = std::set<Slice, SetComparator>;
  1009. std::map<uint32_t, CFKeys> keys_;
  1010. size_t batches_;
  1011. size_t BatchCount() { return batches_; }
  1012. void AddKey(const uint32_t cf, const Slice& key);
  1013. void InitWithComp(const uint32_t cf);
  1014. Status MarkNoop(bool) override { return Status::OK(); }
  1015. Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  1016. Status MarkCommit(const Slice&) override { return Status::OK(); }
  1017. Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
  1018. AddKey(cf, key);
  1019. return Status::OK();
  1020. }
  1021. Status DeleteCF(uint32_t cf, const Slice& key) override {
  1022. AddKey(cf, key);
  1023. return Status::OK();
  1024. }
  1025. Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
  1026. AddKey(cf, key);
  1027. return Status::OK();
  1028. }
  1029. Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
  1030. AddKey(cf, key);
  1031. return Status::OK();
  1032. }
  1033. Status MarkBeginPrepare(bool) override { return Status::OK(); }
  1034. Status MarkRollback(const Slice&) override { return Status::OK(); }
  1035. Handler::OptionState WriteAfterCommit() const override {
  1036. return Handler::OptionState::kDisabled;
  1037. }
  1038. };
  1039. SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
  1040. SequenceNumber* min,
  1041. SequenceNumber* max) {
  1042. if (snapshot != nullptr) {
  1043. *min =
  1044. static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
  1045. *max = static_cast_with_check<const SnapshotImpl>(snapshot)->number_;
  1046. // A duplicate of the check in EnhanceSnapshot().
  1047. assert(*min <= *max + 1);
  1048. return kBackedByDBSnapshot;
  1049. } else {
  1050. *min = SmallestUnCommittedSeq();
  1051. *max = 0; // to be assigned later after sv is referenced.
  1052. return kUnbackedByDBSnapshot;
  1053. }
  1054. }
  1055. bool WritePreparedTxnDB::ValidateSnapshot(
  1056. const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
  1057. std::memory_order order) {
  1058. if (backed_by_snapshot == kBackedByDBSnapshot) {
  1059. return true;
  1060. } else {
  1061. SequenceNumber max = max_evicted_seq_.load(order);
  1062. // Validate that max has not advanced the snapshot seq that is not backed
  1063. // by a real snapshot. This is a very rare case that should not happen in
  1064. // real workloads.
  1065. if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
  1066. return false;
  1067. }
  1068. }
  1069. return true;
  1070. }
  1071. } // namespace ROCKSDB_NAMESPACE