write_unprepared_txn.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "utilities/transactions/write_unprepared_txn.h"

#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

namespace ROCKSDB_NAMESPACE {

bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
  // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
  // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
  // the prepare_batch_cnt seq nums after it.
  //
  // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
  // large.
  for (const auto& it : unprep_seqs_) {
    if (it.first <= seq && seq < it.first + it.second) {
      return true;
    }
  }

  bool snap_released = false;
  auto ret =
      db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_, &snap_released);
  assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
  snap_released_ |= snap_released;
  return ret;
}

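// The flush threshold controls how much data the transaction buffers in its
// write batch before flushing it to the DB as an unprepared batch. An
// illustrative sketch of how a caller might configure it (not part of this
// file; `wu_txn_db` is assumed to be a TransactionDB opened with
// TransactionDBOptions::write_policy = WRITE_UNPREPARED):
//
//   TransactionOptions txn_options;
//   txn_options.write_batch_flush_threshold = 256 * 1024;  // ~256KB batches
//   Transaction* txn =
//       wu_txn_db->BeginTransaction(WriteOptions(), txn_options);
//
// A negative per-transaction value falls back to
// TransactionDBOptions::default_write_batch_flush_threshold, as done below.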
WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
                                       const WriteOptions& write_options,
                                       const TransactionOptions& txn_options)
    : WritePreparedTxn(txn_db, write_options, txn_options),
      wupt_db_(txn_db),
      last_log_number_(0),
      recovered_txn_(false),
      largest_validated_seq_(0) {
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }
}

WriteUnpreparedTxn::~WriteUnpreparedTxn() {
  if (!unprep_seqs_.empty()) {
    assert(log_number_ > 0);
    assert(GetId() > 0);
    assert(!name_.empty());

    // We should rollback regardless of GetState, but some unit tests that
    // test crash recovery run the destructor assuming that rollback does not
    // happen, so that rollback during recovery can be exercised.
    if (GetState() == STARTED || GetState() == LOCKS_STOLEN) {
      auto s = RollbackInternal();
      assert(s.ok());
      if (!s.ok()) {
        ROCKS_LOG_FATAL(
            wupt_db_->info_log_,
            "Rollback of WriteUnprepared transaction failed in destructor: %s",
            s.ToString().c_str());
      }
      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
          log_number_);
    }
  }

  // Clear the tracked locks so that ~PessimisticTransaction does not
  // try to unlock keys for recovered transactions.
  if (recovered_txn_) {
    tracked_locks_->Clear();
  }
}

void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  assert(active_iterators_.empty());
  active_iterators_.clear();
  untracked_keys_.clear();
}

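// Every write operation is funneled through HandleWrite: if no iterators are
// live, the buffered write batch may first be flushed to the DB (see
// MaybeFlushWriteBatchToDB); the write itself is then performed and
// largest_validated_seq_ is advanced to the sequence number up to which the
// written key has been validated.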
Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
  Status s;
  if (active_iterators_.empty()) {
    s = MaybeFlushWriteBatchToDB();
    if (!s.ok()) {
      return s;
    }
  }
  s = do_write();
  if (s.ok()) {
    if (snapshot_) {
      largest_validated_seq_ =
          std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
    } else {
      // TODO(lth): We should use the same number as tracked_at_seq in TryLock,
      // because what is actually being tracked is the sequence number at
      // which this key was locked.
      largest_validated_seq_ = db_impl_->GetLastPublishedSequence();
    }
  }
  return s;
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const Slice& key, const Slice& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const SliceParts& key, const SliceParts& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
                                 const Slice& key, const Slice& value,
                                 const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Merge(column_family, key, value,
                                      assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const Slice& key, const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const SliceParts& key,
                                  const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const Slice& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const SliceParts& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
// WriteUnprepared, the write batches have already been written into the
// database during WAL replay, so all we have to do is "retrack" the keys so
// that rollbacks are possible.
//
// Calling TryLock instead of TrackKey is also possible, but as an
// optimization, recovered transactions do not hold locks on their keys. This
// follows the implementation in PessimisticTransactionDB::Initialize, where we
// set skip_concurrency_control to true.
Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
  struct TrackKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                       false /* read_only */, true /* exclusive */);
      }
      return Status::OK();
    }

    // Recovered batches do not contain 2PC markers.
    Status MarkBeginPrepare(bool) override {
      return Status::InvalidArgument();
    }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkNoop(bool) override { return Status::InvalidArgument(); }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  TrackKeyHandler handler(this,
                          wupt_db_->txn_db_options_.rollback_merge_operands);
  return wb->Iterate(&handler);
}

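// A non-positive write_batch_flush_threshold_ disables automatic flushing;
// otherwise the buffered batch is flushed once its data size exceeds the
// threshold. Flushing never happens once the transaction has been prepared.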
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  const bool kPrepared = true;
  Status s;
  if (write_batch_flush_threshold_ > 0 &&
      write_batch_.GetWriteBatch()->Count() > 0 &&
      write_batch_.GetDataSize() >
          static_cast<size_t>(write_batch_flush_threshold_)) {
    assert(GetState() != PREPARED);
    s = FlushWriteBatchToDB(!kPrepared);
  }
  return s;
}

Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
  // If the current write batch contains savepoints, then some special handling
  // is required so that RollbackToSavepoint can work.
  //
  // RollbackToSavepoint is not supported after Prepare() is called, so only do
  // this for unprepared batches.
  if (!prepared && unflushed_save_points_ != nullptr &&
      !unflushed_save_points_->empty()) {
    return FlushWriteBatchWithSavePointToDB();
  }

  return FlushWriteBatchToDBInternal(prepared);
}

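// FlushWriteBatchToDBInternal writes the currently buffered batch to the DB
// (WAL and memtable) as a single unprepared (or, when called from Prepare(),
// prepared) batch: it appends the end-prepare marker, registers the batch via
// AddPreparedCallback, records its sequence number in unprep_seqs_, and, for
// unprepared flushes, resets the in-memory write batch.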
Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
  if (name_.empty()) {
    assert(!prepared);
#ifndef NDEBUG
    static std::atomic_ullong autogen_id{0};
    // To avoid changing all tests to call SetName, just autogenerate one.
    if (wupt_db_->txn_db_options_.autogenerate_name) {
      auto s = SetName(std::string("autoxid") +
                       std::to_string(autogen_id.fetch_add(1)));
      assert(s.ok());
    } else
#endif
    {
      return Status::InvalidArgument("Cannot write to DB without SetName.");
    }
  }

  struct UntrackedKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    UntrackedKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status AddUntrackedKey(uint32_t cf, const Slice& key) {
      auto str = key.ToString();
      PointLockStatus lock_status =
          txn_->tracked_locks_->GetPointLockStatus(cf, str);
      if (!lock_status.locked) {
        txn_->untracked_keys_[cf].push_back(str);
      }
      return Status::OK();
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      return AddUntrackedKey(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        return AddUntrackedKey(cf, key);
      }
      return Status::OK();
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override {
      return Status::InvalidArgument();
    }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  UntrackedKeyHandler handler(
      this, wupt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&handler);
  assert(s.ok());

  // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool first_prepare_batch = log_number_ == 0;
  // MarkEndPrepare will change Noop marker to the appropriate marker.
  s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                         name_, !WRITE_AFTER_COMMIT, !prepared);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch.
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared is better called in the pre-release callback; otherwise there
  // is a non-zero chance of max advancing past prepare_seq and readers
  // assuming the data is committed.
  // Also having it in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // log_number_ should refer to the oldest log containing uncommitted data
  // from the current transaction. This means that if log_number_ is set,
  // WriteImpl should not overwrite that value, so set wal_used to nullptr if
  // log_number_ is already set.
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, /*user_write_cb=*/nullptr,
                          &last_log_number_,
                          /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used,
                          prepare_batch_cnt_, &add_prepared_callback);
  if (log_number_ == 0) {
    log_number_ = last_log_number_;
  }
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;

  // Only call SetId if it hasn't been set yet.
  if (GetId() == 0) {
    SetId(prepare_seq);
  }

  // unprep_seqs_ will also contain prepared seqnos since they are treated in
  // the same way in the prepare/commit callbacks. See the comment on the
  // definition of unprep_seqs_.
  if (s.ok()) {
    unprep_seqs_[prepare_seq] = prepare_batch_cnt_;
  }

  // Reset transaction state.
  if (!prepared) {
    prepare_batch_cnt_ = 0;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  return s;
}

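// When the buffered batch contains unflushed savepoints, it is split at each
// savepoint boundary and flushed as a sequence of unprepared batches. For
// every savepoint, the set of unprepared sequence numbers and a DB snapshot
// taken at flush time are remembered in flushed_save_points_ so that
// RollbackToSavePointInternal can later undo exactly the writes made after
// that savepoint.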
Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
  assert(unflushed_save_points_ != nullptr &&
         unflushed_save_points_->size() > 0);
  assert(save_points_ != nullptr && save_points_->size() > 0);
  assert(save_points_->size() >= unflushed_save_points_->size());

  // Handler class for creating an unprepared batch from a savepoint.
  struct SavePointBatchHandler : public WriteBatch::Handler {
    WriteBatchWithIndex* wb_;
    const std::map<uint32_t, ColumnFamilyHandle*>& handles_;

    SavePointBatchHandler(
        WriteBatchWithIndex* wb,
        const std::map<uint32_t, ColumnFamilyHandle*>& handles)
        : wb_(wb), handles_(handles) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Put(handles_.at(cf), key, value);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->Delete(handles_.at(cf), key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->SingleDelete(handles_.at(cf), key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Merge(handles_.at(cf), key, value);
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override {
      return Status::InvalidArgument();
    }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  // The comparator of the default cf is passed in, similar to the
  // initialization of TransactionBaseImpl::write_batch_. This comparator is
  // only used as a fallback if the write batch encounters an invalid cf id.
  WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
                         true, 0, write_options_.protection_bytes_per_key);
  // Swap with write_batch_ so that wb contains the complete write batch. The
  // actual write batch that will be flushed to DB will be built in
  // write_batch_, and will be read by FlushWriteBatchToDBInternal.
  std::swap(wb, write_batch_);
  TransactionBaseImpl::InitWriteBatch();

  size_t prev_boundary = WriteBatchInternal::kHeader;
  const bool kPrepared = true;
  for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) {
    bool trailing_batch = i == unflushed_save_points_->size();
    SavePointBatchHandler sp_handler(&write_batch_,
                                     *wupt_db_->GetCFHandleMap().get());
    size_t curr_boundary = trailing_batch ? wb.GetWriteBatch()->GetDataSize()
                                          : (*unflushed_save_points_)[i];

    // Construct the partial write batch up to the savepoint.
    //
    // Theoretically, a memcpy between the write batches should be sufficient
    // since the rewriting into the batch should produce the exact same byte
    // representation. Rebuilding the WriteBatchWithIndex index is still
    // necessary though, and would imply doing two passes over the batch.
    Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
                                           prev_boundary, curr_boundary);
    if (!s.ok()) {
      return s;
    }

    if (write_batch_.GetWriteBatch()->Count() > 0) {
      // Flush the write batch.
      s = FlushWriteBatchToDBInternal(!kPrepared);
      if (!s.ok()) {
        return s;
      }
    }

    if (!trailing_batch) {
      if (flushed_save_points_ == nullptr) {
        flushed_save_points_.reset(
            new autovector<WriteUnpreparedTxn::SavePoint>());
      }
      flushed_save_points_->emplace_back(
          unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
    }

    prev_boundary = curr_boundary;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  unflushed_save_points_->clear();
  return Status::OK();
}

Status WriteUnpreparedTxn::PrepareInternal() {
  const bool kPrepared = true;
  return FlushWriteBatchToDB(kPrepared);
}

Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
  if (unprep_seqs_.empty()) {
    assert(log_number_ == 0);
    assert(GetId() == 0);
    return WritePreparedTxn::CommitWithoutPrepareInternal();
  }

  // TODO(lth): We should optimize commit without prepare to not perform
  // a prepare under the hood.
  auto s = PrepareInternal();
  if (!s.ok()) {
    return s;
  }
  return CommitInternal();
}

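// Commit writes the Commit marker (plus any commit-time batch) and, through a
// pre-release callback, maps every sequence number in unprep_seqs_ to the
// commit sequence. With a single write queue, or when nothing needs to go to
// the memtable, one write suffices; otherwise a second write with an empty
// batch publishes the sequence and updates the commit map from the 2nd queue.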
Status WriteUnpreparedTxn::CommitInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared commit logic.

  // We take the commit-time batch and append the Commit marker. The Memtable
  // will ignore the Commit marker in non-recovery mode.
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  assert(s.ok());

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable.
    if (for_recovery) {
      WriteBatchInternal::SetAsLatestPersistentState(working_batch);
    } else {
      return Status::InvalidArgument(
          "Commit-time-batch can only be used if "
          "use_only_the_last_commit_time_batch_for_recovery is true");
    }
  }

  const bool includes_data = !empty && !for_recovery;
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;

  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt);
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is
  // already a connection between the memtable and its WAL, so there is no
  // need to redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                          nullptr, zero_log_number, disable_memtable, &seq_used,
                          batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after WriteImpl that published
      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
      for (const auto& seq : unprep_seqs_) {
        wpt_db_->RemovePrepared(seq.first, seq.second);
      }
    }
    if (UNLIKELY(!do_one_write)) {
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write to publish seq

  // Populate unprep_seqs_ with commit_batch_seq, since we treat data in the
  // commit write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by a CommitTimeWriteBatch and yet we cannot
  // enable the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.

  // Update commit map only from the 2nd queue.
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator.
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used,
                          ONE_BATCH, &update_commit_map_with_commit_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after WriteImpl that published the
  // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
  for (const auto& seq : unprep_seqs_) {
    wpt_db_->RemovePrepared(seq.first, seq.second);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}

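// Rollback is implemented by writing a batch that restores every modified key
// to its pre-transaction state: for each tracked (and untracked) key, the
// prior version is read from the DB and re-put, or a (Single)Delete is issued
// if the key did not previously exist.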
Status WriteUnpreparedTxn::WriteRollbackKeys(
    const LockTracker& lock_tracker, WriteBatchWithIndex* rollback_batch,
    ReadCallback* callback, const ReadOptions& roptions) {
  // This assertion can be removed when range lock is supported.
  assert(lock_tracker.IsPointLockSupported());
  const auto& cf_map = *wupt_db_->GetCFHandleMap();
  auto WriteRollbackKey = [&](const std::string& key, uint32_t cfid) {
    const auto& cf_handle = cf_map.at(cfid);
    PinnableSlice pinnable_val;
    bool not_used;
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = cf_handle;
    get_impl_options.value = &pinnable_val;
    get_impl_options.value_found = &not_used;
    get_impl_options.callback = callback;
    auto s = db_impl_->GetImpl(roptions, key, get_impl_options);

    if (s.ok()) {
      s = rollback_batch->Put(cf_handle, key, pinnable_val);
      assert(s.ok());
    } else if (s.IsNotFound()) {
      if (wupt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
        s = rollback_batch->SingleDelete(cf_handle, key);
      } else {
        s = rollback_batch->Delete(cf_handle, key);
      }
      assert(s.ok());
    } else {
      return s;
    }

    return Status::OK();
  };

  std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
      lock_tracker.GetColumnFamilyIterator());
  assert(cf_it != nullptr);
  while (cf_it->HasNext()) {
    ColumnFamilyId cf = cf_it->Next();
    std::unique_ptr<LockTracker::KeyIterator> key_it(
        lock_tracker.GetKeyIterator(cf));
    assert(key_it != nullptr);
    while (key_it->HasNext()) {
      const std::string& key = key_it->Next();
      auto s = WriteRollbackKey(key, cf);
      if (!s.ok()) {
        return s;
      }
    }
  }

  for (const auto& cfkey : untracked_keys_) {
    const auto cfid = cfkey.first;
    const auto& keys = cfkey.second;
    for (const auto& key : keys) {
      auto s = WriteRollbackKey(key, cfid);
      if (!s.ok()) {
        return s;
      }
    }
  }

  return Status::OK();
}

Status WriteUnpreparedTxn::RollbackInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  WriteBatchWithIndex rollback_batch(
      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0,
      write_options_.protection_bytes_per_key);
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  Status s;
  auto read_at_seq = kMaxSequenceNumber;
  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions roptions;
  // Set a snapshot to prevent the callback's seq from being overridden inside
  // DBImpl::Get.
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
  // need to read our own writes when reading prior versions of the key for
  // rollback.
  WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
  // TODO(lth): We write the rollback batch all in a single batch here, but
  // this should be subdivided into multiple batches as well. In phase 2, when
  // key sets are read from WAL, this will happen naturally.
  s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  // The Rollback marker will be used as a batch separator.
  s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
  assert(s.ok());
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  // The rollback batch may contain duplicate keys, because tracked_keys_ is
  // not comparator aware.
  auto rollback_batch_cnt = rollback_batch.SubBatchCnt();
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, ii) adding the commit entry to
  // CommitCache will allow us to benefit from the existing mechanism in
  // CommitCache that keeps an entry evicted due to max advance and yet
  // overlaps with a live snapshot around so that the live snapshot properly
  // skips the entry even if its prepare seq is lower than max_evicted_seq_.
  //
  // TODO(lth): RollbackInternal is conceptually very similar to
  // CommitInternal, with the rollback batch simply taking on the role of
  // CommitTimeWriteBatch. We should be able to merge the two code paths.
  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while the rollback
  // batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
                          nullptr, nullptr, nullptr, NO_REF_LOG,
                          !DISABLE_MEMTABLE, &seq_used, rollback_batch_cnt,
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write for commit

  uint64_t& prepare_seq = seq_used;
  // Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the
  // rollback write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[prepare_seq] = rollback_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  WriteBatch empty_batch;
  const size_t ONE_BATCH = 1;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator.
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used,
                          ONE_BATCH, &update_commit_map_with_rollback_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back.
  if (s.ok()) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}

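// Clear releases this transaction's locks (recovered transactions never took
// any), resets all unprepared-batch bookkeeping, and invalidates any iterators
// that are still outstanding.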
void WriteUnpreparedTxn::Clear() {
  if (!recovered_txn_) {
    txn_db_impl_->UnLock(this, *tracked_locks_);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;

  for (auto& it : active_iterators_) {
    auto bdit = static_cast<BaseDeltaIterator*>(it);
    bdit->Invalidate(Status::InvalidArgument(
        "Cannot use iterator after transaction has finished"));
  }
  active_iterators_.clear();
  untracked_keys_.clear();
  TransactionBaseImpl::Clear();
}

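// Savepoint bookkeeping invariant: every savepoint tracked by the base class
// is accounted for exactly once here, either as an unflushed savepoint (a byte
// offset into the current write batch) or as a flushed savepoint (recorded by
// FlushWriteBatchWithSavePointToDB). The asserts below check this invariant.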
void WriteUnpreparedTxn::SetSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  PessimisticTransaction::SetSavePoint();
  if (unflushed_save_points_ == nullptr) {
    unflushed_save_points_.reset(new autovector<size_t>());
  }
  unflushed_save_points_->push_back(write_batch_.GetDataSize());
}

Status WriteUnpreparedTxn::RollbackToSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::RollbackToSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    return RollbackToSavePointInternal();
  }

  return Status::NotFound();
}

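// Rolls back to a savepoint whose writes have already been flushed to the DB.
// The keys locked since that savepoint are rolled back by writing their
// pre-savepoint values (read at the snapshot captured when the savepoint was
// flushed) as yet another unprepared batch.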
Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
  Status s;

  const bool kClear = true;
  TransactionBaseImpl::InitWriteBatch(kClear);

  assert(flushed_save_points_->size() > 0);
  WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();

  assert(save_points_ != nullptr && save_points_->size() > 0);
  const LockTracker& tracked_keys = *save_points_->top().new_locks_;

  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions roptions;
  roptions.snapshot = top.snapshot_->snapshot();
  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(roptions.snapshot)
          ->min_uncommitted_;
  SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          top.unprep_seqs_,
                                          kBackedByDBSnapshot);
  s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  const bool kPrepared = true;
  s = FlushWriteBatchToDBInternal(!kPrepared);
  if (!s.ok()) {
    return s;
  }

  // PessimisticTransaction::RollbackToSavePoint will also call
  // RollbackToSavepoint on write_batch_. However, write_batch_ is empty and
  // has no savepoints because this savepoint has already been flushed. Work
  // around this by setting a fake savepoint.
  write_batch_.SetSavePoint();
  s = PessimisticTransaction::RollbackToSavePoint();
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }

  flushed_save_points_->pop_back();
  return s;
}

Status WriteUnpreparedTxn::PopSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    // PessimisticTransaction::PopSavePoint will also call PopSavePoint on
    // write_batch_. However, write_batch_ is empty and has no savepoints
    // because this savepoint has already been flushed. Work around this by
    // setting a fake savepoint.
    write_batch_.SetSavePoint();
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());

    flushed_save_points_->pop_back();
    return s;
  }

  return Status::NotFound();
}

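// Reads layer the transaction's own write batch on top of the DB and use
// WriteUnpreparedTxnReadCallback so that this transaction's unprepared
// sequence numbers are treated as visible. If the snapshot validation fails
// after the read, the caller gets Status::TryAgain.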
void WriteUnpreparedTxn::MultiGet(const ReadOptions& _read_options,
                                  ColumnFamilyHandle* column_family,
                                  const size_t num_keys, const Slice* keys,
                                  PinnableSlice* values, Status* statuses,
                                  const bool sorted_input) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kMultiGet) {
    Status s = Status::InvalidArgument(
        "Can only call MultiGet with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");

    for (size_t i = 0; i < num_keys; ++i) {
      if (statuses[i].ok()) {
        statuses[i] = s;
      }
    }
    return;
  }

  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kMultiGet;
  }

  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot = wupt_db_->AssignMinMaxSeqs(
      read_options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
                                      num_keys, keys, values, statuses,
                                      sorted_input, &callback);
  if (UNLIKELY(!callback.valid() ||
               !wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}

Status WriteUnpreparedTxn::Get(const ReadOptions& _read_options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kGet) {
    return Status::InvalidArgument(
        "Can only call Get with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  }

  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kGet;
  }

  return GetImpl(read_options, column_family, key, value);
}

Status WriteUnpreparedTxn::GetImpl(const ReadOptions& options,
                                   ColumnFamilyHandle* column_family,
                                   const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                            value, &callback);
  if (LIKELY(callback.valid() &&
             wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    return res;
  } else {
    res.PermitUncheckedError();
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

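// Iterators handed out by the transaction are tracked in active_iterators_ so
// that HandleWrite does not flush the write batch out from under them; the
// cleanup callback below removes an iterator from that list when it is
// destroyed.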
namespace {
static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) {
  auto txn = static_cast<WriteUnpreparedTxn*>(arg1);
  auto iter = static_cast<Iterator*>(arg2);
  txn->RemoveActiveIterator(iter);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
  return GetIterator(options, wupt_db_->DefaultColumnFamily());
}

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WriteUnpreparedTxnDB, not the root db.
  Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
  assert(db_iter);

  auto iter =
      write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
  active_iterators_.push_back(iter);
  iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
  return iter;
}

Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                            const Slice& key,
                                            SequenceNumber* tracked_at_seq) {
  // TODO(lth): Reduce duplicate code with WritePrepared ValidateSnapshot logic.
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WriteUnpreparedTxnReadCallback snap_checker(
      wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
  // TODO(yanqin): Support user-defined timestamp.
  return TransactionUtil::CheckKeyForConflicts(
      db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
      false /* cache_only */, &snap_checker, min_uncommitted,
      txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
}

const std::map<SequenceNumber, size_t>&
WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
  return unprep_seqs_;
}

}  // namespace ROCKSDB_NAMESPACE