write_prepared_txn_db.h 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #ifndef ROCKSDB_LITE
  7. #include <cinttypes>
  8. #include <mutex>
  9. #include <queue>
  10. #include <set>
  11. #include <string>
  12. #include <unordered_map>
  13. #include <vector>
  14. #include "db/db_iter.h"
  15. #include "db/pre_release_callback.h"
  16. #include "db/read_callback.h"
  17. #include "db/snapshot_checker.h"
  18. #include "rocksdb/db.h"
  19. #include "rocksdb/options.h"
  20. #include "rocksdb/utilities/transaction_db.h"
  21. #include "util/cast_util.h"
  22. #include "util/set_comparator.h"
  23. #include "util/string_util.h"
  24. #include "utilities/transactions/pessimistic_transaction.h"
  25. #include "utilities/transactions/pessimistic_transaction_db.h"
  26. #include "utilities/transactions/transaction_lock_mgr.h"
  27. #include "utilities/transactions/write_prepared_txn.h"
  28. namespace ROCKSDB_NAMESPACE {
  // Tells whether a sequence number used for reading is backed by an actual
  // snapshot object (kBackedByDBSnapshot) or is a bare number
  // (kUnbackedByDBSnapshot). The underlying type is bool, so the enumerators
  // map to false / true respectively.
  enum SnapshotBackup : bool {
    kUnbackedByDBSnapshot = false,
    kBackedByDBSnapshot = true
  };
  30. // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
  31. // In this way some data in the DB might not be committed. The DB provides
  32. // mechanisms to tell such data apart from committed data.
  33. class WritePreparedTxnDB : public PessimisticTransactionDB {
  34. public:
  // Wraps a plain DB. The snapshot cache and commit cache sizes are computed as
  // 2^bits from txn_db_options.wp_snapshot_cache_bits and
  // txn_db_options.wp_commit_cache_bits, and FORMAT is constructed from
  // COMMIT_CACHE_BITS. Remaining setup is delegated to Init().
  // NOTE(review): each *_SIZE initializer reads the matching *_BITS member, so
  // this relies on the members being declared in this order (the declarations
  // are outside this view). Also 1ull << bits is undefined for bits >= 64;
  // presumably the options are validated before reaching here — confirm.
  explicit WritePreparedTxnDB(DB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }
  // Same as the DB* overload but wraps a StackableDB. Cache sizes are 2^bits
  // from the wp_snapshot_cache_bits / wp_commit_cache_bits options, FORMAT is
  // constructed from COMMIT_CACHE_BITS, and Init() finishes setup.
  // NOTE(review): the *_SIZE initializers read the *_BITS members, so this
  // depends on the member declaration order (outside this view).
  explicit WritePreparedTxnDB(StackableDB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }
  55. virtual ~WritePreparedTxnDB();
  56. virtual Status Initialize(
  57. const std::vector<size_t>& compaction_enabled_cf_indices,
  58. const std::vector<ColumnFamilyHandle*>& handles) override;
  59. Transaction* BeginTransaction(const WriteOptions& write_options,
  60. const TransactionOptions& txn_options,
  61. Transaction* old_txn) override;
  62. using TransactionDB::Write;
  63. Status Write(const WriteOptions& opts, WriteBatch* updates) override;
  64. // Optimized version of ::Write that receives more optimization request such
  65. // as skip_concurrency_control.
  66. using PessimisticTransactionDB::Write;
  67. Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
  68. WriteBatch* updates) override;
  69. // Write the batch to the underlying DB and mark it as committed. Could be
  70. // used by both directly from TxnDB or through a transaction.
  71. Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
  72. size_t batch_cnt, WritePreparedTxn* txn);
  73. using DB::Get;
  74. virtual Status Get(const ReadOptions& options,
  75. ColumnFamilyHandle* column_family, const Slice& key,
  76. PinnableSlice* value) override;
  77. using DB::MultiGet;
  78. virtual std::vector<Status> MultiGet(
  79. const ReadOptions& options,
  80. const std::vector<ColumnFamilyHandle*>& column_family,
  81. const std::vector<Slice>& keys,
  82. std::vector<std::string>* values) override;
  83. using DB::NewIterator;
  84. virtual Iterator* NewIterator(const ReadOptions& options,
  85. ColumnFamilyHandle* column_family) override;
  86. using DB::NewIterators;
  87. virtual Status NewIterators(
  88. const ReadOptions& options,
  89. const std::vector<ColumnFamilyHandle*>& column_families,
  90. std::vector<Iterator*>* iterators) override;
  91. // Check whether the transaction that wrote the value with sequence number seq
  92. // is visible to the snapshot with sequence number snapshot_seq.
  93. // Returns true if commit_seq <= snapshot_seq
  94. // If the snapshot_seq is already released and snapshot_seq <= max, sets
  95. // *snap_released to true and returns true as well.
  96. inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
  97. uint64_t min_uncommitted = kMinUnCommittedSeq,
  98. bool* snap_released = nullptr) const {
  99. ROCKS_LOG_DETAILS(info_log_,
  100. "IsInSnapshot %" PRIu64 " in %" PRIu64
  101. " min_uncommitted %" PRIu64,
  102. prep_seq, snapshot_seq, min_uncommitted);
  103. assert(min_uncommitted >= kMinUnCommittedSeq);
  104. // Caller is responsible to initialize snap_released.
  105. assert(snap_released == nullptr || *snap_released == false);
  106. // Here we try to infer the return value without looking into prepare list.
  107. // This would help avoiding synchronization over a shared map.
  108. // TODO(myabandeh): optimize this. This sequence of checks must be correct
  109. // but not necessary efficient
  110. if (prep_seq == 0) {
  111. // Compaction will output keys to bottom-level with sequence number 0 if
  112. // it is visible to the earliest snapshot.
  113. ROCKS_LOG_DETAILS(
  114. info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  115. prep_seq, snapshot_seq, 1);
  116. return true;
  117. }
  118. if (snapshot_seq < prep_seq) {
  119. // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
  120. ROCKS_LOG_DETAILS(
  121. info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  122. prep_seq, snapshot_seq, 0);
  123. return false;
  124. }
  125. if (prep_seq < min_uncommitted) {
  126. ROCKS_LOG_DETAILS(info_log_,
  127. "IsInSnapshot %" PRIu64 " in %" PRIu64
  128. " returns %" PRId32
  129. " because of min_uncommitted %" PRIu64,
  130. prep_seq, snapshot_seq, 1, min_uncommitted);
  131. return true;
  132. }
  133. // Commit of delayed prepared has two non-atomic steps: add to commit cache,
  134. // remove from delayed prepared. Our reads from these two is also
  135. // non-atomic. By looking into commit cache first thus we might not find the
  136. // prep_seq neither in commit cache not in delayed_prepared_. To fix that i)
  137. // we check if there was any delayed prepared BEFORE looking into commit
  138. // cache, ii) if there was, we complete the search steps to be these: i)
  139. // commit cache, ii) delayed prepared, commit cache again. In this way if
  140. // the first query to commit cache missed the commit, the 2nd will catch it.
  141. bool was_empty;
  142. SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
  143. CommitEntry64b dont_care;
  144. auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
  145. size_t repeats = 0;
  146. do {
  147. repeats++;
  148. assert(repeats < 100);
  149. if (UNLIKELY(repeats >= 100)) {
  150. throw std::runtime_error(
  151. "The read was intrupted 100 times by update to max_evicted_seq_. "
  152. "This is unexpected in all setups");
  153. }
  154. max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
  155. TEST_SYNC_POINT(
  156. "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
  157. TEST_SYNC_POINT(
  158. "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
  159. was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
  160. TEST_SYNC_POINT(
  161. "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
  162. TEST_SYNC_POINT(
  163. "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
  164. CommitEntry cached;
  165. bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
  166. TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
  167. TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
  168. if (exist && prep_seq == cached.prep_seq) {
  169. // It is committed and also not evicted from commit cache
  170. ROCKS_LOG_DETAILS(
  171. info_log_,
  172. "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  173. prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
  174. return cached.commit_seq <= snapshot_seq;
  175. }
  176. // else it could be committed but not inserted in the map which could
  177. // happen after recovery, or it could be committed and evicted by another
  178. // commit, or never committed.
  179. // At this point we dont know if it was committed or it is still prepared
  180. max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
  181. if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
  182. continue;
  183. }
  184. // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
  185. if (max_evicted_seq_ub < prep_seq) {
  186. // Not evicted from cache and also not present, so must be still
  187. // prepared
  188. ROCKS_LOG_DETAILS(info_log_,
  189. "IsInSnapshot %" PRIu64 " in %" PRIu64
  190. " returns %" PRId32,
  191. prep_seq, snapshot_seq, 0);
  192. return false;
  193. }
  194. TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
  195. TEST_SYNC_POINT(
  196. "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
  197. if (!was_empty) {
  198. // We should not normally reach here
  199. WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
  200. ReadLock rl(&prepared_mutex_);
  201. ROCKS_LOG_WARN(
  202. info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
  203. static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
  204. if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
  205. // This is the order: 1) delayed_prepared_commits_ update, 2) publish
  206. // 3) delayed_prepared_ clean up. So check if it is the case of a late
  207. // clenaup.
  208. auto it = delayed_prepared_commits_.find(prep_seq);
  209. if (it == delayed_prepared_commits_.end()) {
  210. // Then it is not committed yet
  211. ROCKS_LOG_DETAILS(info_log_,
  212. "IsInSnapshot %" PRIu64 " in %" PRIu64
  213. " returns %" PRId32,
  214. prep_seq, snapshot_seq, 0);
  215. return false;
  216. } else {
  217. ROCKS_LOG_DETAILS(info_log_,
  218. "IsInSnapshot %" PRIu64 " in %" PRIu64
  219. " commit: %" PRIu64 " returns %" PRId32,
  220. prep_seq, snapshot_seq, it->second,
  221. snapshot_seq <= it->second);
  222. return it->second <= snapshot_seq;
  223. }
  224. } else {
  225. // 2nd query to commit cache. Refer to was_empty comment above.
  226. exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
  227. if (exist && prep_seq == cached.prep_seq) {
  228. ROCKS_LOG_DETAILS(
  229. info_log_,
  230. "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  231. prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
  232. return cached.commit_seq <= snapshot_seq;
  233. }
  234. max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
  235. }
  236. }
  237. } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
  238. // When advancing max_evicted_seq_, we move older entires from prepared to
  239. // delayed_prepared_. Also we move evicted entries from commit cache to
  240. // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
  241. // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
  242. // old_commit_map_, iii) committed with no conflict with any snapshot. Case
  243. // (i) delayed_prepared_ is checked above
  244. if (max_evicted_seq_ub < snapshot_seq) { // then (ii) cannot be the case
  245. // only (iii) is the case: committed
  246. // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
  247. // snapshot_seq
  248. ROCKS_LOG_DETAILS(
  249. info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  250. prep_seq, snapshot_seq, 1);
  251. return true;
  252. }
  253. // else (ii) might be the case: check the commit data saved for this
  254. // snapshot. If there was no overlapping commit entry, then it is committed
  255. // with a commit_seq lower than any live snapshot, including snapshot_seq.
  256. if (old_commit_map_empty_.load(std::memory_order_acquire)) {
  257. ROCKS_LOG_DETAILS(info_log_,
  258. "IsInSnapshot %" PRIu64 " in %" PRIu64
  259. " returns %" PRId32 " released=1",
  260. prep_seq, snapshot_seq, 0);
  261. assert(snap_released);
  262. // This snapshot is not valid anymore. We cannot tell if prep_seq is
  263. // committed before or after the snapshot. Return true but also set
  264. // snap_released to true.
  265. *snap_released = true;
  266. return true;
  267. }
  268. {
  269. // We should not normally reach here unless sapshot_seq is old. This is a
  270. // rare case and it is ok to pay the cost of mutex ReadLock for such old,
  271. // reading transactions.
  272. WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
  273. ReadLock rl(&old_commit_map_mutex_);
  274. auto prep_set_entry = old_commit_map_.find(snapshot_seq);
  275. bool found = prep_set_entry != old_commit_map_.end();
  276. if (found) {
  277. auto& vec = prep_set_entry->second;
  278. found = std::binary_search(vec.begin(), vec.end(), prep_seq);
  279. } else {
  280. // coming from compaction
  281. ROCKS_LOG_DETAILS(info_log_,
  282. "IsInSnapshot %" PRIu64 " in %" PRIu64
  283. " returns %" PRId32 " released=1",
  284. prep_seq, snapshot_seq, 0);
  285. // This snapshot is not valid anymore. We cannot tell if prep_seq is
  286. // committed before or after the snapshot. Return true but also set
  287. // snap_released to true.
  288. assert(snap_released);
  289. *snap_released = true;
  290. return true;
  291. }
  292. if (!found) {
  293. ROCKS_LOG_DETAILS(info_log_,
  294. "IsInSnapshot %" PRIu64 " in %" PRIu64
  295. " returns %" PRId32,
  296. prep_seq, snapshot_seq, 1);
  297. return true;
  298. }
  299. }
  300. // (ii) it the case: it is committed but after the snapshot_seq
  301. ROCKS_LOG_DETAILS(
  302. info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
  303. prep_seq, snapshot_seq, 0);
  304. return false;
  305. }
  306. // Add the transaction with prepare sequence seq to the prepared list.
  307. // Note: must be called serially with increasing seq on each call.
  308. // locked is true if prepared_mutex_ is already locked.
  309. void AddPrepared(uint64_t seq, bool locked = false);
  310. // Check if any of the prepared txns are less than new max_evicted_seq_. Must
  311. // be called with prepared_mutex_ write locked.
  312. void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
  313. // Remove the transaction with prepare sequence seq from the prepared list
  314. void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
  315. // Add the transaction with prepare sequence prepare_seq and commit sequence
  316. // commit_seq to the commit map. loop_cnt is to detect infinite loops.
  317. // Note: must be called serially.
  318. void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
  319. uint8_t loop_cnt = 0);
  // A decoded commit-cache entry: the prepare sequence number of a transaction
  // and the sequence number at which it committed. A default-constructed entry
  // is <0, 0>. Two entries are equal when both fields match.
  struct CommitEntry {
    uint64_t prep_seq;
    uint64_t commit_seq;
    CommitEntry() : CommitEntry(0, 0) {}
    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
    bool operator==(const CommitEntry& rhs) const {
      return commit_seq == rhs.commit_seq && prep_seq == rhs.prep_seq;
    }
  };
  // Bit-layout parameters for packing a commit entry into 64 bits, for a commit
  // cache with 2^index_bits slots. Every field is derived from index_bits and
  // fixed at construction. Fields are initialized in declaration order, so
  // each initializer may use the fields declared above it.
  struct CommitEntry64bFormat {
    explicit CommitEntry64bFormat(size_t index_bits)
        : PAD_BITS(static_cast<size_t>(8)),
          INDEX_BITS(index_bits),
          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
          // 64 - PREP_BITS, i.e. the PAD bits plus the INDEX bits.
          COMMIT_BITS(static_cast<size_t>(PAD_BITS + INDEX_BITS)),
          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
          // 2^COMMIT_BITS: one past the largest value COMMIT_FILTER can hold.
          DELTA_UPPERBOUND(static_cast<uint64_t>(COMMIT_FILTER + 1)) {}
    // Number of high bits of a sequence number that are reserved for tagging
    // (e.g. the value type) and therefore not part of the number itself.
    const size_t PAD_BITS;
    // Number of low prepare-seq bits that need not be stored because they are
    // implied by the index of the entry in the array.
    const size_t INDEX_BITS;
    // Number of bits used to encode the prepare seq.
    const size_t PREP_BITS;
    // Number of bits used to encode the commit seq (as a delta).
    const size_t COMMIT_BITS;
    // Mask selecting the low COMMIT_BITS bits, used to encode/decode the delta.
    const uint64_t COMMIT_FILTER;
    // commit_seq - prepare_seq + 1 must be strictly less than this bound.
    const uint64_t DELTA_UPPERBOUND;
  };
  351. // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ...
  352. // INDEX Delta Seq (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ...
  353. // DELTA DELTA Encoded Value = PREP PREP .... PREP PREP DELTA DELTA
  354. // ... DELTA DELTA PAD: first bits of a seq that is reserved for tagging and
  355. // hence ignored PREP/INDEX: the used bits in a prepare seq number INDEX: the
  356. // bits that do not have to be encoded (will be provided externally) DELTA:
  357. // prep seq - commit seq + 1 Number of DELTA bits should be equal to number of
  358. // index bits + PADs
  359. struct CommitEntry64b {
  360. constexpr CommitEntry64b() noexcept : rep_(0) {}
  361. CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
  362. : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
  363. CommitEntry64b(const uint64_t ps, const uint64_t cs,
  364. const CommitEntry64bFormat& format) {
  365. assert(ps < static_cast<uint64_t>(
  366. (1ull << (format.PREP_BITS + format.INDEX_BITS))));
  367. assert(ps <= cs);
  368. uint64_t delta = cs - ps + 1; // make initialized delta always >= 1
  369. // zero is reserved for uninitialized entries
  370. assert(0 < delta);
  371. assert(delta < format.DELTA_UPPERBOUND);
  372. if (delta >= format.DELTA_UPPERBOUND) {
  373. throw std::runtime_error(
  374. "commit_seq >> prepare_seq. The allowed distance is " +
  375. ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
  376. ToString(cs) + " prepare_seq is " + ToString(ps));
  377. }
  378. rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
  379. rep_ = rep_ | delta;
  380. }
  381. // Return false if the entry is empty
  382. bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
  383. const CommitEntry64bFormat& format) {
  384. uint64_t delta = rep_ & format.COMMIT_FILTER;
  385. // zero is reserved for uninitialized entries
  386. assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
  387. if (delta == 0) {
  388. return false; // initialized entry would have non-zero delta
  389. }
  390. assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
  391. uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
  392. prep_up >>= format.PAD_BITS;
  393. const uint64_t& prep_low = indexed_seq;
  394. entry->prep_seq = prep_up | prep_low;
  395. entry->commit_seq = entry->prep_seq + delta - 1;
  396. return true;
  397. }
  398. private:
  399. uint64_t rep_;
  400. };
  401. // Struct to hold ownership of snapshot and read callback for cleanup.
  402. struct IteratorState;
  403. std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
  404. return cf_map_;
  405. }
  406. std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
  407. return handle_map_;
  408. }
  409. void UpdateCFComparatorMap(
  410. const std::vector<ColumnFamilyHandle*>& handles) override;
  411. void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
  412. virtual const Snapshot* GetSnapshot() override;
  413. SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
  414. protected:
  415. virtual Status VerifyCFOptions(
  416. const ColumnFamilyOptions& cf_options) override;
  417. // Assign the min and max sequence numbers for reading from the db. A seq >
  418. // max is not valid, and a seq < min is valid, and a min <= seq < max requires
  419. // further checking. Normally max is defined by the snapshot and min is by
  420. // minimum uncommitted seq.
  421. inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
  422. SequenceNumber* min,
  423. SequenceNumber* max);
  424. // Validate is a snapshot sequence number is still valid based on the latest
  425. // db status. backed_by_snapshot specifies if the number is baked by an actual
  426. // snapshot object. order specified the memory order with which we load the
  427. // atomic variables: relax is enough for the default since we care about last
  428. // value seen by same thread.
  429. inline bool ValidateSnapshot(
  430. const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
  431. std::memory_order order = std::memory_order_relaxed);
  432. // Get a dummy snapshot that refers to kMaxSequenceNumber
  433. Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
  434. private:
  435. friend class AddPreparedCallback;
  436. friend class PreparedHeap_BasicsTest_Test;
  437. friend class PreparedHeap_Concurrent_Test;
  438. friend class PreparedHeap_EmptyAtTheEnd_Test;
  439. friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
  440. friend class WritePreparedCommitEntryPreReleaseCallback;
  441. friend class WritePreparedTransactionTestBase;
  442. friend class WritePreparedTxn;
  443. friend class WritePreparedTxnDBMock;
  444. friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
  445. friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
  446. friend class
  447. WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
  448. friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
  449. friend class WritePreparedTransactionTest_BasicRecovery_Test;
  450. friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
  451. friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
  452. friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
  453. friend class WritePreparedTransactionTest_CommitMap_Test;
  454. friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
  455. friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
  456. friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
  457. friend class WritePreparedTransactionTest_IsInSnapshot_Test;
  458. friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
  459. friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
  460. friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
  461. friend class
  462. WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
  463. friend class
  464. WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
  465. friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
  466. friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
  467. friend class WritePreparedTransactionTest_Rollback_Test;
  468. friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
  469. friend class WriteUnpreparedTxn;
  470. friend class WriteUnpreparedTxnDB;
  471. friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  472. void Init(const TransactionDBOptions& /* unused */);
  473. void WPRecordTick(uint32_t ticker_type) const {
  474. RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
  475. }
  476. // A heap with the amortized O(1) complexity for erase. It uses one extra heap
  477. // to keep track of erased entries that are not yet on top of the main heap.
  478. class PreparedHeap {
  479. // The mutex is required for push and pop from PreparedHeap. ::erase will
  480. // use external synchronization via prepared_mutex_.
  481. port::Mutex push_pop_mutex_;
  482. std::deque<uint64_t> heap_;
  483. std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
  484. erased_heap_;
  485. std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
  486. // True when testing crash recovery
  487. bool TEST_CRASH_ = false;
  488. friend class WritePreparedTxnDB;
  489. public:
  490. ~PreparedHeap() {
  491. if (!TEST_CRASH_) {
  492. assert(heap_.empty());
  493. assert(erased_heap_.empty());
  494. }
  495. }
  496. port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }
  497. inline bool empty() { return top() == kMaxSequenceNumber; }
  498. // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
  499. inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
  500. inline void push(uint64_t v) {
  501. push_pop_mutex_.AssertHeld();
  502. if (heap_.empty()) {
  503. heap_top_.store(v, std::memory_order_release);
  504. } else {
  505. assert(heap_top_.load() < v);
  506. }
  507. heap_.push_back(v);
  508. }
  509. void pop(bool locked = false) {
  510. if (!locked) {
  511. push_pop_mutex()->Lock();
  512. }
  513. push_pop_mutex_.AssertHeld();
  514. heap_.pop_front();
  515. while (!heap_.empty() && !erased_heap_.empty() &&
  516. // heap_.top() > erased_heap_.top() could happen if we have erased
  517. // a non-existent entry. Ideally the user should not do that but we
  518. // should be resilient against it.
  519. heap_.front() >= erased_heap_.top()) {
  520. if (heap_.front() == erased_heap_.top()) {
  521. heap_.pop_front();
  522. }
  523. uint64_t erased __attribute__((__unused__));
  524. erased = erased_heap_.top();
  525. erased_heap_.pop();
  526. // No duplicate prepare sequence numbers
  527. assert(erased_heap_.empty() || erased_heap_.top() != erased);
  528. }
  529. while (heap_.empty() && !erased_heap_.empty()) {
  530. erased_heap_.pop();
  531. }
  532. heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
  533. std::memory_order_release);
  534. if (!locked) {
  535. push_pop_mutex()->Unlock();
  536. }
  537. }
  538. // Concurrrent calls needs external synchronization. It is safe to be called
  539. // concurrent to push and pop though.
  540. void erase(uint64_t seq) {
  541. if (!empty()) {
  542. auto top_seq = top();
  543. if (seq < top_seq) {
  544. // Already popped, ignore it.
  545. } else if (top_seq == seq) {
  546. pop();
  547. #ifndef NDEBUG
  548. MutexLock ml(push_pop_mutex());
  549. assert(heap_.empty() || heap_.front() != seq);
  550. #endif
  551. } else { // top() > seq
  552. // Down the heap, remember to pop it later
  553. erased_heap_.push(seq);
  554. }
  555. }
  556. }
  557. };
  558. void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }
  559. // Get the commit entry with index indexed_seq from the commit table. It
  560. // returns true if such entry exists.
  561. bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
  562. CommitEntry* entry) const;
  563. // Rewrite the entry with the index indexed_seq in the commit table with the
  564. // commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
  565. // sets the evicted_entry and returns true.
  566. bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
  567. CommitEntry* evicted_entry);
  568. // Rewrite the entry with the index indexed_seq in the commit table with the
  569. // commit entry new_entry only if the existing entry matches the
  570. // expected_entry. Returns false otherwise.
  571. bool ExchangeCommitEntry(const uint64_t indexed_seq,
  572. CommitEntry64b& expected_entry,
  573. const CommitEntry& new_entry);
  574. // Increase max_evicted_seq_ from the previous value prev_max to the new
  575. // value. This also involves taking care of prepared txns that are not
  576. // committed before new_max, as well as updating the list of live snapshots at
  577. // the time of updating the max. Thread-safety: this function can be called
  578. // concurrently. The concurrent invocations of this function is equivalent to
  579. // a serial invocation in which the last invocation is the one with the
  580. // largest new_max value.
  581. void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
  582. const SequenceNumber& new_max);
  // Returns the smallest sequence number that may still be uncommitted: the
  // smallest entry of delayed_prepared_ when that set is non-empty; otherwise
  // next_prepare (latest seq + 1) when prepared_txns_ is empty (its top() is
  // kMaxSequenceNumber), else std::min(prepared_txns_.top(), next_prepare).
  583. inline SequenceNumber SmallestUnCommittedSeq() {
  584. // Note: We have two lists to look into, but for performance reasons they
  585. // are not read atomically. Since CheckPreparedAgainstMax copies the entry
  586. // to delayed_prepared_ before removing it from prepared_txns_, to ensure
  587. // that a prepared entry will not be missed, we look into them in opposite
  588. // order: first read prepared_txns_ and then delayed_prepared_.
  589. // GetLatestSequenceNumber() must be read before calling ::top. This is
  590. // because the concurrent thread would call ::RemovePrepared before updating
  591. // GetLatestSequenceNumber(). Reading them in opposite order here guarantees
  592. // that the ::top that we read would be lower than the ::top if we had
  593. // otherwise updated/read them atomically.
  // Keep the next two reads in exactly this order (see the note above).
  594. auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
  595. auto min_prepare = prepared_txns_.top();
  596. // Since we update the prepare_heap always from the main write queue via
  597. // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
  598. // prepared data in 2pc transactions. For non-2pc transactions that are
  599. // written in two steps, we also update prepared_txns_ at the first step
  600. // (via the same mechanism) so that their uncommitted data is reflected in
  601. // SmallestUnCommittedSeq.
  // The atomic flag lets the common case skip prepared_mutex_; emptiness is
  // re-checked under the read lock before dereferencing begin().
  602. if (!delayed_prepared_empty_.load()) {
  603. ReadLock rl(&prepared_mutex_);
  604. if (!delayed_prepared_.empty()) {
  605. return *delayed_prepared_.begin();
  606. }
  607. }
  // A top() of kMaxSequenceNumber means there is no prepared txn in the heap.
  608. bool empty = min_prepare == kMaxSequenceNumber;
  609. if (empty) {
  610. // Since GetLatestSequenceNumber is updated
  611. // after prepared_txns_ are, the value of GetLatestSequenceNumber would
  612. // reflect any uncommitted data that is not added to prepared_txns_ yet.
  613. // Otherwise, if there is no concurrent txn, this value simply reflects
  614. // the latest value in the memtable.
  615. return next_prepare;
  616. } else {
  617. return std::min(min_prepare, next_prepare);
  618. }
  619. }
  620. // Enhance the snapshot object by recording in it the smallest uncommitted seq
  621. inline void EnhanceSnapshot(SnapshotImpl* snapshot,
  622. SequenceNumber min_uncommitted) {
  623. assert(snapshot);
  624. assert(min_uncommitted <= snapshot->number_ + 1);
  625. snapshot->min_uncommitted_ = min_uncommitted;
  626. }
  // Fetch the list of live snapshots from the DB.
  // NOTE(review): max presumably bounds the returned sequence numbers (the
  // max_evicted_seq_ comments describe calling this with new_max); confirm
  // against the definition. Returning a const value prevents callers from
  // moving the result, but because the method is virtual, changing the return
  // type would break overrides, so it is left as is.
  627. virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
  628. SequenceNumber max);
  629. // Will be called by the public ReleaseSnapshot method. Does the maintenance
  630. // internal to WritePreparedTxnDB.
  631. void ReleaseSnapshotInternal(const SequenceNumber snap_seq);
  632. // Update the list of snapshots corresponding to the soon-to-be-updated
  633. // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  634. // Concurrent invocations of this function are equivalent to a serial
  635. // invocation in which the last invocation is the one with the largest
  636. // version value.
  637. void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
  638. const SequenceNumber& version);
  639. // Check the new list of snapshots against the old one to see if any of the
  640. // snapshots were released, and do the cleanup for each released snapshot.
  641. void CleanupReleasedSnapshots(
  642. const std::vector<SequenceNumber>& new_snapshots,
  643. const std::vector<SequenceNumber>& old_snapshots);
  644. // Check an evicted entry against live snapshots to see if it should be kept
  645. // around or can be safely discarded (and hence assumed committed for all
  646. // snapshots). Thread-safety: this function can be called concurrently. If it
  647. // is called concurrently with multiple UpdateSnapshots, the result is the
  648. // same as checking the intersection of the snapshot list before updates with
  649. // the snapshot list of all the concurrent updates.
  650. void CheckAgainstSnapshots(const CommitEntry& evicted);
  651. // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  652. // commit_seq. Returns false if checking the next snapshot(s) is not needed.
  653. // This is the case if none of the next snapshots could satisfy the condition.
  654. // next_is_larger: whether the next snapshot to be checked has a larger value.
  655. bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
  656. const uint64_t& commit_seq,
  657. const uint64_t& snapshot_seq,
  658. const bool next_is_larger);
  659. // A trick to increase the last visible sequence number by one and also wait
  660. // for the in-flight commits to become visible.
  661. void AdvanceSeqByOne();
  // NOTE: members are initialized in declaration order; do not reorder them.
  662. // The list of live snapshots at the last time that max_evicted_seq_ advanced.
  663. // The list is stored in two data structures: in snapshot_cache_, which is
  664. // efficient for concurrent reads, and in snapshots_ if the data does not fit
  665. // into snapshot_cache_. snapshots_total_ is the total number of snapshots in
  // the two lists.
  666. std::atomic<size_t> snapshots_total_ = {};
  667. // snapshot_cache_ is sorted in ascending order. Thread-safety for writes is
  668. // provided with snapshots_mutex_ and concurrent reads are safe due to
  669. // std::atomic for each entry. In x86_64 architecture such reads are compiled
  670. // to simple read instructions.
  671. const size_t SNAPSHOT_CACHE_BITS;
  672. const size_t SNAPSHOT_CACHE_SIZE;
  673. std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  674. // 2nd list for storing snapshots. The list is sorted in ascending order.
  675. // Thread-safety is provided with snapshots_mutex_.
  676. std::vector<SequenceNumber> snapshots_;
  677. // The list of all snapshots: snapshots_ + snapshot_cache_. Although
  678. // redundant, this list simplifies the CleanupOldSnapshots implementation.
  // NOTE(review): CleanupOldSnapshots is not declared in the visible part of
  // this class; this presumably refers to CleanupReleasedSnapshots — confirm.
  679. // Thread-safety is provided with snapshots_mutex_.
  680. std::vector<SequenceNumber> snapshots_all_;
  681. // The version of the latest list of snapshots. This can be used to avoid
  682. // rewriting a list that is concurrently updated with a more recent version.
  683. SequenceNumber snapshots_version_ = 0;
  684. // A heap of prepared transactions. Thread-safety is provided with
  685. // prepared_mutex_.
  686. PreparedHeap prepared_txns_;
  // Sizing of commit_cache_ and, presumably, the bit layout used to pack its
  // CommitEntry64b entries (inferred from the names; confirm in the .cc file).
  687. const size_t COMMIT_CACHE_BITS;
  688. const size_t COMMIT_CACHE_SIZE;
  689. const CommitEntry64bFormat FORMAT;
  690. // commit_cache_ must be initialized to zero to tell apart an empty index from
  691. // a filled one. Thread-safety is provided with commit_cache_mutex_.
  692. std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  693. // The largest evicted *commit* sequence number from the commit_cache_. If a
  694. // seq is smaller than max_evicted_seq_, it might or might not be present in
  695. // commit_cache_. So commit_cache_ must first be checked before consulting
  696. // max_evicted_seq_.
  697. std::atomic<uint64_t> max_evicted_seq_ = {};
  698. // Order: 1) update future_max_evicted_seq_ = new_max, 2)
  699. // GetSnapshotListFromDB(new_max), max_evicted_seq_ = new_max. Since
  700. // GetSnapshotInternal guarantees that the snapshot seq is larger than
  701. // future_max_evicted_seq_, this guarantees that a snapshot that is not larger
  702. // than max has already been looked at via GetSnapshotListFromDB(new_max).
  703. std::atomic<uint64_t> future_max_evicted_seq_ = {};
  704. // Advance max_evicted_seq_ by this value each time it needs an update. The
  705. // larger the value, the less frequent advances we would have. We do not want
  706. // it to be too large either as it would cause stalls by doing too much
  707. // maintenance work under the lock.
  708. size_t INC_STEP_FOR_MAX_EVICTED = 1;
  709. // A map from old snapshots (expected to be used by a few read-only txns) to
  710. // prepared sequence numbers of the evicted entries from commit_cache_ that
  711. // overlap with such a snapshot. These are the prepared sequence numbers that
  712. // the snapshot, to which they are mapped, cannot assume to be committed just
  713. // because they are no longer in the commit_cache_. The vector must be sorted
  714. // after each update.
  715. // Thread-safety is provided with old_commit_map_mutex_.
  716. std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  717. // A set of long-running prepared transactions that are not finished by the
  718. // time max_evicted_seq_ advances their sequence number. This is expected to
  719. // be empty normally. Thread-safety is provided with prepared_mutex_.
  720. std::set<uint64_t> delayed_prepared_;
  721. // Commit of a delayed prepared: 1) update commit cache, 2) update
  722. // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_.
  723. // delayed_prepared_commits_ will help us tell apart the unprepared txns from
  724. // the ones that are committed but not cleaned up yet.
  725. std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
  726. // Updated when delayed_prepared_.empty() changes. Expected to be true
  727. // normally.
  728. std::atomic<bool> delayed_prepared_empty_ = {true};
  729. // Updated when old_commit_map_.empty() changes. Expected to be true normally.
  730. std::atomic<bool> old_commit_map_empty_ = {true};
  // Reader-writer locks; the thread-safety notes on the members above name the
  // structures each one protects.
  731. mutable port::RWMutex prepared_mutex_;
  732. mutable port::RWMutex old_commit_map_mutex_;
  733. mutable port::RWMutex commit_cache_mutex_;
  734. mutable port::RWMutex snapshots_mutex_;
  735. // A cache of the cf comparators
  736. // Thread safety: since it is a const it is safe to read it concurrently
  737. std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  738. // A cache of the cf handles
  739. // Thread safety: since the handle is a read-only object, it is safe to read
  740. // it concurrently.
  741. std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
  742. // A dummy snapshot object that refers to kMaxSequenceNumber
  743. SnapshotImpl dummy_max_snapshot_;
  744. };
  745. class WritePreparedTxnReadCallback : public ReadCallback {
  746. public:
  747. WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
  748. : ReadCallback(snapshot),
  749. db_(db),
  750. backed_by_snapshot_(kBackedByDBSnapshot) {}
  751. WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
  752. SequenceNumber min_uncommitted,
  753. SnapshotBackup backed_by_snapshot)
  754. : ReadCallback(snapshot, min_uncommitted),
  755. db_(db),
  756. backed_by_snapshot_(backed_by_snapshot) {
  757. (void)backed_by_snapshot_; // to silence unused private field warning
  758. }
  759. virtual ~WritePreparedTxnReadCallback() {
  760. // If it is not backed by snapshot, the caller must check validity
  761. assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  762. }
  763. // Will be called to see if the seq number visible; if not it moves on to
  764. // the next seq number.
  765. inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
  766. auto snapshot = max_visible_seq_;
  767. bool snap_released = false;
  768. auto ret =
  769. db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
  770. assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
  771. snap_released_ |= snap_released;
  772. return ret;
  773. }
  774. inline bool valid() {
  775. valid_checked_ = true;
  776. return snap_released_ == false;
  777. }
  778. // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
  779. private:
  780. WritePreparedTxnDB* db_;
  781. // Whether max_visible_seq_ is backed by a snapshot
  782. const SnapshotBackup backed_by_snapshot_;
  783. bool snap_released_ = false;
  784. // Safety check to ensure that the caller has checked invalid statuses
  785. bool valid_checked_ = false;
  786. };
  787. class AddPreparedCallback : public PreReleaseCallback {
  788. public:
  789. AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
  790. size_t sub_batch_cnt, bool two_write_queues,
  791. bool first_prepare_batch)
  792. : db_(db),
  793. db_impl_(db_impl),
  794. sub_batch_cnt_(sub_batch_cnt),
  795. two_write_queues_(two_write_queues),
  796. first_prepare_batch_(first_prepare_batch) {
  797. (void)two_write_queues_; // to silence unused private field warning
  798. }
  799. virtual Status Callback(SequenceNumber prepare_seq,
  800. bool is_mem_disabled __attribute__((__unused__)),
  801. uint64_t log_number, size_t index,
  802. size_t total) override {
  803. assert(index < total);
  804. // To reduce the cost of lock acquisition competing with the concurrent
  805. // prepare requests, lock on the first callback and unlock on the last.
  806. const bool do_lock = !two_write_queues_ || index == 0;
  807. const bool do_unlock = !two_write_queues_ || index + 1 == total;
  808. // Always Prepare from the main queue
  809. assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue
  810. TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
  811. TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
  812. if (do_lock) {
  813. db_->prepared_txns_.push_pop_mutex()->Lock();
  814. }
  815. const bool kLocked = true;
  816. for (size_t i = 0; i < sub_batch_cnt_; i++) {
  817. db_->AddPrepared(prepare_seq + i, kLocked);
  818. }
  819. if (do_unlock) {
  820. db_->prepared_txns_.push_pop_mutex()->Unlock();
  821. }
  822. TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
  823. if (first_prepare_batch_) {
  824. assert(log_number != 0);
  825. db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
  826. log_number);
  827. }
  828. return Status::OK();
  829. }
  830. private:
  831. WritePreparedTxnDB* db_;
  832. DBImpl* db_impl_;
  833. size_t sub_batch_cnt_;
  834. bool two_write_queues_;
  835. // It is 2PC and this is the first prepare batch. Always the case in 2PC
  836. // unless it is WriteUnPrepared.
  837. bool first_prepare_batch_;
  838. };
  839. class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
  840. public:
  841. // includes_data indicates that the commit also writes non-empty
  842. // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  843. WritePreparedCommitEntryPreReleaseCallback(
  844. WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
  845. size_t prep_batch_cnt, size_t data_batch_cnt = 0,
  846. SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
  847. : db_(db),
  848. db_impl_(db_impl),
  849. prep_seq_(prep_seq),
  850. prep_batch_cnt_(prep_batch_cnt),
  851. data_batch_cnt_(data_batch_cnt),
  852. includes_data_(data_batch_cnt_ > 0),
  853. aux_seq_(aux_seq),
  854. aux_batch_cnt_(aux_batch_cnt),
  855. includes_aux_batch_(aux_batch_cnt > 0) {
  856. assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor
  857. assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
  858. assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber)); // xor
  859. }
  860. virtual Status Callback(SequenceNumber commit_seq,
  861. bool is_mem_disabled __attribute__((__unused__)),
  862. uint64_t, size_t /*index*/,
  863. size_t /*total*/) override {
  864. // Always commit from the 2nd queue
  865. assert(!db_impl_->immutable_db_options().two_write_queues ||
  866. is_mem_disabled);
  867. assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
  868. // Data batch is what accompanied with the commit marker and affects the
  869. // last seq in the commit batch.
  870. const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
  871. ? commit_seq
  872. : commit_seq + data_batch_cnt_ - 1;
  873. if (prep_seq_ != kMaxSequenceNumber) {
  874. for (size_t i = 0; i < prep_batch_cnt_; i++) {
  875. db_->AddCommitted(prep_seq_ + i, last_commit_seq);
  876. }
  877. } // else there was no prepare phase
  878. if (includes_aux_batch_) {
  879. for (size_t i = 0; i < aux_batch_cnt_; i++) {
  880. db_->AddCommitted(aux_seq_ + i, last_commit_seq);
  881. }
  882. }
  883. if (includes_data_) {
  884. assert(data_batch_cnt_);
  885. // Commit the data that is accompanied with the commit request
  886. for (size_t i = 0; i < data_batch_cnt_; i++) {
  887. // For commit seq of each batch use the commit seq of the last batch.
  888. // This would make debugging easier by having all the batches having
  889. // the same sequence number.
  890. db_->AddCommitted(commit_seq + i, last_commit_seq);
  891. }
  892. }
  893. if (db_impl_->immutable_db_options().two_write_queues) {
  894. assert(is_mem_disabled); // implies the 2nd queue
  895. // Publish the sequence number. We can do that here assuming the callback
  896. // is invoked only from one write queue, which would guarantee that the
  897. // publish sequence numbers will be in order, i.e., once a seq is
  898. // published all the seq prior to that are also publishable.
  899. db_impl_->SetLastPublishedSequence(last_commit_seq);
  900. // Note RemovePrepared should be called after publishing the seq.
  901. // Otherwise SmallestUnCommittedSeq optimization breaks.
  902. if (prep_seq_ != kMaxSequenceNumber) {
  903. db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
  904. } // else there was no prepare phase
  905. if (includes_aux_batch_) {
  906. db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
  907. }
  908. }
  909. // else SequenceNumber that is updated as part of the write already does the
  910. // publishing
  911. return Status::OK();
  912. }
  913. private:
  914. WritePreparedTxnDB* db_;
  915. DBImpl* db_impl_;
  916. // kMaxSequenceNumber if there was no prepare phase
  917. SequenceNumber prep_seq_;
  918. size_t prep_batch_cnt_;
  919. size_t data_batch_cnt_;
  920. // Data here is the batch that is written with the commit marker, either
  921. // because it is commit without prepare or commit has a CommitTimeWriteBatch.
  922. bool includes_data_;
  923. // Auxiliary batch (if there is any) is a batch that is written before, but
  924. // gets the same commit seq as prepare batch or data batch. This is used in
  925. // two write queues where the CommitTimeWriteBatch becomes the aux batch and
  926. // we do a separate write to actually commit everything.
  927. SequenceNumber aux_seq_;
  928. size_t aux_batch_cnt_;
  929. bool includes_aux_batch_;
  930. };
  931. // For two_write_queues commit both the aborted batch and the cleanup batch and
  932. // then published the seq
  933. class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
  934. public:
  935. WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
  936. DBImpl* db_impl,
  937. SequenceNumber prep_seq,
  938. SequenceNumber rollback_seq,
  939. size_t prep_batch_cnt)
  940. : db_(db),
  941. db_impl_(db_impl),
  942. prep_seq_(prep_seq),
  943. rollback_seq_(rollback_seq),
  944. prep_batch_cnt_(prep_batch_cnt) {
  945. assert(prep_seq != kMaxSequenceNumber);
  946. assert(rollback_seq != kMaxSequenceNumber);
  947. assert(prep_batch_cnt_ > 0);
  948. }
  949. Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
  950. size_t /*index*/, size_t /*total*/) override {
  951. // Always commit from the 2nd queue
  952. assert(is_mem_disabled); // implies the 2nd queue
  953. assert(db_impl_->immutable_db_options().two_write_queues);
  954. #ifdef NDEBUG
  955. (void)is_mem_disabled;
  956. #endif
  957. const uint64_t last_commit_seq = commit_seq;
  958. db_->AddCommitted(rollback_seq_, last_commit_seq);
  959. for (size_t i = 0; i < prep_batch_cnt_; i++) {
  960. db_->AddCommitted(prep_seq_ + i, last_commit_seq);
  961. }
  962. db_impl_->SetLastPublishedSequence(last_commit_seq);
  963. return Status::OK();
  964. }
  965. private:
  966. WritePreparedTxnDB* db_;
  967. DBImpl* db_impl_;
  968. SequenceNumber prep_seq_;
  969. SequenceNumber rollback_seq_;
  970. size_t prep_batch_cnt_;
  971. };
  972. // Count the number of sub-batches inside a batch. A sub-batch does not have
  973. // duplicate keys.
  974. struct SubBatchCounter : public WriteBatch::Handler {
  975. explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
  976. : comparators_(comparators), batches_(1) {}
  977. std::map<uint32_t, const Comparator*>& comparators_;
  978. using CFKeys = std::set<Slice, SetComparator>;
  979. std::map<uint32_t, CFKeys> keys_;
  980. size_t batches_;
  981. size_t BatchCount() { return batches_; }
  982. void AddKey(const uint32_t cf, const Slice& key);
  983. void InitWithComp(const uint32_t cf);
  984. Status MarkNoop(bool) override { return Status::OK(); }
  985. Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  986. Status MarkCommit(const Slice&) override { return Status::OK(); }
  987. Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
  988. AddKey(cf, key);
  989. return Status::OK();
  990. }
  991. Status DeleteCF(uint32_t cf, const Slice& key) override {
  992. AddKey(cf, key);
  993. return Status::OK();
  994. }
  995. Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
  996. AddKey(cf, key);
  997. return Status::OK();
  998. }
  999. Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
  1000. AddKey(cf, key);
  1001. return Status::OK();
  1002. }
  1003. Status MarkBeginPrepare(bool) override { return Status::OK(); }
  1004. Status MarkRollback(const Slice&) override { return Status::OK(); }
  1005. bool WriteAfterCommit() const override { return false; }
  1006. };
  1007. SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
  1008. SequenceNumber* min,
  1009. SequenceNumber* max) {
  1010. if (snapshot != nullptr) {
  1011. *min = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
  1012. ->min_uncommitted_;
  1013. *max = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
  1014. ->number_;
  1015. return kBackedByDBSnapshot;
  1016. } else {
  1017. *min = SmallestUnCommittedSeq();
  1018. *max = 0; // to be assigned later after sv is referenced.
  1019. return kUnbackedByDBSnapshot;
  1020. }
  1021. }
  1022. bool WritePreparedTxnDB::ValidateSnapshot(
  1023. const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
  1024. std::memory_order order) {
  1025. if (backed_by_snapshot == kBackedByDBSnapshot) {
  1026. return true;
  1027. } else {
  1028. SequenceNumber max = max_evicted_seq_.load(order);
  1029. // Validate that max has not advanced the snapshot seq that is not backed
  1030. // by a real snapshot. This is a very rare case that should not happen in
  1031. // real workloads.
  1032. if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
  1033. return false;
  1034. }
  1035. }
  1036. return true;
  1037. }
  1038. } // namespace ROCKSDB_NAMESPACE
  1039. #endif // ROCKSDB_LITE