write_prepared_txn.cc 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/transactions/write_prepared_txn.h"
  6. #include <cinttypes>
  7. #include <map>
  8. #include <set>
  9. #include "db/attribute_group_iterator_impl.h"
  10. #include "db/column_family.h"
  11. #include "db/db_impl/db_impl.h"
  12. #include "rocksdb/db.h"
  13. #include "rocksdb/status.h"
  14. #include "rocksdb/utilities/transaction_db.h"
  15. #include "util/cast_util.h"
  16. #include "utilities/transactions/pessimistic_transaction.h"
  17. #include "utilities/transactions/write_prepared_txn_db.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. struct WriteOptions;
  20. WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
  21. const WriteOptions& write_options,
  22. const TransactionOptions& txn_options)
  23. : PessimisticTransaction(txn_db, write_options, txn_options, false),
  24. wpt_db_(txn_db) {
  25. // Call Initialize outside PessimisticTransaction constructor otherwise it
  26. // would skip overridden functions in WritePreparedTxn since they are not
  27. // defined yet in the constructor of PessimisticTransaction
  28. Initialize(txn_options);
  29. }
  30. void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  31. PessimisticTransaction::Initialize(txn_options);
  32. prepare_batch_cnt_ = 0;
  33. }
  34. void WritePreparedTxn::MultiGet(const ReadOptions& _read_options,
  35. ColumnFamilyHandle* column_family,
  36. const size_t num_keys, const Slice* keys,
  37. PinnableSlice* values, Status* statuses,
  38. const bool sorted_input) {
  39. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  40. _read_options.io_activity != Env::IOActivity::kMultiGet) {
  41. Status s = Status::InvalidArgument(
  42. "Can only call MultiGet with `ReadOptions::io_activity` is "
  43. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
  44. for (size_t i = 0; i < num_keys; ++i) {
  45. if (statuses[i].ok()) {
  46. statuses[i] = s;
  47. }
  48. }
  49. return;
  50. }
  51. ReadOptions read_options(_read_options);
  52. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  53. read_options.io_activity = Env::IOActivity::kMultiGet;
  54. }
  55. SequenceNumber min_uncommitted, snap_seq;
  56. const SnapshotBackup backed_by_snapshot = wpt_db_->AssignMinMaxSeqs(
  57. read_options.snapshot, &min_uncommitted, &snap_seq);
  58. WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
  59. backed_by_snapshot);
  60. write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
  61. num_keys, keys, values, statuses,
  62. sorted_input, &callback);
  63. if (UNLIKELY(!callback.valid() ||
  64. !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
  65. wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
  66. for (size_t i = 0; i < num_keys; i++) {
  67. statuses[i] = Status::TryAgain();
  68. }
  69. }
  70. }
  71. Status WritePreparedTxn::Get(const ReadOptions& _read_options,
  72. ColumnFamilyHandle* column_family,
  73. const Slice& key, PinnableSlice* pinnable_val) {
  74. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  75. _read_options.io_activity != Env::IOActivity::kGet) {
  76. return Status::InvalidArgument(
  77. "Can only call Get with `ReadOptions::io_activity` is "
  78. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  79. }
  80. ReadOptions read_options(_read_options);
  81. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  82. read_options.io_activity = Env::IOActivity::kGet;
  83. }
  84. return GetImpl(read_options, column_family, key, pinnable_val);
  85. }
  86. Status WritePreparedTxn::GetImpl(const ReadOptions& options,
  87. ColumnFamilyHandle* column_family,
  88. const Slice& key,
  89. PinnableSlice* pinnable_val) {
  90. SequenceNumber min_uncommitted, snap_seq;
  91. const SnapshotBackup backed_by_snapshot =
  92. wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  93. WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
  94. backed_by_snapshot);
  95. Status res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
  96. pinnable_val, &callback);
  97. const bool callback_valid =
  98. callback.valid(); // NOTE: validity of callback must always be checked
  99. // before it is destructed
  100. if (res.ok()) {
  101. if (!LIKELY(callback_valid &&
  102. wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
  103. backed_by_snapshot))) {
  104. wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
  105. res = Status::TryAgain();
  106. }
  107. }
  108. return res;
  109. }
  110. Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  111. return GetIterator(options, wpt_db_->DefaultColumnFamily());
  112. }
  113. Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
  114. ColumnFamilyHandle* column_family) {
  115. // Make sure to get iterator from WritePrepareTxnDB, not the root db.
  116. Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  117. assert(db_iter);
  118. return write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
  119. }
  120. std::unique_ptr<Iterator> WritePreparedTxn::GetCoalescingIterator(
  121. const ReadOptions& /* read_options */,
  122. const std::vector<ColumnFamilyHandle*>& /* column_families */) {
  123. return std::unique_ptr<Iterator>(NewErrorIterator(
  124. Status::NotSupported("GetCoalescingIterator not supported for "
  125. "write-prepared/write-unprepared transactions")));
  126. }
  127. std::unique_ptr<AttributeGroupIterator>
  128. WritePreparedTxn::GetAttributeGroupIterator(
  129. const ReadOptions& /* read_options */,
  130. const std::vector<ColumnFamilyHandle*>& /* column_families */) {
  131. return NewAttributeGroupErrorIterator(
  132. Status::NotSupported("GetAttributeGroupIterator not supported for "
  133. "write-prepared/write-unprepared transactions"));
  134. }
  135. Status WritePreparedTxn::PrepareInternal() {
  136. WriteOptions write_options = write_options_;
  137. write_options.disableWAL = false;
  138. const bool WRITE_AFTER_COMMIT = true;
  139. const bool kFirstPrepareBatch = true;
  140. auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
  141. name_, !WRITE_AFTER_COMMIT);
  142. assert(s.ok());
  143. // For each duplicate key we account for a new sub-batch
  144. prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  145. // Having AddPrepared in the PreReleaseCallback allows in-order addition of
  146. // prepared entries to PreparedHeap and hence enables an optimization. Refer
  147. // to SmallestUnCommittedSeq for more details.
  148. AddPreparedCallback add_prepared_callback(
  149. wpt_db_, db_impl_, prepare_batch_cnt_,
  150. db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
  151. const bool DISABLE_MEMTABLE = true;
  152. uint64_t seq_used = kMaxSequenceNumber;
  153. s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
  154. /*callback*/ nullptr, /*user_write_cb=*/nullptr,
  155. &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
  156. &seq_used, prepare_batch_cnt_,
  157. &add_prepared_callback);
  158. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  159. auto prepare_seq = seq_used;
  160. SetId(prepare_seq);
  161. return s;
  162. }
  163. Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  164. // For each duplicate key we account for a new sub-batch
  165. const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  166. return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
  167. }
  168. Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
  169. size_t batch_cnt) {
  170. return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
  171. }
  172. Status WritePreparedTxn::CommitInternal() {
  173. ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
  174. "CommitInternal prepare_seq: %" PRIu64, GetID());
  175. // We take the commit-time batch and append the Commit marker.
  176. // The Memtable will ignore the Commit marker in non-recovery mode
  177. WriteBatch* working_batch = GetCommitTimeWriteBatch();
  178. const bool empty = working_batch->Count() == 0;
  179. auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  180. assert(s.ok());
  181. const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  182. if (!empty) {
  183. // When not writing to memtable, we can still cache the latest write batch.
  184. // The cached batch will be written to memtable in WriteRecoverableState
  185. // during FlushMemTable
  186. if (for_recovery) {
  187. WriteBatchInternal::SetAsLatestPersistentState(working_batch);
  188. } else {
  189. return Status::InvalidArgument(
  190. "Commit-time-batch can only be used if "
  191. "use_only_the_last_commit_time_batch_for_recovery is true");
  192. }
  193. }
  194. auto prepare_seq = GetId();
  195. const bool includes_data = !empty && !for_recovery;
  196. assert(prepare_batch_cnt_);
  197. size_t commit_batch_cnt = 0;
  198. if (UNLIKELY(includes_data)) {
  199. ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
  200. "Duplicate key overhead");
  201. SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
  202. s = working_batch->Iterate(&counter);
  203. assert(s.ok());
  204. commit_batch_cnt = counter.BatchCount();
  205. }
  206. const bool disable_memtable = !includes_data;
  207. const bool do_one_write =
  208. !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
  209. WritePreparedCommitEntryPreReleaseCallback update_commit_map(
  210. wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  211. // This is to call AddPrepared on CommitTimeWriteBatch
  212. const bool kFirstPrepareBatch = true;
  213. AddPreparedCallback add_prepared_callback(
  214. wpt_db_, db_impl_, commit_batch_cnt,
  215. db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  216. PreReleaseCallback* pre_release_callback;
  217. if (do_one_write) {
  218. pre_release_callback = &update_commit_map;
  219. } else {
  220. pre_release_callback = &add_prepared_callback;
  221. }
  222. uint64_t seq_used = kMaxSequenceNumber;
  223. // Since the prepared batch is directly written to memtable, there is already
  224. // a connection between the memtable and its WAL, so there is no need to
  225. // redundantly reference the log that contains the prepared data.
  226. const uint64_t zero_log_number = 0ull;
  227. size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  228. // If `two_write_queues && includes_data`, then `do_one_write` is false. The
  229. // following `WriteImpl` will insert the data of the commit-time-batch into
  230. // the database before updating the commit cache. Therefore, the data of the
  231. // commmit-time-batch is considered uncommitted. Furthermore, since data of
  232. // the commit-time-batch are not locked, it is possible for two uncommitted
  233. // versions of the same key to co-exist for a (short) period of time until
  234. // the commit cache is updated by the second write. If the two uncommitted
  235. // keys are compacted to the bottommost level in the meantime, it is possible
  236. // that compaction iterator will zero out the sequence numbers of both, thus
  237. // violating the invariant that an SST does not have two identical internal
  238. // keys. To prevent this situation, we should allow the usage of
  239. // commit-time-batch only if the user sets
  240. // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery to
  241. // true. See the comments about GetCommitTimeWriteBatch() in
  242. // include/rocksdb/utilities/transaction.h.
  243. s = db_impl_->WriteImpl(write_options_, working_batch, nullptr,
  244. /*user_write_cb=*/nullptr, nullptr, zero_log_number,
  245. disable_memtable, &seq_used, batch_cnt,
  246. pre_release_callback);
  247. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  248. const SequenceNumber commit_batch_seq = seq_used;
  249. if (LIKELY(do_one_write || !s.ok())) {
  250. if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
  251. s.ok())) {
  252. // Note: RemovePrepared should be called after WriteImpl that publishsed
  253. // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
  254. wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
  255. } // else RemovePrepared is called from within PreReleaseCallback
  256. if (UNLIKELY(!do_one_write)) {
  257. assert(!s.ok());
  258. // Cleanup the prepared entry we added with add_prepared_callback
  259. wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
  260. }
  261. return s;
  262. } // else do the 2nd write to publish seq
  263. // Note: the 2nd write comes with a performance penality. So if we have too
  264. // many of commits accompanied with ComitTimeWriteBatch and yet we cannot
  265. // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
  266. // two_write_queues should be disabled to avoid many additional writes here.
  267. const size_t kZeroData = 0;
  268. // Update commit map only from the 2nd queue
  269. WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
  270. wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
  271. commit_batch_seq, commit_batch_cnt);
  272. WriteBatch empty_batch;
  273. s = empty_batch.PutLogData(Slice());
  274. assert(s.ok());
  275. // In the absence of Prepare markers, use Noop as a batch separator
  276. s = WriteBatchInternal::InsertNoop(&empty_batch);
  277. assert(s.ok());
  278. const bool DISABLE_MEMTABLE = true;
  279. const size_t ONE_BATCH = 1;
  280. const uint64_t NO_REF_LOG = 0;
  281. s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr,
  282. /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
  283. DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
  284. &update_commit_map_with_aux_batch);
  285. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  286. return s;
  287. }
  288. Status WritePreparedTxn::RollbackInternal() {
  289. ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
  290. "RollbackInternal prepare_seq: %" PRIu64, GetId());
  291. assert(db_impl_);
  292. assert(wpt_db_);
  293. WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
  294. write_options_.protection_bytes_per_key,
  295. 0 /* default_cf_ts_sz */);
  296. assert(GetId() != kMaxSequenceNumber);
  297. assert(GetId() > 0);
  298. auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  299. auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  300. auto read_at_seq = kMaxSequenceNumber;
  301. // TODO: plumb Env::IOActivity, Env::IOPriority
  302. ReadOptions roptions;
  303. // to prevent callback's seq to be overrriden inside DBImpk::Get
  304. roptions.snapshot = wpt_db_->GetMaxSnapshot();
  305. struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
  306. DBImpl* const db_;
  307. WritePreparedTxnDB* const wpt_db_;
  308. WritePreparedTxnReadCallback callback_;
  309. WriteBatch* rollback_batch_;
  310. std::map<uint32_t, const Comparator*>& comparators_;
  311. std::map<uint32_t, ColumnFamilyHandle*>& handles_;
  312. using CFKeys = std::set<Slice, SetComparator>;
  313. std::map<uint32_t, CFKeys> keys_;
  314. bool rollback_merge_operands_;
  315. ReadOptions roptions_;
  316. RollbackWriteBatchBuilder(
  317. DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
  318. WriteBatch* dst_batch,
  319. std::map<uint32_t, const Comparator*>& comparators,
  320. std::map<uint32_t, ColumnFamilyHandle*>& handles,
  321. bool rollback_merge_operands, const ReadOptions& _roptions)
  322. : db_(db),
  323. wpt_db_(wpt_db),
  324. callback_(wpt_db, snap_seq), // disable min_uncommitted optimization
  325. rollback_batch_(dst_batch),
  326. comparators_(comparators),
  327. handles_(handles),
  328. rollback_merge_operands_(rollback_merge_operands),
  329. roptions_(_roptions) {}
  330. Status Rollback(uint32_t cf, const Slice& key) {
  331. Status s;
  332. CFKeys& cf_keys = keys_[cf];
  333. if (cf_keys.size() == 0) { // just inserted
  334. auto cmp = comparators_[cf];
  335. keys_[cf] = CFKeys(SetComparator(cmp));
  336. }
  337. auto it = cf_keys.insert(key);
  338. // second is false if a element already existed.
  339. if (it.second == false) {
  340. return s;
  341. }
  342. PinnableSlice pinnable_val;
  343. bool not_used;
  344. auto cf_handle = handles_[cf];
  345. DBImpl::GetImplOptions get_impl_options;
  346. get_impl_options.column_family = cf_handle;
  347. get_impl_options.value = &pinnable_val;
  348. get_impl_options.value_found = &not_used;
  349. get_impl_options.callback = &callback_;
  350. s = db_->GetImpl(roptions_, key, get_impl_options);
  351. assert(s.ok() || s.IsNotFound());
  352. if (s.ok()) {
  353. s = rollback_batch_->Put(cf_handle, key, pinnable_val);
  354. assert(s.ok());
  355. } else if (s.IsNotFound()) {
  356. // There has been no readable value before txn. By adding a delete we
  357. // make sure that there will be none afterwards either.
  358. if (wpt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
  359. s = rollback_batch_->SingleDelete(cf_handle, key);
  360. } else {
  361. s = rollback_batch_->Delete(cf_handle, key);
  362. }
  363. assert(s.ok());
  364. } else {
  365. // Unexpected status. Return it to the user.
  366. }
  367. return s;
  368. }
  369. Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override {
  370. return Rollback(cf, key);
  371. }
  372. Status DeleteCF(uint32_t cf, const Slice& key) override {
  373. return Rollback(cf, key);
  374. }
  375. Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
  376. return Rollback(cf, key);
  377. }
  378. Status MergeCF(uint32_t cf, const Slice& key,
  379. const Slice& /*val*/) override {
  380. if (rollback_merge_operands_) {
  381. return Rollback(cf, key);
  382. } else {
  383. return Status::OK();
  384. }
  385. }
  386. Status MarkNoop(bool) override { return Status::OK(); }
  387. Status MarkBeginPrepare(bool) override { return Status::OK(); }
  388. Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  389. Status MarkCommit(const Slice&) override { return Status::OK(); }
  390. Status MarkRollback(const Slice&) override {
  391. return Status::InvalidArgument();
  392. }
  393. protected:
  394. Handler::OptionState WriteAfterCommit() const override {
  395. return Handler::OptionState::kDisabled;
  396. }
  397. } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
  398. *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
  399. wpt_db_->txn_db_options_.rollback_merge_operands,
  400. roptions);
  401. auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  402. if (!s.ok()) {
  403. return s;
  404. }
  405. // The Rollback marker will be used as a batch separator
  406. s = WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  407. assert(s.ok());
  408. bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  409. const bool DISABLE_MEMTABLE = true;
  410. const uint64_t NO_REF_LOG = 0;
  411. uint64_t seq_used = kMaxSequenceNumber;
  412. const size_t ONE_BATCH = 1;
  413. const bool kFirstPrepareBatch = true;
  414. // We commit the rolled back prepared batches. Although this is
  415. // counter-intuitive, i) it is safe to do so, since the prepared batches are
  416. // already canceled out by the rollback batch, ii) adding the commit entry to
  417. // CommitCache will allow us to benefit from the existing mechanism in
  418. // CommitCache that keeps an entry evicted due to max advance and yet overlaps
  419. // with a live snapshot around so that the live snapshot properly skips the
  420. // entry even if its prepare seq is lower than max_evicted_seq_.
  421. AddPreparedCallback add_prepared_callback(
  422. wpt_db_, db_impl_, ONE_BATCH,
  423. db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  424. WritePreparedCommitEntryPreReleaseCallback update_commit_map(
  425. wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  426. PreReleaseCallback* pre_release_callback;
  427. if (do_one_write) {
  428. pre_release_callback = &update_commit_map;
  429. } else {
  430. pre_release_callback = &add_prepared_callback;
  431. }
  432. // Note: the rollback batch does not need AddPrepared since it is written to
  433. // DB in one shot. min_uncommitted still works since it requires capturing
  434. // data that is written to DB but not yet committed, while
  435. // the rollback batch commits with PreReleaseCallback.
  436. s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr,
  437. /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
  438. !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
  439. pre_release_callback);
  440. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  441. if (!s.ok()) {
  442. return s;
  443. }
  444. if (do_one_write) {
  445. assert(!db_impl_->immutable_db_options().two_write_queues);
  446. wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  447. return s;
  448. } // else do the 2nd write for commit
  449. uint64_t rollback_seq = seq_used;
  450. ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
  451. "RollbackInternal 2nd write rollback_seq: %" PRIu64,
  452. rollback_seq);
  453. // Commit the batch by writing an empty batch to the queue that will release
  454. // the commit sequence number to readers.
  455. WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
  456. wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
  457. WriteBatch empty_batch;
  458. s = empty_batch.PutLogData(Slice());
  459. assert(s.ok());
  460. // In the absence of Prepare markers, use Noop as a batch separator
  461. s = WriteBatchInternal::InsertNoop(&empty_batch);
  462. assert(s.ok());
  463. s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr,
  464. /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
  465. DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
  466. &update_commit_map_with_prepare);
  467. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  468. ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
  469. "RollbackInternal (status=%s) commit: %" PRIu64,
  470. s.ToString().c_str(), GetId());
  471. // TODO(lth): For WriteUnPrepared that rollback is called frequently,
  472. // RemovePrepared could be moved to the callback to reduce lock contention.
  473. if (s.ok()) {
  474. wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  475. }
  476. // Note: RemovePrepared for prepared batch is called from within
  477. // PreReleaseCallback
  478. wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
  479. return s;
  480. }
  481. Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
  482. const Slice& key,
  483. SequenceNumber* tracked_at_seq) {
  484. assert(snapshot_);
  485. SequenceNumber min_uncommitted =
  486. static_cast_with_check<const SnapshotImpl>(snapshot_.get())
  487. ->min_uncommitted_;
  488. SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  489. // tracked_at_seq is either max or the last snapshot with which this key was
  490. // trackeed so there is no need to apply the IsInSnapshot to this comparison
  491. // here as tracked_at_seq is not a prepare seq.
  492. if (*tracked_at_seq <= snap_seq) {
  493. // If the key has been previous validated at a sequence number earlier
  494. // than the curent snapshot's sequence number, we already know it has not
  495. // been modified.
  496. return Status::OK();
  497. }
  498. *tracked_at_seq = snap_seq;
  499. ColumnFamilyHandle* cfh =
  500. column_family ? column_family : db_impl_->DefaultColumnFamily();
  501. WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
  502. kBackedByDBSnapshot);
  503. // TODO(yanqin): support user-defined timestamp
  504. return TransactionUtil::CheckKeyForConflicts(
  505. db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
  506. false /* cache_only */, &snap_checker, min_uncommitted,
  507. txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
  508. }
  509. void WritePreparedTxn::SetSnapshot() {
  510. const bool kForWWConflictCheck = true;
  511. SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
  512. SetSnapshotInternal(snapshot);
  513. }
  514. Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  515. auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  516. prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  517. return ret;
  518. }
  519. } // namespace ROCKSDB_NAMESPACE