
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#include <cinttypes>
#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

struct WriteOptions;

WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options, false),
      wpt_db_(txn_db) {
  // Call Initialize outside PessimisticTransaction constructor otherwise it
  // would skip overridden functions in WritePreparedTxn since they are not
  // defined yet in the constructor of PessimisticTransaction
  Initialize(txn_options);
}

void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}
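// Reads below (MultiGet and Get) first consult the transaction's own indexed
// write batch and then fall back to the DB. Visibility against the DB is
// decided by WritePreparedTxnReadCallback, using the snapshot sequence and the
// min_uncommitted bound obtained from AssignMinMaxSeqs. If the callback or the
// backing snapshot fails validation after the read, the result may be based on
// a stale view, so Status::TryAgain is returned and the caller is expected to
// retry.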
void WritePreparedTxn::MultiGet(const ReadOptions& options,
                                ColumnFamilyHandle* column_family,
                                const size_t num_keys, const Slice* keys,
                                PinnableSlice* values, Status* statuses,
                                const bool sorted_input) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
                                      keys, values, statuses, sorted_input,
                                      &callback);
  if (UNLIKELY(!callback.valid() ||
               !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}
Status WritePreparedTxn::Get(const ReadOptions& options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                            pinnable_val, &callback);
  if (LIKELY(callback.valid() &&
             wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
                                       backed_by_snapshot))) {
    return res;
  } else {
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}
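// Illustrative usage sketch (hypothetical caller code; txn, read_options,
// cf_handle, and key are assumed names, not part of this file): since Get can
// return Status::TryAgain when snapshot validation fails, a caller would
// typically retry the read.
//
//   PinnableSlice value;
//   Status s;
//   do {
//     s = txn->Get(read_options, cf_handle, key, &value);
//   } while (s.IsTryAgain());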
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // Make sure to get iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // Make sure to get iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}
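// Note on the iterators above: wpt_db_->NewIterator returns a DB iterator
// whose visibility already accounts for prepared-but-uncommitted data, and
// NewIteratorWithBase layers the transaction's own pending writes (from the
// indexed write batch) on top of that base iterator, so the transaction sees
// committed data plus its own uncommitted changes.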
Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool kFirstPrepareBatch = true;
  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                     !WRITE_AFTER_COMMIT);
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  Status s = db_impl_->WriteImpl(
      write_options, GetWriteBatch()->GetWriteBatch(),
      /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
      &seq_used, prepare_batch_cnt_, &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}
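// Note on PrepareInternal above: the prepare batch is written to both the WAL
// and the memtable in a single write (the memtable is not disabled here), and
// the sequence number assigned to that write becomes the transaction id via
// SetId. prepare_batch_cnt_ records how many sub-batches the prepare spans
// (one extra per duplicate key); CommitInternal and RollbackInternal later use
// it to mark every prepare sequence as committed or to remove it from the
// prepared set.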
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  // For each duplicate key we account for a new sub-batch
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}

Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
}
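// CommitInternal below appends a Commit marker to the commit-time write batch
// and writes it. In the common case this is a single write, and the commit map
// is updated from a PreReleaseCallback attached to that write. Only when
// two_write_queues is enabled and the commit-time batch actually carries data
// is a second write (of an empty, memtable-skipping batch) needed to publish
// the sequence; the comments in the function note the extra cost of that path.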
Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;

  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  // This is to call AddPrepared on CommitTimeWriteBatch
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
                 s.ok())) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }  // else RemovePrepared is called from within PreReleaseCallback
    if (UNLIKELY(!do_one_write)) {
      assert(!s.ok());
      // Cleanup the prepared entry we added with add_prepared_callback
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by a CommitTimeWriteBatch and yet we cannot
  // enable the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  const size_t kZeroData = 0;
  // Update commit map only from the 2nd queue
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
      commit_batch_seq, commit_batch_cnt);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_aux_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
    if (s.ok()) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }
    wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
  }  // else RemovePrepared is called from within PreReleaseCallback
  return s;
}
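// RollbackInternal below undoes a prepared transaction by building a rollback
// batch: for every key touched by the prepared batch it reads the latest
// committed value (through a read callback that ignores uncommitted, i.e.
// prepared, data) and writes that value back, or writes a Delete if no such
// value exists. The rolled-back prepare sequences are then marked as
// committed; as the comments inside explain, this is safe because the rollback
// batch has already cancelled their effect, and it lets the existing
// CommitCache machinery handle snapshots that still overlap them.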
Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
  WriteBatch rollback_batch;
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  auto read_at_seq = kMaxSequenceNumber;
  ReadOptions roptions;
  // To prevent the callback's seq from being overridden inside DBImpl::Get
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* db_;
    WritePreparedTxnReadCallback callback;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    ReadOptions roptions_;

    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands, ReadOptions _roptions)
        : db_(db),
          callback(wpt_db, snap_seq),  // disable min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands),
          roptions_(_roptions) {}

    Status Rollback(uint32_t cf, const Slice& key) {
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just inserted
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      // insert returns false as its second member if the key already existed;
      // each key is rolled back only once.
      if (it.second == false) {
        return s;
      }
      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      DBImpl::GetImplOptions get_impl_options;
      get_impl_options.column_family = cf_handle;
      get_impl_options.value = &pinnable_val;
      get_impl_options.value_found = &not_used;
      get_impl_options.callback = &callback;
      s = db_->GetImpl(roptions_, key, get_impl_options);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        s = rollback_batch_->Delete(cf_handle, key);
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key,
                 const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    bool WriteAfterCommit() const override { return false; }
  } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands,
                     roptions);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  const bool kFirstPrepareBatch = true;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, ii) adding the commit entry to
  // CommitCache will allow us to benefit from the existing mechanism in
  // CommitCache that keeps around an entry that was evicted due to max advance
  // but still overlaps with a live snapshot, so that the live snapshot
  // properly skips the entry even if its prepare seq is lower than
  // max_evicted_seq_.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, ONE_BATCH,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // the rollback batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    assert(!db_impl_->immutable_db_options().two_write_queues);
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t rollback_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write rollback_seq: %" PRIu64,
                    rollback_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal (status=%s) commit: %" PRIu64,
                    s.ToString().c_str(), GetId());
  // TODO(lth): For WriteUnPrepared, where rollback is called frequently,
  // RemovePrepared could be moved to the callback to reduce lock contention.
  if (s.ok()) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }
  // Note: RemovePrepared for prepared batch is called from within
  // PreReleaseCallback
  wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
  return s;
}
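// ValidateSnapshot below is the snapshot-based conflict check: it verifies
// that the key has not been committed after this transaction's snapshot,
// delegating to TransactionUtil::CheckKeyForConflicts with a
// WritePreparedTxnReadCallback so that prepared-but-uncommitted versions are
// interpreted correctly relative to the snapshot.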
Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(
          snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked so there is no need to apply the IsInSnapshot to this comparison
  // here as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
                                            kBackedByDBSnapshot);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker, min_uncommitted);
}
void WritePreparedTxn::SetSnapshot() {
  const bool kForWWConflictCheck = true;
  SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
  SetSnapshotInternal(snapshot);
}
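// RebuildFromWriteBatch is used when a prepared transaction is reconstructed
// from an existing batch (for example while recovering prepared transactions
// from the WAL). prepare_batch_cnt_ must be recomputed from the rebuilt batch
// so that a later commit or rollback accounts for every sub-batch.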
Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  return ret;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE