write_unprepared_txn.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn.h"

#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
  // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
  // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
  // the prepare_batch_cnt seq nums after it.
  //
  // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
  // large.
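  //
  // Illustrative example (hypothetical values, not taken from the source): if
  // unprep_seqs_ == {{10, 3}, {20, 1}}, then seqs 10, 11, 12 and 20 belong to
  // this transaction's own unprepared/prepared batches and are visible to it,
  // while any other seq falls through to the IsInSnapshot check below.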
  for (const auto& it : unprep_seqs_) {
    if (it.first <= seq && seq < it.first + it.second) {
      return true;
    }
  }

  bool snap_released = false;
  auto ret =
      db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_, &snap_released);
  assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
  snap_released_ |= snap_released;
  return ret;
}
WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
                                       const WriteOptions& write_options,
                                       const TransactionOptions& txn_options)
    : WritePreparedTxn(txn_db, write_options, txn_options),
      wupt_db_(txn_db),
      last_log_number_(0),
      recovered_txn_(false),
      largest_validated_seq_(0) {
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }
}
WriteUnpreparedTxn::~WriteUnpreparedTxn() {
  if (!unprep_seqs_.empty()) {
    assert(log_number_ > 0);
    assert(GetId() > 0);
    assert(!name_.empty());

    // We should rollback regardless of GetState, but some unit tests that
    // test crash recovery run the destructor assuming that rollback does not
    // happen, so that rollback during recovery can be exercised.
    if (GetState() == STARTED || GetState() == LOCKS_STOLEN) {
      auto s = RollbackInternal();
      assert(s.ok());
      if (!s.ok()) {
        ROCKS_LOG_FATAL(
            wupt_db_->info_log_,
            "Rollback of WriteUnprepared transaction failed in destructor: %s",
            s.ToString().c_str());
      }
      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
          log_number_);
    }
  }

  // Call tracked_keys_.clear() so that ~PessimisticTransaction does not
  // try to unlock keys for recovered transactions.
  if (recovered_txn_) {
    tracked_keys_.clear();
  }
}
void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  assert(active_iterators_.empty());
  active_iterators_.clear();
  untracked_keys_.clear();
}
Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
  Status s;
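  // (Illustrative note, not in the original source: the unprepared batch is
  // only flushed while no iterators created from this transaction are live,
  // since those iterators are layered on top of write_batch_; see GetIterator
  // and RemoveActiveIterator below.)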
  if (active_iterators_.empty()) {
    s = MaybeFlushWriteBatchToDB();
    if (!s.ok()) {
      return s;
    }
  }
  s = do_write();
  if (s.ok()) {
    if (snapshot_) {
      largest_validated_seq_ =
          std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
    } else {
      // TODO(lth): We should use the same number as tracked_at_seq in TryLock,
      // because what is actually being tracked is the sequence number at which
      // this key was locked.
      largest_validated_seq_ = db_impl_->GetLastPublishedSequence();
    }
  }
  return s;
}
Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const Slice& key, const Slice& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const SliceParts& key, const SliceParts& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
                                 const Slice& key, const Slice& value,
                                 const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Merge(column_family, key, value,
                                      assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const Slice& key, const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const SliceParts& key,
                                  const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const Slice& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const SliceParts& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}
// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
// WriteUnprepared, the write batches have already been written into the
// database during WAL replay, so all we have to do is "retrack" the keys so
// that rollbacks are possible.
//
// Calling TryLock instead of TrackKey is also possible, but as an
// optimization, recovered transactions do not hold locks on their keys. This
// follows the implementation in PessimisticTransactionDB::Initialize where we
// set skip_concurrency_control to true.
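//
// Illustrative example (hypothetical key, not from the source): a recovered
// batch containing Put(cf, "k") leads the handler below to call
// TrackKey(cf, "k", kMaxSequenceNumber, ...), so that a later
// RollbackInternal() knows which keys need their previous values restored,
// without taking any locks.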
Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
  struct TrackKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                       false /* read_only */, true /* exclusive */);
      }
      return Status::OK();
    }

    // Recovered batches do not contain 2PC markers.
    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkNoop(bool) override { return Status::InvalidArgument(); }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  TrackKeyHandler handler(this,
                          wupt_db_->txn_db_options_.rollback_merge_operands);
  return wb->Iterate(&handler);
}
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  const bool kPrepared = true;
  Status s;
  if (write_batch_flush_threshold_ > 0 &&
      write_batch_.GetWriteBatch()->Count() > 0 &&
      write_batch_.GetDataSize() >
          static_cast<size_t>(write_batch_flush_threshold_)) {
    assert(GetState() != PREPARED);
    s = FlushWriteBatchToDB(!kPrepared);
  }
  return s;
}
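
// Illustrative usage sketch (not part of the original source): a caller opts
// into unprepared batch flushing through TransactionOptions, e.g.
//
//   TransactionOptions txn_options;
//   txn_options.write_batch_flush_threshold = 1 << 20;  // ~1 MB
//   Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
//
// Once the accumulated batch grows past this threshold, the next write goes
// through HandleWrite() above and triggers MaybeFlushWriteBatchToDB().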

Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
  // If the current write batch contains savepoints, then some special handling
  // is required so that RollbackToSavepoint can work.
  //
  // RollbackToSavepoint is not supported after Prepare() is called, so only do
  // this for unprepared batches.
  if (!prepared && unflushed_save_points_ != nullptr &&
      !unflushed_save_points_->empty()) {
    return FlushWriteBatchWithSavePointToDB();
  }

  return FlushWriteBatchToDBInternal(prepared);
}
Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
  if (name_.empty()) {
    assert(!prepared);
#ifndef NDEBUG
    static std::atomic_ullong autogen_id{0};
    // To avoid changing all tests to call SetName, just autogenerate one.
    if (wupt_db_->txn_db_options_.autogenerate_name) {
      SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1)));
    } else
#endif
    {
      return Status::InvalidArgument("Cannot write to DB without SetName.");
    }
  }

  struct UntrackedKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    UntrackedKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status AddUntrackedKey(uint32_t cf, const Slice& key) {
      auto str = key.ToString();
      if (txn_->tracked_keys_[cf].count(str) == 0) {
        txn_->untracked_keys_[cf].push_back(str);
      }
      return Status::OK();
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      return AddUntrackedKey(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        return AddUntrackedKey(cf, key);
      }
      return Status::OK();
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  UntrackedKeyHandler handler(
      this, wupt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&handler);
  assert(s.ok());

  // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool first_prepare_batch = log_number_ == 0;
  // MarkEndPrepare will change the Noop marker to the appropriate marker.
  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                     !WRITE_AFTER_COMMIT, !prepared);
  // For each duplicate key we account for a new sub-batch.
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared is better called in the pre-release callback; otherwise there
  // is a non-zero chance of max advancing past prepare_seq and readers
  // assuming the data is committed.
  // Also having it in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // log_number_ should refer to the oldest log containing uncommitted data
  // from the current transaction. This means that if log_number_ is set,
  // WriteImpl should not overwrite that value, so set log_used to nullptr if
  // log_number_ is already set.
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, &last_log_number_,
                          /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used,
                          prepare_batch_cnt_, &add_prepared_callback);
  if (log_number_ == 0) {
    log_number_ = last_log_number_;
  }
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;

  // Only call SetId if it hasn't been set yet.
  if (GetId() == 0) {
    SetId(prepare_seq);
  }

  // unprep_seqs_ will also contain prepared seqnos since they are treated in
  // the same way in the prepare/commit callbacks. See the comment on the
  // definition of unprep_seqs_.
  unprep_seqs_[prepare_seq] = prepare_batch_cnt_;
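  //
  // Illustrative example (hypothetical seq numbers): after two unprepared
  // flushes written at seqs 100 (3 sub-batches) and 200 (1 sub-batch) and a
  // final Prepare() written at seq 300 (2 sub-batches), unprep_seqs_ would
  // hold {100 => 3, 200 => 1, 300 => 2}.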

  // Reset transaction state.
  if (!prepared) {
    prepare_batch_cnt_ = 0;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  return s;
}
Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
  assert(unflushed_save_points_ != nullptr &&
         unflushed_save_points_->size() > 0);
  assert(save_points_ != nullptr && save_points_->size() > 0);
  assert(save_points_->size() >= unflushed_save_points_->size());

  // Handler class for creating an unprepared batch from a savepoint.
  struct SavePointBatchHandler : public WriteBatch::Handler {
    WriteBatchWithIndex* wb_;
    const std::map<uint32_t, ColumnFamilyHandle*>& handles_;

    SavePointBatchHandler(
        WriteBatchWithIndex* wb,
        const std::map<uint32_t, ColumnFamilyHandle*>& handles)
        : wb_(wb), handles_(handles) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Put(handles_.at(cf), key, value);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->Delete(handles_.at(cf), key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->SingleDelete(handles_.at(cf), key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Merge(handles_.at(cf), key, value);
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  // The comparator of the default cf is passed in, similar to the
  // initialization of TransactionBaseImpl::write_batch_. This comparator is
  // only used if the write batch encounters an invalid cf id, in which case it
  // falls back to this comparator.
  WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
                         true, 0);
  // Swap with write_batch_ so that wb contains the complete write batch. The
  // actual write batch that will be flushed to DB will be built in
  // write_batch_, and will be read by FlushWriteBatchToDBInternal.
  std::swap(wb, write_batch_);
  TransactionBaseImpl::InitWriteBatch();

  size_t prev_boundary = WriteBatchInternal::kHeader;
  const bool kPrepared = true;
  for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) {
    bool trailing_batch = i == unflushed_save_points_->size();
    SavePointBatchHandler sp_handler(&write_batch_,
                                     *wupt_db_->GetCFHandleMap().get());
    size_t curr_boundary = trailing_batch ? wb.GetWriteBatch()->GetDataSize()
                                          : (*unflushed_save_points_)[i];

    // Construct the partial write batch up to the savepoint.
    //
    // Theoretically, a memcpy between the write batches should be sufficient
    // since the rewriting into the batch should produce the exact same byte
    // representation. Rebuilding the WriteBatchWithIndex index is still
    // necessary though, and would imply doing two passes over the batch.
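    //
    // Illustrative example (hypothetical offsets): with unflushed savepoints
    // recorded at data sizes [s1, s2], the three iterations of this loop
    // rebuild and flush the byte ranges [kHeader, s1), [s1, s2) and
    // [s2, wb.GetDataSize()) as separate unprepared batches.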
    Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
                                           prev_boundary, curr_boundary);
    if (!s.ok()) {
      return s;
    }

    if (write_batch_.GetWriteBatch()->Count() > 0) {
      // Flush the write batch.
      s = FlushWriteBatchToDBInternal(!kPrepared);
      if (!s.ok()) {
        return s;
      }
    }

    if (!trailing_batch) {
      if (flushed_save_points_ == nullptr) {
        flushed_save_points_.reset(
            new autovector<WriteUnpreparedTxn::SavePoint>());
      }
      flushed_save_points_->emplace_back(
          unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
    }

    prev_boundary = curr_boundary;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  unflushed_save_points_->clear();
  return Status::OK();
}
Status WriteUnpreparedTxn::PrepareInternal() {
  const bool kPrepared = true;
  return FlushWriteBatchToDB(kPrepared);
}

Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
  if (unprep_seqs_.empty()) {
    assert(log_number_ == 0);
    assert(GetId() == 0);
    return WritePreparedTxn::CommitWithoutPrepareInternal();
  }

  // TODO(lth): We should optimize commit without prepare to not perform
  // a prepare under the hood.
  auto s = PrepareInternal();
  if (!s.ok()) {
    return s;
  }
  return CommitInternal();
}
Status WriteUnpreparedTxn::CommitInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared commit logic.

  // We take the commit-time batch and append the Commit marker. The memtable
  // will ignore the Commit marker in non-recovery mode.
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable.
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  const bool includes_data = !empty && !for_recovery;
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;

  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt);
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is
  // already a connection between the memtable and its WAL, so there is no
  // need to redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      for (const auto& seq : unprep_seqs_) {
        wpt_db_->RemovePrepared(seq.first, seq.second);
      }
    }
    if (UNLIKELY(!do_one_write)) {
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write to publish seq

  // Populate unprep_seqs_ with commit_batch_seq, since we treat data in the
  // commit write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[commit_batch_seq] = commit_batch_cnt;

  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by a CommitTimeWriteBatch and yet we cannot
  // enable the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
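  //
  // (Illustrative note, not from the original source: two_write_queues is a
  // DBOptions flag chosen at DB open time, e.g. `options.two_write_queues =
  // false;`, so this trade-off is made per DB rather than per transaction.)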
  // Update commit map only from the 2nd queue.
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator.
  WriteBatchInternal::InsertNoop(&empty_batch);
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after the WriteImpl that published
  // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
  for (const auto& seq : unprep_seqs_) {
    wpt_db_->RemovePrepared(seq.first, seq.second);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}
Status WriteUnpreparedTxn::WriteRollbackKeys(
    const TransactionKeyMap& tracked_keys, WriteBatchWithIndex* rollback_batch,
    ReadCallback* callback, const ReadOptions& roptions) {
  const auto& cf_map = *wupt_db_->GetCFHandleMap();
  auto WriteRollbackKey = [&](const std::string& key, uint32_t cfid) {
    const auto& cf_handle = cf_map.at(cfid);
    PinnableSlice pinnable_val;
    bool not_used;
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = cf_handle;
    get_impl_options.value = &pinnable_val;
    get_impl_options.value_found = &not_used;
    get_impl_options.callback = callback;
    auto s = db_impl_->GetImpl(roptions, key, get_impl_options);

    if (s.ok()) {
      s = rollback_batch->Put(cf_handle, key, pinnable_val);
      assert(s.ok());
    } else if (s.IsNotFound()) {
      s = rollback_batch->Delete(cf_handle, key);
      assert(s.ok());
    } else {
      return s;
    }

    return Status::OK();
  };

  for (const auto& cfkey : tracked_keys) {
    const auto cfid = cfkey.first;
    const auto& keys = cfkey.second;
    for (const auto& pair : keys) {
      auto s = WriteRollbackKey(pair.first, cfid);
      if (!s.ok()) {
        return s;
      }
    }
  }

  for (const auto& cfkey : untracked_keys_) {
    const auto cfid = cfkey.first;
    const auto& keys = cfkey.second;
    for (const auto& key : keys) {
      auto s = WriteRollbackKey(key, cfid);
      if (!s.ok()) {
        return s;
      }
    }
  }

  return Status::OK();
}
Status WriteUnpreparedTxn::RollbackInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  WriteBatchWithIndex rollback_batch(
      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  Status s;
  auto read_at_seq = kMaxSequenceNumber;
  ReadOptions roptions;
  // To prevent the callback's seq from being overridden inside DBImpl::Get.
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
  // need to read our own writes when reading prior versions of the key for
  // rollback.
  WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
  WriteRollbackKeys(GetTrackedKeys(), &rollback_batch, &callback, roptions);

  // The Rollback marker will be used as a batch separator.
  WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  // TODO(lth): We write the rollback keys all in a single batch here, but this
  // should be subdivided into multiple batches as well. In phase 2, when key
  // sets are read from the WAL, this will happen naturally.
  const size_t ONE_BATCH = 1;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, and ii) adding the commit
  // entry to the CommitCache lets us benefit from the existing mechanism in
  // CommitCache that keeps around an entry that was evicted due to max advance
  // yet overlaps with a live snapshot, so that the live snapshot properly
  // skips the entry even if its prepare seq is lower than max_evicted_seq_.
  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, ONE_BATCH);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while the rollback
  // batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
                          nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
                          &seq_used, rollback_batch.SubBatchCnt(),
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write for commit

  uint64_t& prepare_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  WriteUnpreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, unprep_seqs_, prepare_seq);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator.
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back.
  if (s.ok()) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}
void WriteUnpreparedTxn::Clear() {
  if (!recovered_txn_) {
    txn_db_impl_->UnLock(this, &GetTrackedKeys());
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  assert(active_iterators_.empty());
  active_iterators_.clear();
  untracked_keys_.clear();
  TransactionBaseImpl::Clear();
}

void WriteUnpreparedTxn::SetSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  PessimisticTransaction::SetSavePoint();
  if (unflushed_save_points_ == nullptr) {
    unflushed_save_points_.reset(new autovector<size_t>());
  }
  unflushed_save_points_->push_back(write_batch_.GetDataSize());
}
Status WriteUnpreparedTxn::RollbackToSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::RollbackToSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    return RollbackToSavePointInternal();
  }

  return Status::NotFound();
}
Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
  Status s;

  const bool kClear = true;
  TransactionBaseImpl::InitWriteBatch(kClear);

  assert(flushed_save_points_->size() > 0);
  WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();

  assert(save_points_ != nullptr && save_points_->size() > 0);
  const TransactionKeyMap& tracked_keys = save_points_->top().new_keys_;

  ReadOptions roptions;
  roptions.snapshot = top.snapshot_->snapshot();
  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(
          roptions.snapshot)
          ->min_uncommitted_;
  SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          top.unprep_seqs_,
                                          kBackedByDBSnapshot);
  WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);

  const bool kPrepared = true;
  s = FlushWriteBatchToDBInternal(!kPrepared);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }

  // PessimisticTransaction::RollbackToSavePoint will also call
  // RollbackToSavepoint on write_batch_. However, write_batch_ is empty and
  // has no savepoints because this savepoint has already been flushed. Work
  // around this by setting a fake savepoint.
  write_batch_.SetSavePoint();
  s = PessimisticTransaction::RollbackToSavePoint();
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }

  flushed_save_points_->pop_back();
  return s;
}
Status WriteUnpreparedTxn::PopSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    // PessimisticTransaction::PopSavePoint will also call PopSavePoint on
    // write_batch_. However, write_batch_ is empty and has no savepoints
    // because this savepoint has already been flushed. Work around this by
    // setting a fake savepoint.
    write_batch_.SetSavePoint();
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    flushed_save_points_->pop_back();
    return s;
  }

  return Status::NotFound();
}
void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
                                  ColumnFamilyHandle* column_family,
                                  const size_t num_keys, const Slice* keys,
                                  PinnableSlice* values, Status* statuses,
                                  const bool sorted_input) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
                                      keys, values, statuses, sorted_input,
                                      &callback);
  if (UNLIKELY(!callback.valid() ||
               !wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}

Status WriteUnpreparedTxn::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                            value, &callback);
  if (LIKELY(callback.valid() &&
             wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    return res;
  } else {
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

namespace {
static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) {
  auto txn = reinterpret_cast<WriteUnpreparedTxn*>(arg1);
  auto iter = reinterpret_cast<Iterator*>(arg2);
  txn->RemoveActiveIterator(iter);
}
}  // anonymous namespace
Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
  return GetIterator(options, wupt_db_->DefaultColumnFamily());
}

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WriteUnpreparedTxnDB, not the root db.
  Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
  assert(db_iter);

  auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter);
  active_iterators_.push_back(iter);
  iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
  return iter;
}
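
// (Illustrative note, not from the original source: RegisterCleanup ensures
// CleanupWriteUnpreparedWBWIIterator, and hence RemoveActiveIterator, runs
// when the caller deletes the iterator, so active_iterators_ stays in sync
// and HandleWrite can resume flushing unprepared batches once all iterators
// created from this transaction are gone.)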
Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                            const Slice& key,
                                            SequenceNumber* tracked_at_seq) {
  // TODO(lth): Reduce duplicate code with WritePrepared ValidateSnapshot logic.
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(
          snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here, as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WriteUnpreparedTxnReadCallback snap_checker(
      wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker, min_uncommitted);
}

const std::map<SequenceNumber, size_t>&
WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
  return unprep_seqs_;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE