pessimistic_transaction.cc 49 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/transactions/pessimistic_transaction.h"
  6. #include <map>
  7. #include <set>
  8. #include <string>
  9. #include <vector>
  10. #include "db/column_family.h"
  11. #include "db/db_impl/db_impl.h"
  12. #include "logging/logging.h"
  13. #include "rocksdb/comparator.h"
  14. #include "rocksdb/db.h"
  15. #include "rocksdb/snapshot.h"
  16. #include "rocksdb/status.h"
  17. #include "rocksdb/utilities/transaction_db.h"
  18. #include "test_util/sync_point.h"
  19. #include "util/cast_util.h"
  20. #include "util/string_util.h"
  21. #include "utilities/transactions/pessimistic_transaction_db.h"
  22. #include "utilities/transactions/transaction_util.h"
  23. #include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
  24. namespace ROCKSDB_NAMESPACE {
  25. struct WriteOptions;
  26. std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);
  27. TransactionID PessimisticTransaction::GenTxnID() {
  28. return txn_id_counter_.fetch_add(1);
  29. }
  30. PessimisticTransaction::PessimisticTransaction(
  31. TransactionDB* txn_db, const WriteOptions& write_options,
  32. const TransactionOptions& txn_options, const bool init)
  33. : TransactionBaseImpl(
  34. txn_db->GetRootDB(), write_options,
  35. static_cast_with_check<PessimisticTransactionDB>(txn_db)
  36. ->GetLockTrackerFactory()),
  37. txn_db_impl_(nullptr),
  38. expiration_time_(0),
  39. txn_id_(0),
  40. waiting_cf_id_(0),
  41. waiting_key_(nullptr),
  42. lock_timeout_(0),
  43. deadlock_detect_(false),
  44. deadlock_detect_depth_(0),
  45. skip_concurrency_control_(false) {
  46. txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB>(txn_db);
  47. db_impl_ = static_cast_with_check<DBImpl>(db_);
  48. if (init) {
  49. Initialize(txn_options);
  50. }
  51. }
  52. void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
  53. // Range lock manager uses address of transaction object as TXNID
  54. const TransactionDBOptions& db_options = txn_db_impl_->GetTxnDBOptions();
  55. if (db_options.lock_mgr_handle &&
  56. db_options.lock_mgr_handle->getLockManager()->IsRangeLockSupported()) {
  57. txn_id_ = reinterpret_cast<TransactionID>(this);
  58. } else {
  59. txn_id_ = GenTxnID();
  60. }
  61. txn_state_ = STARTED;
  62. deadlock_detect_ = txn_options.deadlock_detect;
  63. deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
  64. write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
  65. write_batch_.GetWriteBatch()->SetTrackTimestampSize(
  66. txn_options.write_batch_track_timestamp_size);
  67. skip_concurrency_control_ = txn_options.skip_concurrency_control;
  68. lock_timeout_ = txn_options.lock_timeout * 1000;
  69. if (lock_timeout_ < 0) {
  70. // Lock timeout not set, use default
  71. lock_timeout_ =
  72. txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
  73. }
  74. // deadlock timeout should be lower than lock timeout
  75. deadlock_timeout_us_ =
  76. std::min(txn_options.deadlock_timeout_us, lock_timeout_);
  77. if (txn_options.expiration >= 0) {
  78. expiration_time_ = start_time_ + txn_options.expiration * 1000;
  79. } else {
  80. expiration_time_ = 0;
  81. }
  82. if (txn_options.set_snapshot) {
  83. SetSnapshot();
  84. }
  85. if (expiration_time_ > 0) {
  86. txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
  87. }
  88. use_only_the_last_commit_time_batch_for_recovery_ =
  89. txn_options.use_only_the_last_commit_time_batch_for_recovery;
  90. skip_prepare_ = txn_options.skip_prepare;
  91. read_timestamp_ = kMaxTxnTimestamp;
  92. commit_timestamp_ = kMaxTxnTimestamp;
  93. if (txn_options.commit_bypass_memtable) {
  94. // No need to optimize for empty transction
  95. commit_bypass_memtable_threshold_ = 1;
  96. } else {
  97. commit_bypass_memtable_threshold_ =
  98. txn_options.large_txn_commit_optimize_threshold;
  99. }
  100. commit_bypass_memtable_byte_threshold_ =
  101. txn_options.large_txn_commit_optimize_byte_threshold;
  102. }
  103. PessimisticTransaction::~PessimisticTransaction() {
  104. txn_db_impl_->UnLock(this, *tracked_locks_);
  105. if (expiration_time_ > 0) {
  106. txn_db_impl_->RemoveExpirableTransaction(txn_id_);
  107. }
  108. if (!name_.empty() && txn_state_ != COMMITTED) {
  109. txn_db_impl_->UnregisterTransaction(this);
  110. }
  111. }
  112. void PessimisticTransaction::Clear() {
  113. txn_db_impl_->UnLock(this, *tracked_locks_);
  114. TransactionBaseImpl::Clear();
  115. }
  116. void PessimisticTransaction::Reinitialize(
  117. TransactionDB* txn_db, const WriteOptions& write_options,
  118. const TransactionOptions& txn_options) {
  119. if (!name_.empty() && txn_state_ != COMMITTED) {
  120. txn_db_impl_->UnregisterTransaction(this);
  121. }
  122. TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
  123. Initialize(txn_options);
  124. }
  125. bool PessimisticTransaction::IsExpired() const {
  126. if (expiration_time_ > 0) {
  127. if (dbimpl_->GetSystemClock()->NowMicros() >= expiration_time_) {
  128. // Transaction is expired.
  129. return true;
  130. }
  131. }
  132. return false;
  133. }
  134. WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
  135. const WriteOptions& write_options,
  136. const TransactionOptions& txn_options)
  137. : PessimisticTransaction(txn_db, write_options, txn_options) {}
  138. Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options,
  139. ColumnFamilyHandle* column_family,
  140. const Slice& key, std::string* value,
  141. bool exclusive, const bool do_validate) {
  142. return GetForUpdateImpl(read_options, column_family, key, value, exclusive,
  143. do_validate);
  144. }
  145. Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options,
  146. ColumnFamilyHandle* column_family,
  147. const Slice& key,
  148. PinnableSlice* pinnable_val,
  149. bool exclusive, const bool do_validate) {
  150. return GetForUpdateImpl(read_options, column_family, key, pinnable_val,
  151. exclusive, do_validate);
  152. }
  153. template <typename TValue>
  154. inline Status WriteCommittedTxn::GetForUpdateImpl(
  155. const ReadOptions& read_options, ColumnFamilyHandle* column_family,
  156. const Slice& key, TValue* value, bool exclusive, const bool do_validate) {
  157. if (read_options.io_activity != Env::IOActivity::kUnknown) {
  158. return Status::InvalidArgument(
  159. "Cannot call GetForUpdate with `ReadOptions::io_activity` != "
  160. "`Env::IOActivity::kUnknown`");
  161. }
  162. column_family =
  163. column_family ? column_family : db_impl_->DefaultColumnFamily();
  164. assert(column_family);
  165. if (!read_options.timestamp) {
  166. const Comparator* const ucmp = column_family->GetComparator();
  167. assert(ucmp);
  168. size_t ts_sz = ucmp->timestamp_size();
  169. if (0 == ts_sz) {
  170. return TransactionBaseImpl::GetForUpdate(read_options, column_family, key,
  171. value, exclusive, do_validate);
  172. }
  173. } else {
  174. Status s =
  175. db_impl_->FailIfTsMismatchCf(column_family, *(read_options.timestamp));
  176. if (!s.ok()) {
  177. return s;
  178. }
  179. }
  180. Status s = SanityCheckReadTimestamp(do_validate);
  181. if (!s.ok()) {
  182. return s;
  183. }
  184. if (!read_options.timestamp) {
  185. ReadOptions read_opts_copy = read_options;
  186. char ts_buf[sizeof(kMaxTxnTimestamp)];
  187. EncodeFixed64(ts_buf, read_timestamp_);
  188. Slice ts(ts_buf, sizeof(ts_buf));
  189. read_opts_copy.timestamp = &ts;
  190. return TransactionBaseImpl::GetForUpdate(read_opts_copy, column_family, key,
  191. value, exclusive, do_validate);
  192. }
  193. assert(read_options.timestamp);
  194. const char* const ts_buf = read_options.timestamp->data();
  195. assert(read_options.timestamp->size() == sizeof(kMaxTxnTimestamp));
  196. TxnTimestamp ts = DecodeFixed64(ts_buf);
  197. if (ts != read_timestamp_) {
  198. return Status::InvalidArgument("Must read from the same read_timestamp");
  199. }
  200. return TransactionBaseImpl::GetForUpdate(read_options, column_family, key,
  201. value, exclusive, do_validate);
  202. }
  203. Status WriteCommittedTxn::GetEntityForUpdate(const ReadOptions& read_options,
  204. ColumnFamilyHandle* column_family,
  205. const Slice& key,
  206. PinnableWideColumns* columns,
  207. bool exclusive, bool do_validate) {
  208. if (!column_family) {
  209. return Status::InvalidArgument(
  210. "Cannot call GetEntityForUpdate without a column family handle");
  211. }
  212. const Comparator* const ucmp = column_family->GetComparator();
  213. assert(ucmp);
  214. const size_t ts_sz = ucmp->timestamp_size();
  215. if (ts_sz == 0) {
  216. return TransactionBaseImpl::GetEntityForUpdate(
  217. read_options, column_family, key, columns, exclusive, do_validate);
  218. }
  219. assert(ts_sz > 0);
  220. Status s = SanityCheckReadTimestamp(do_validate);
  221. if (!s.ok()) {
  222. return s;
  223. }
  224. std::string ts_buf;
  225. PutFixed64(&ts_buf, read_timestamp_);
  226. Slice ts(ts_buf);
  227. if (!read_options.timestamp) {
  228. ReadOptions read_options_copy = read_options;
  229. read_options_copy.timestamp = &ts;
  230. return TransactionBaseImpl::GetEntityForUpdate(
  231. read_options_copy, column_family, key, columns, exclusive, do_validate);
  232. }
  233. assert(read_options.timestamp);
  234. if (*read_options.timestamp != ts) {
  235. return Status::InvalidArgument("Must read from the same read timestamp");
  236. }
  237. return TransactionBaseImpl::GetEntityForUpdate(
  238. read_options, column_family, key, columns, exclusive, do_validate);
  239. }
  240. Status WriteCommittedTxn::SanityCheckReadTimestamp(bool do_validate) {
  241. bool enable_udt_validation =
  242. txn_db_impl_->GetTxnDBOptions().enable_udt_validation;
  243. if (!enable_udt_validation) {
  244. if (kMaxTxnTimestamp != read_timestamp_) {
  245. return Status::InvalidArgument(
  246. "read_timestamp is set but timestamp validation is disabled for the "
  247. "DB");
  248. }
  249. } else {
  250. if (!do_validate) {
  251. if (kMaxTxnTimestamp != read_timestamp_) {
  252. return Status::InvalidArgument(
  253. "If do_validate is false then GetForUpdate with read_timestamp is "
  254. "not "
  255. "defined.");
  256. }
  257. } else {
  258. if (kMaxTxnTimestamp == read_timestamp_) {
  259. return Status::InvalidArgument(
  260. "read_timestamp must be set for validation");
  261. }
  262. }
  263. }
  264. return Status::OK();
  265. }
  266. Status WriteCommittedTxn::PutEntityImpl(ColumnFamilyHandle* column_family,
  267. const Slice& key,
  268. const WideColumns& columns,
  269. bool do_validate, bool assume_tracked) {
  270. return Operate(column_family, key, do_validate, assume_tracked,
  271. [column_family, &key, &columns, this]() {
  272. Status s = GetBatchForWrite()->PutEntity(column_family, key,
  273. columns);
  274. if (s.ok()) {
  275. ++num_put_entities_;
  276. }
  277. return s;
  278. });
  279. }
  280. Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
  281. const Slice& key, const Slice& value,
  282. const bool assume_tracked) {
  283. const bool do_validate = !assume_tracked;
  284. return Operate(column_family, key, do_validate, assume_tracked,
  285. [column_family, &key, &value, this]() {
  286. Status s =
  287. GetBatchForWrite()->Put(column_family, key, value);
  288. if (s.ok()) {
  289. ++num_puts_;
  290. }
  291. return s;
  292. });
  293. }
  294. Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
  295. const SliceParts& key, const SliceParts& value,
  296. const bool assume_tracked) {
  297. const bool do_validate = !assume_tracked;
  298. return Operate(column_family, key, do_validate, assume_tracked,
  299. [column_family, &key, &value, this]() {
  300. Status s =
  301. GetBatchForWrite()->Put(column_family, key, value);
  302. if (s.ok()) {
  303. ++num_puts_;
  304. }
  305. return s;
  306. });
  307. }
  308. Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family,
  309. const Slice& key, const Slice& value) {
  310. return Operate(
  311. column_family, key, /*do_validate=*/false,
  312. /*assume_tracked=*/false, [column_family, &key, &value, this]() {
  313. Status s = GetBatchForWrite()->Put(column_family, key, value);
  314. if (s.ok()) {
  315. ++num_puts_;
  316. }
  317. return s;
  318. });
  319. }
  320. Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family,
  321. const SliceParts& key,
  322. const SliceParts& value) {
  323. return Operate(
  324. column_family, key, /*do_validate=*/false,
  325. /*assume_tracked=*/false, [column_family, &key, &value, this]() {
  326. Status s = GetBatchForWrite()->Put(column_family, key, value);
  327. if (s.ok()) {
  328. ++num_puts_;
  329. }
  330. return s;
  331. });
  332. }
  333. Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family,
  334. const Slice& key, const bool assume_tracked) {
  335. const bool do_validate = !assume_tracked;
  336. return Operate(column_family, key, do_validate, assume_tracked,
  337. [column_family, &key, this]() {
  338. Status s = GetBatchForWrite()->Delete(column_family, key);
  339. if (s.ok()) {
  340. ++num_deletes_;
  341. }
  342. return s;
  343. });
  344. }
  345. Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family,
  346. const SliceParts& key,
  347. const bool assume_tracked) {
  348. const bool do_validate = !assume_tracked;
  349. return Operate(column_family, key, do_validate, assume_tracked,
  350. [column_family, &key, this]() {
  351. Status s = GetBatchForWrite()->Delete(column_family, key);
  352. if (s.ok()) {
  353. ++num_deletes_;
  354. }
  355. return s;
  356. });
  357. }
  358. Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family,
  359. const Slice& key) {
  360. return Operate(column_family, key, /*do_validate=*/false,
  361. /*assume_tracked=*/false, [column_family, &key, this]() {
  362. Status s = GetBatchForWrite()->Delete(column_family, key);
  363. if (s.ok()) {
  364. ++num_deletes_;
  365. }
  366. return s;
  367. });
  368. }
  369. Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family,
  370. const SliceParts& key) {
  371. return Operate(column_family, key, /*do_validate=*/false,
  372. /*assume_tracked=*/false, [column_family, &key, this]() {
  373. Status s = GetBatchForWrite()->Delete(column_family, key);
  374. if (s.ok()) {
  375. ++num_deletes_;
  376. }
  377. return s;
  378. });
  379. }
  380. Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family,
  381. const Slice& key,
  382. const bool assume_tracked) {
  383. const bool do_validate = !assume_tracked;
  384. return Operate(column_family, key, do_validate, assume_tracked,
  385. [column_family, &key, this]() {
  386. Status s =
  387. GetBatchForWrite()->SingleDelete(column_family, key);
  388. if (s.ok()) {
  389. ++num_deletes_;
  390. }
  391. return s;
  392. });
  393. }
  394. Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family,
  395. const SliceParts& key,
  396. const bool assume_tracked) {
  397. const bool do_validate = !assume_tracked;
  398. return Operate(column_family, key, do_validate, assume_tracked,
  399. [column_family, &key, this]() {
  400. Status s =
  401. GetBatchForWrite()->SingleDelete(column_family, key);
  402. if (s.ok()) {
  403. ++num_deletes_;
  404. }
  405. return s;
  406. });
  407. }
  408. Status WriteCommittedTxn::SingleDeleteUntracked(
  409. ColumnFamilyHandle* column_family, const Slice& key) {
  410. return Operate(column_family, key, /*do_validate=*/false,
  411. /*assume_tracked=*/false, [column_family, &key, this]() {
  412. Status s =
  413. GetBatchForWrite()->SingleDelete(column_family, key);
  414. if (s.ok()) {
  415. ++num_deletes_;
  416. }
  417. return s;
  418. });
  419. }
  420. Status WriteCommittedTxn::Merge(ColumnFamilyHandle* column_family,
  421. const Slice& key, const Slice& value,
  422. const bool assume_tracked) {
  423. const bool do_validate = !assume_tracked;
  424. return Operate(column_family, key, do_validate, assume_tracked,
  425. [column_family, &key, &value, this]() {
  426. Status s =
  427. GetBatchForWrite()->Merge(column_family, key, value);
  428. if (s.ok()) {
  429. ++num_merges_;
  430. }
  431. return s;
  432. });
  433. }
  434. template <typename TKey, typename TOperation>
  435. Status WriteCommittedTxn::Operate(ColumnFamilyHandle* column_family,
  436. const TKey& key, const bool do_validate,
  437. const bool assume_tracked,
  438. TOperation&& operation) {
  439. Status s;
  440. if constexpr (std::is_same_v<Slice, TKey>) {
  441. s = TryLock(column_family, key, /*read_only=*/false, /*exclusive=*/true,
  442. do_validate, assume_tracked);
  443. } else if constexpr (std::is_same_v<SliceParts, TKey>) {
  444. std::string key_buf;
  445. Slice contiguous_key(key, &key_buf);
  446. s = TryLock(column_family, contiguous_key, /*read_only=*/false,
  447. /*exclusive=*/true, do_validate, assume_tracked);
  448. }
  449. if (!s.ok()) {
  450. return s;
  451. }
  452. column_family =
  453. column_family ? column_family : db_impl_->DefaultColumnFamily();
  454. assert(column_family);
  455. const Comparator* const ucmp = column_family->GetComparator();
  456. assert(ucmp);
  457. size_t ts_sz = ucmp->timestamp_size();
  458. if (ts_sz > 0) {
  459. assert(ts_sz == sizeof(TxnTimestamp));
  460. if (!IndexingEnabled()) {
  461. cfs_with_ts_tracked_when_indexing_disabled_.insert(
  462. column_family->GetID());
  463. }
  464. }
  465. return operation();
  466. }
  467. Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) {
  468. if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) {
  469. return Status::InvalidArgument(
  470. "Cannot decrease read timestamp for validation");
  471. }
  472. read_timestamp_ = ts;
  473. return Status::OK();
  474. }
  475. Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) {
  476. if (txn_db_impl_->GetTxnDBOptions().enable_udt_validation &&
  477. read_timestamp_ < kMaxTxnTimestamp && ts <= read_timestamp_) {
  478. return Status::InvalidArgument(
  479. "Cannot commit at timestamp smaller than or equal to read timestamp");
  480. }
  481. commit_timestamp_ = ts;
  482. return Status::OK();
  483. }
  484. Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
  485. if (batch && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
  486. // CommitBatch() needs to lock the keys in the batch.
  487. // However, the application also needs to specify the timestamp for the
  488. // keys in batch before calling this API.
  489. // This means timestamp order may violate the order of locking, thus
  490. // violate the sequence number order for the same user key.
  491. // Therefore, we disallow this operation for now.
  492. return Status::NotSupported(
  493. "Batch to commit includes timestamp assigned before locking");
  494. }
  495. std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
  496. Status s = LockBatch(batch, keys_to_unlock.get());
  497. if (!s.ok()) {
  498. return s;
  499. }
  500. bool can_commit = false;
  501. if (IsExpired()) {
  502. s = Status::Expired();
  503. } else if (expiration_time_ > 0) {
  504. TransactionState expected = STARTED;
  505. can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
  506. AWAITING_COMMIT);
  507. } else if (txn_state_ == STARTED) {
  508. // lock stealing is not a concern
  509. can_commit = true;
  510. }
  511. if (can_commit) {
  512. txn_state_.store(AWAITING_COMMIT);
  513. s = CommitBatchInternal(batch);
  514. if (s.ok()) {
  515. txn_state_.store(COMMITTED);
  516. }
  517. } else if (txn_state_ == LOCKS_STOLEN) {
  518. s = Status::Expired();
  519. } else {
  520. s = Status::InvalidArgument("Transaction is not in state for commit.");
  521. }
  522. txn_db_impl_->UnLock(this, *keys_to_unlock);
  523. return s;
  524. }
  525. Status PessimisticTransaction::Prepare() {
  526. if (name_.empty()) {
  527. return Status::InvalidArgument(
  528. "Cannot prepare a transaction that has not been named.");
  529. }
  530. if (IsExpired()) {
  531. return Status::Expired();
  532. }
  533. Status s;
  534. bool can_prepare = false;
  535. if (expiration_time_ > 0) {
  536. // must concern ourselves with expiraton and/or lock stealing
  537. // need to compare/exchange bc locks could be stolen under us here
  538. TransactionState expected = STARTED;
  539. can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
  540. AWAITING_PREPARE);
  541. } else if (txn_state_ == STARTED) {
  542. // expiration and lock stealing is not possible
  543. txn_state_.store(AWAITING_PREPARE);
  544. can_prepare = true;
  545. }
  546. if (can_prepare) {
  547. // transaction can't expire after preparation
  548. expiration_time_ = 0;
  549. assert(log_number_ == 0 ||
  550. txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
  551. s = PrepareInternal();
  552. if (s.ok()) {
  553. txn_state_.store(PREPARED);
  554. }
  555. } else if (txn_state_ == LOCKS_STOLEN) {
  556. s = Status::Expired();
  557. } else if (txn_state_ == PREPARED) {
  558. s = Status::InvalidArgument("Transaction has already been prepared.");
  559. } else if (txn_state_ == COMMITTED) {
  560. s = Status::InvalidArgument("Transaction has already been committed.");
  561. } else if (txn_state_ == ROLLEDBACK) {
  562. s = Status::InvalidArgument("Transaction has already been rolledback.");
  563. } else {
  564. s = Status::InvalidArgument("Transaction is not in state for commit.");
  565. }
  566. return s;
  567. }
  568. Status WriteCommittedTxn::PrepareInternal() {
  569. WriteOptions write_options = write_options_;
  570. write_options.disableWAL = false;
  571. auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
  572. name_);
  573. assert(s.ok());
  574. class MarkLogCallback : public PreReleaseCallback {
  575. public:
  576. MarkLogCallback(DBImpl* db, bool two_write_queues)
  577. : db_(db), two_write_queues_(two_write_queues) {
  578. (void)two_write_queues_; // to silence unused private field warning
  579. }
  580. Status Callback(SequenceNumber, bool is_mem_disabled, uint64_t log_number,
  581. size_t /*index*/, size_t /*total*/) override {
  582. #ifdef NDEBUG
  583. (void)is_mem_disabled;
  584. #endif
  585. assert(log_number != 0);
  586. assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue
  587. db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number);
  588. return Status::OK();
  589. }
  590. private:
  591. DBImpl* db_;
  592. bool two_write_queues_;
  593. } mark_log_callback(db_impl_,
  594. db_impl_->immutable_db_options().two_write_queues);
  595. WriteCallback* const kNoWriteCallback = nullptr;
  596. const uint64_t kRefNoLog = 0;
  597. const bool kDisableMemtable = true;
  598. SequenceNumber* const KIgnoreSeqUsed = nullptr;
  599. const size_t kNoBatchCount = 0;
  600. s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
  601. kNoWriteCallback, /*user_write_cb=*/nullptr,
  602. &log_number_, kRefNoLog, kDisableMemtable,
  603. KIgnoreSeqUsed, kNoBatchCount, &mark_log_callback);
  604. return s;
  605. }
  606. Status PessimisticTransaction::Commit() {
  607. bool commit_without_prepare = false;
  608. bool commit_prepared = false;
  609. if (IsExpired()) {
  610. return Status::Expired();
  611. }
  612. if (expiration_time_ > 0) {
  613. // we must atomicaly compare and exchange the state here because at
  614. // this state in the transaction it is possible for another thread
  615. // to change our state out from under us in the even that we expire and have
  616. // our locks stolen. In this case the only valid state is STARTED because
  617. // a state of PREPARED would have a cleared expiration_time_.
  618. TransactionState expected = STARTED;
  619. commit_without_prepare = std::atomic_compare_exchange_strong(
  620. &txn_state_, &expected, AWAITING_COMMIT);
  621. TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
  622. } else if (txn_state_ == PREPARED) {
  623. // expiration and lock stealing is not a concern
  624. commit_prepared = true;
  625. } else if (txn_state_ == STARTED) {
  626. // expiration and lock stealing is not a concern
  627. if (skip_prepare_) {
  628. commit_without_prepare = true;
  629. } else {
  630. return Status::TxnNotPrepared();
  631. }
  632. }
  633. Status s;
  634. if (commit_without_prepare) {
  635. assert(!commit_prepared);
  636. if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
  637. s = Status::InvalidArgument(
  638. "Commit-time batch contains values that will not be committed.");
  639. } else {
  640. txn_state_.store(AWAITING_COMMIT);
  641. if (log_number_ > 0) {
  642. dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
  643. log_number_);
  644. }
  645. s = CommitWithoutPrepareInternal();
  646. if (!name_.empty()) {
  647. txn_db_impl_->UnregisterTransaction(this);
  648. }
  649. Clear();
  650. if (s.ok()) {
  651. txn_state_.store(COMMITTED);
  652. }
  653. }
  654. } else if (commit_prepared) {
  655. txn_state_.store(AWAITING_COMMIT);
  656. s = CommitInternal();
  657. if (!s.ok()) {
  658. ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
  659. "Commit write failed");
  660. return s;
  661. }
  662. // FindObsoleteFiles must now look to the memtables
  663. // to determine what prep logs must be kept around,
  664. // not the prep section heap.
  665. assert(log_number_ > 0);
  666. dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
  667. log_number_);
  668. txn_db_impl_->UnregisterTransaction(this);
  669. Clear();
  670. txn_state_.store(COMMITTED);
  671. } else if (txn_state_ == LOCKS_STOLEN) {
  672. s = Status::Expired();
  673. } else if (txn_state_ == COMMITTED) {
  674. s = Status::InvalidArgument("Transaction has already been committed.");
  675. } else if (txn_state_ == ROLLEDBACK) {
  676. s = Status::InvalidArgument("Transaction has already been rolledback.");
  677. } else {
  678. s = Status::InvalidArgument("Transaction is not in state for commit.");
  679. }
  680. return s;
  681. }
  682. Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
  683. WriteBatchWithIndex* wbwi = GetWriteBatch();
  684. assert(wbwi);
  685. WriteBatch* wb = wbwi->GetWriteBatch();
  686. assert(wb);
  687. const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb);
  688. if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) {
  689. return Status::InvalidArgument("Must assign a commit timestamp");
  690. }
  691. if (needs_ts) {
  692. assert(commit_timestamp_ != kMaxTxnTimestamp);
  693. char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
  694. EncodeFixed64(commit_ts_buf, commit_timestamp_);
  695. Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
  696. Status s = wb->UpdateTimestamps(
  697. commit_ts, [wb, wbwi, this](uint32_t cf) -> size_t {
  698. // First search through timestamp info kept inside the WriteBatch
  699. // in case some writes bypassed the Transaction's write APIs.
  700. auto cf_id_to_ts_sz = wb->GetColumnFamilyToTimestampSize();
  701. auto iter = cf_id_to_ts_sz.find(cf);
  702. if (iter != cf_id_to_ts_sz.end()) {
  703. size_t ts_sz = iter->second;
  704. return ts_sz;
  705. }
  706. auto cf_iter = cfs_with_ts_tracked_when_indexing_disabled_.find(cf);
  707. if (cf_iter != cfs_with_ts_tracked_when_indexing_disabled_.end()) {
  708. return sizeof(kMaxTxnTimestamp);
  709. }
  710. const Comparator* ucmp =
  711. WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
  712. return ucmp ? ucmp->timestamp_size()
  713. : std::numeric_limits<size_t>::max();
  714. });
  715. if (!s.ok()) {
  716. return s;
  717. }
  718. }
  719. uint64_t seq_used = kMaxSequenceNumber;
  720. SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_,
  721. snapshot_notifier_, snapshot_);
  722. PostMemTableCallback* post_mem_cb = nullptr;
  723. if (snapshot_needed_) {
  724. if (commit_timestamp_ == kMaxTxnTimestamp) {
  725. return Status::InvalidArgument("Must set transaction commit timestamp");
  726. } else {
  727. post_mem_cb = &snapshot_creation_cb;
  728. }
  729. }
  730. auto s = db_impl_->WriteImpl(
  731. write_options_, wb,
  732. /*callback*/ nullptr, /*user_write_cb=*/nullptr, /*wal_used*/ nullptr,
  733. /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used, /*batch_cnt=*/0,
  734. /*pre_release_callback=*/nullptr, post_mem_cb);
  735. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  736. if (s.ok()) {
  737. SetId(seq_used);
  738. }
  739. return s;
  740. }
  741. Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
  742. uint64_t seq_used = kMaxSequenceNumber;
  743. auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr,
  744. /*user_write_cb=*/nullptr,
  745. /*wal_used*/ nullptr, /*log_ref*/ 0,
  746. /*disable_memtable*/ false, &seq_used);
  747. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  748. if (s.ok()) {
  749. SetId(seq_used);
  750. }
  751. return s;
  752. }
  753. Status WriteCommittedTxn::CommitInternal() {
  754. WriteBatchWithIndex* wbwi = GetWriteBatch();
  755. assert(wbwi);
  756. WriteBatch* wb = wbwi->GetWriteBatch();
  757. assert(wb);
  758. const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb);
  759. if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) {
  760. return Status::InvalidArgument("Must assign a commit timestamp");
  761. }
  762. // We take the commit-time batch and append the Commit marker.
  763. // The Memtable will ignore the Commit marker in non-recovery mode
  764. WriteBatch* working_batch = GetCommitTimeWriteBatch();
  765. Status s;
  766. if (!needs_ts) {
  767. s = WriteBatchInternal::MarkCommit(working_batch, name_);
  768. } else {
  769. assert(!commit_bypass_memtable_threshold_);
  770. assert(!commit_bypass_memtable_byte_threshold_);
  771. assert(commit_timestamp_ != kMaxTxnTimestamp);
  772. char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
  773. EncodeFixed64(commit_ts_buf, commit_timestamp_);
  774. Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
  775. s = WriteBatchInternal::MarkCommitWithTimestamp(working_batch, name_,
  776. commit_ts);
  777. if (s.ok()) {
  778. s = wb->UpdateTimestamps(
  779. commit_ts, [wb, wbwi, this](uint32_t cf) -> size_t {
  780. // first search through timestamp info kept inside the WriteBatch
  781. // in case some writes bypassed the Transaction's write APIs.
  782. auto cf_id_to_ts_sz = wb->GetColumnFamilyToTimestampSize();
  783. auto iter = cf_id_to_ts_sz.find(cf);
  784. if (iter != cf_id_to_ts_sz.end()) {
  785. return iter->second;
  786. }
  787. if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) !=
  788. cfs_with_ts_tracked_when_indexing_disabled_.end()) {
  789. return sizeof(kMaxTxnTimestamp);
  790. }
  791. const Comparator* ucmp =
  792. WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
  793. return ucmp ? ucmp->timestamp_size()
  794. : std::numeric_limits<size_t>::max();
  795. });
  796. }
  797. }
  798. if (!s.ok()) {
  799. return s;
  800. }
  801. // any operations appended to this working_batch will be ignored from WAL
  802. working_batch->MarkWalTerminationPoint();
  803. uint32_t wb_count = wb->Count();
  804. RecordInHistogram(db_impl_->immutable_db_options_.stats,
  805. NUM_OP_PER_TRANSACTION, wb_count);
  806. bool bypass_memtable = false;
  807. if (!needs_ts) {
  808. if (commit_bypass_memtable_threshold_ &&
  809. wb_count >= commit_bypass_memtable_threshold_) {
  810. if (wbwi->GetWBWIOpCount() != wb_count) {
  811. ROCKS_LOG_WARN(
  812. db_impl_->immutable_db_options().info_log,
  813. "Transaction %s qualifies for commit optimization due to update "
  814. "count. However, it will commit normally due to wbwi and wb record "
  815. "count mismatch. Some updates were added directly to the "
  816. "transaction's underlying write batch.",
  817. GetName().c_str());
  818. } else {
  819. bypass_memtable = true;
  820. }
  821. } else if (commit_bypass_memtable_byte_threshold_ &&
  822. wb->GetDataSize() >= commit_bypass_memtable_byte_threshold_) {
  823. if (wbwi->GetWBWIOpCount() != wb_count) {
  824. ROCKS_LOG_WARN(
  825. db_impl_->immutable_db_options().info_log,
  826. "Transaction %s qualifies for commit optimization due to write "
  827. "batch size. However, it will commit normally due to wbwi and wb "
  828. "record count mismatch. Some updates were added directly to the "
  829. "transaction's underlying write batch.",
  830. GetName().c_str());
  831. } else {
  832. bypass_memtable = true;
  833. }
  834. }
  835. }
  836. if (!bypass_memtable) {
  837. // insert prepared batch into Memtable only skipping WAL.
  838. // Memtable will ignore BeginPrepare/EndPrepare markers
  839. // in non recovery mode and simply insert the values
  840. s = WriteBatchInternal::Append(working_batch, wb);
  841. assert(s.ok());
  842. }
  843. uint64_t seq_used = kMaxSequenceNumber;
  844. SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_,
  845. snapshot_notifier_, snapshot_);
  846. PostMemTableCallback* post_mem_cb = nullptr;
  847. if (snapshot_needed_) {
  848. if (commit_timestamp_ == kMaxTxnTimestamp) {
  849. s = Status::InvalidArgument("Must set transaction commit timestamp");
  850. return s;
  851. } else {
  852. post_mem_cb = &snapshot_creation_cb;
  853. }
  854. }
  855. assert(log_number_ > 0);
  856. TEST_SYNC_POINT_CALLBACK("WriteCommittedTxn::CommitInternal:bypass_memtable",
  857. static_cast<void*>(&bypass_memtable));
  858. if (bypass_memtable) {
  859. // Used for differentiating commiting WBWI vs directly ingesting WBWI
  860. // see (IngestWriteBatchWithIndex())
  861. assert(working_batch->HasCommit());
  862. s = db_impl_->WriteImpl(
  863. write_options_, working_batch, /*callback*/ nullptr,
  864. /*user_write_cb=*/nullptr,
  865. /*wal_used*/ nullptr, /*log_ref*/ log_number_,
  866. /*disable_memtable*/ false, &seq_used,
  867. /*batch_cnt=*/0, /*pre_release_callback=*/nullptr, post_mem_cb,
  868. /*wbwi=*/
  869. std::make_shared<WriteBatchWithIndex>(std::move(write_batch_)));
  870. // Reset write_batch_ since it's accessed in transaction clean up and
  871. // might be used for transaction reuse.
  872. write_batch_ = WriteBatchWithIndex(cmp_, 0, true, 0,
  873. write_options_.protection_bytes_per_key);
  874. } else {
  875. s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
  876. /*user_write_cb=*/nullptr,
  877. /*wal_used*/ nullptr, /*log_ref*/ log_number_,
  878. /*disable_memtable*/ false, &seq_used,
  879. /*batch_cnt=*/0, /*pre_release_callback=*/nullptr,
  880. post_mem_cb);
  881. }
  882. assert(!s.ok() || seq_used != kMaxSequenceNumber);
  883. if (s.ok()) {
  884. SetId(seq_used);
  885. }
  886. return s;
  887. }
  888. Status PessimisticTransaction::Rollback() {
  889. Status s;
  890. if (txn_state_ == PREPARED) {
  891. txn_state_.store(AWAITING_ROLLBACK);
  892. s = RollbackInternal();
  893. if (s.ok()) {
  894. // we do not need to keep our prepared section around
  895. assert(log_number_ > 0);
  896. dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
  897. log_number_);
  898. Clear();
  899. txn_state_.store(ROLLEDBACK);
  900. }
  901. } else if (txn_state_ == STARTED) {
  902. if (log_number_ > 0) {
  903. assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
  904. assert(GetId() > 0);
  905. s = RollbackInternal();
  906. if (s.ok()) {
  907. dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
  908. log_number_);
  909. }
  910. }
  911. // prepare couldn't have taken place
  912. Clear();
  913. } else if (txn_state_ == COMMITTED) {
  914. s = Status::InvalidArgument("This transaction has already been committed.");
  915. } else {
  916. s = Status::InvalidArgument(
  917. "Two phase transaction is not in state for rollback.");
  918. }
  919. return s;
  920. }
  921. Status WriteCommittedTxn::RollbackInternal() {
  922. WriteBatch rollback_marker;
  923. auto s = WriteBatchInternal::MarkRollback(&rollback_marker, name_);
  924. assert(s.ok());
  925. s = db_impl_->WriteImpl(write_options_, &rollback_marker);
  926. return s;
  927. }
  928. Status PessimisticTransaction::RollbackToSavePoint() {
  929. if (txn_state_ != STARTED) {
  930. return Status::InvalidArgument("Transaction is beyond state for rollback.");
  931. }
  932. if (save_points_ != nullptr && !save_points_->empty()) {
  933. // Unlock any keys locked since last transaction
  934. auto& save_point_tracker = *save_points_->top().new_locks_;
  935. std::unique_ptr<LockTracker> t(
  936. tracked_locks_->GetTrackedLocksSinceSavePoint(save_point_tracker));
  937. if (t) {
  938. txn_db_impl_->UnLock(this, *t);
  939. }
  940. }
  941. return TransactionBaseImpl::RollbackToSavePoint();
  942. }
  943. // Lock all keys in this batch.
  944. // On success, caller should unlock keys_to_unlock
  945. Status PessimisticTransaction::LockBatch(WriteBatch* batch,
  946. LockTracker* keys_to_unlock) {
  947. if (!batch) {
  948. return Status::InvalidArgument("batch is nullptr");
  949. }
  950. class Handler : public WriteBatch::Handler {
  951. public:
  952. // Sorted map of column_family_id to sorted set of keys.
  953. // Since LockBatch() always locks keys in sorted order, it cannot deadlock
  954. // with itself. We're not using a comparator here since it doesn't matter
  955. // what the sorting is as long as it's consistent.
  956. std::map<uint32_t, std::set<std::string>> keys_;
  957. Handler() = default;
  958. void RecordKey(uint32_t column_family_id, const Slice& key) {
  959. auto& cfh_keys = keys_[column_family_id];
  960. cfh_keys.insert(key.ToString());
  961. }
  962. Status PutCF(uint32_t column_family_id, const Slice& key,
  963. const Slice& /* unused */) override {
  964. RecordKey(column_family_id, key);
  965. return Status::OK();
  966. }
  967. Status PutEntityCF(uint32_t column_family_id, const Slice& key,
  968. const Slice& /* unused */) override {
  969. RecordKey(column_family_id, key);
  970. return Status::OK();
  971. }
  972. Status MergeCF(uint32_t column_family_id, const Slice& key,
  973. const Slice& /* unused */) override {
  974. RecordKey(column_family_id, key);
  975. return Status::OK();
  976. }
  977. Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
  978. RecordKey(column_family_id, key);
  979. return Status::OK();
  980. }
  981. };
  982. // Iterating on this handler will add all keys in this batch into keys
  983. Handler handler;
  984. Status s = batch->Iterate(&handler);
  985. if (!s.ok()) {
  986. return s;
  987. }
  988. // Attempt to lock all keys
  989. for (const auto& cf_iter : handler.keys_) {
  990. uint32_t cfh_id = cf_iter.first;
  991. auto& cfh_keys = cf_iter.second;
  992. for (const auto& key_iter : cfh_keys) {
  993. const std::string& key = key_iter;
  994. s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
  995. if (!s.ok()) {
  996. break;
  997. }
  998. PointLockRequest r;
  999. r.column_family_id = cfh_id;
  1000. r.key = key;
  1001. r.seq = kMaxSequenceNumber;
  1002. r.read_only = false;
  1003. r.exclusive = true;
  1004. keys_to_unlock->Track(r);
  1005. }
  1006. if (!s.ok()) {
  1007. break;
  1008. }
  1009. }
  1010. if (!s.ok()) {
  1011. txn_db_impl_->UnLock(this, *keys_to_unlock);
  1012. }
  1013. return s;
  1014. }
  1015. // Attempt to lock this key.
  1016. // Returns OK if the key has been successfully locked. Non-ok, otherwise.
  1017. // If check_shapshot is true and this transaction has a snapshot set,
  1018. // this key will only be locked if there have been no writes to this key since
  1019. // the snapshot time.
  1020. Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
  1021. const Slice& key, bool read_only,
  1022. bool exclusive, const bool do_validate,
  1023. const bool assume_tracked) {
  1024. assert(!assume_tracked || !do_validate);
  1025. Status s;
  1026. if (UNLIKELY(skip_concurrency_control_)) {
  1027. return s;
  1028. }
  1029. uint32_t cfh_id = GetColumnFamilyID(column_family);
  1030. std::string key_str = key.ToString();
  1031. PointLockStatus status;
  1032. bool lock_upgrade;
  1033. bool previously_locked;
  1034. if (tracked_locks_->IsPointLockSupported()) {
  1035. status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
  1036. previously_locked = status.locked;
  1037. lock_upgrade = previously_locked && exclusive && !status.exclusive;
  1038. } else {
  1039. // If the record is tracked, we can assume it was locked, too.
  1040. previously_locked = assume_tracked;
  1041. status.locked = false;
  1042. lock_upgrade = false;
  1043. }
  1044. // Lock this key if this transactions hasn't already locked it or we require
  1045. // an upgrade.
  1046. if (!previously_locked || lock_upgrade) {
  1047. s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
  1048. }
  1049. const ColumnFamilyHandle* const cfh =
  1050. column_family ? column_family : db_impl_->DefaultColumnFamily();
  1051. assert(cfh);
  1052. const Comparator* const ucmp = cfh->GetComparator();
  1053. assert(ucmp);
  1054. size_t ts_sz = ucmp->timestamp_size();
  1055. SetSnapshotIfNeeded();
  1056. // Even though we do not care about doing conflict checking for this write,
  1057. // we still need to take a lock to make sure we do not cause a conflict with
  1058. // some other write. However, we do not need to check if there have been
  1059. // any writes since this transaction's snapshot.
  1060. // TODO(agiardullo): could optimize by supporting shared txn locks in the
  1061. // future.
  1062. SequenceNumber tracked_at_seq =
  1063. status.locked ? status.seq : kMaxSequenceNumber;
  1064. if (!do_validate || (snapshot_ == nullptr &&
  1065. (0 == ts_sz || kMaxTxnTimestamp == read_timestamp_))) {
  1066. if (assume_tracked && !previously_locked &&
  1067. tracked_locks_->IsPointLockSupported()) {
  1068. s = Status::InvalidArgument(
  1069. "assume_tracked is set but it is not tracked yet");
  1070. }
  1071. // Need to remember the earliest sequence number that we know that this
  1072. // key has not been modified after. This is useful if this same
  1073. // transaction later tries to lock this key again.
  1074. if (tracked_at_seq == kMaxSequenceNumber) {
  1075. // Since we haven't checked a snapshot, we only know this key has not
  1076. // been modified since after we locked it.
  1077. // Note: when last_seq_same_as_publish_seq_==false this is less than the
  1078. // latest allocated seq but it is ok since i) this is just a heuristic
  1079. // used only as a hint to avoid actual check for conflicts, ii) this would
  1080. // cause a false positive only if the snapthot is taken right after the
  1081. // lock, which would be an unusual sequence.
  1082. tracked_at_seq = db_->GetLatestSequenceNumber();
  1083. }
  1084. } else if (s.ok()) {
  1085. // If a snapshot is set, we need to make sure the key hasn't been modified
  1086. // since the snapshot. This must be done after we locked the key.
  1087. // If we already have validated an earilier snapshot it must has been
  1088. // reflected in tracked_at_seq and ValidateSnapshot will return OK.
  1089. s = ValidateSnapshot(column_family, key, &tracked_at_seq);
  1090. if (!s.ok()) {
  1091. // Failed to validate key
  1092. // Unlock key we just locked
  1093. if (lock_upgrade) {
  1094. s = txn_db_impl_->TryLock(this, cfh_id, key_str, false /* exclusive */);
  1095. assert(s.ok());
  1096. } else if (!previously_locked) {
  1097. txn_db_impl_->UnLock(this, cfh_id, key.ToString());
  1098. }
  1099. }
  1100. }
  1101. if (s.ok()) {
  1102. // We must track all the locked keys so that we can unlock them later. If
  1103. // the key is already locked, this func will update some stats on the
  1104. // tracked key. It could also update the tracked_at_seq if it is lower
  1105. // than the existing tracked key seq. These stats are necessary for
  1106. // RollbackToSavePoint to determine whether a key can be safely removed
  1107. // from tracked_keys_. Removal can only be done if a key was only locked
  1108. // during the current savepoint.
  1109. //
  1110. // Recall that if assume_tracked is true, we assume that TrackKey has been
  1111. // called previously since the last savepoint, with the same exclusive
  1112. // setting, and at a lower sequence number, so skipping here should be
  1113. // safe.
  1114. if (!assume_tracked) {
  1115. TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
  1116. } else {
  1117. #ifndef NDEBUG
  1118. if (tracked_locks_->IsPointLockSupported()) {
  1119. PointLockStatus lock_status =
  1120. tracked_locks_->GetPointLockStatus(cfh_id, key_str);
  1121. assert(lock_status.locked);
  1122. assert(lock_status.seq <= tracked_at_seq);
  1123. assert(lock_status.exclusive == exclusive);
  1124. }
  1125. #endif
  1126. }
  1127. }
  1128. return s;
  1129. }
  1130. Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family,
  1131. const Endpoint& start_endp,
  1132. const Endpoint& end_endp) {
  1133. ColumnFamilyHandle* cfh =
  1134. column_family ? column_family : db_impl_->DefaultColumnFamily();
  1135. uint32_t cfh_id = GetColumnFamilyID(cfh);
  1136. Status s = txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp);
  1137. if (s.ok()) {
  1138. RangeLockRequest req{cfh_id, start_endp, end_endp};
  1139. tracked_locks_->Track(req);
  1140. }
  1141. return s;
  1142. }
  1143. // Return OK() if this key has not been modified more recently than the
  1144. // transaction snapshot_.
  1145. // tracked_at_seq is the global seq at which we either locked the key or already
  1146. // have done ValidateSnapshot.
  1147. Status PessimisticTransaction::ValidateSnapshot(
  1148. ColumnFamilyHandle* column_family, const Slice& key,
  1149. SequenceNumber* tracked_at_seq) {
  1150. assert(snapshot_ || read_timestamp_ < kMaxTxnTimestamp);
  1151. SequenceNumber snap_seq = 0;
  1152. if (snapshot_) {
  1153. snap_seq = snapshot_->GetSequenceNumber();
  1154. if (*tracked_at_seq <= snap_seq) {
  1155. // If the key has been previous validated (or locked) at a sequence number
  1156. // earlier than the current snapshot's sequence number, we already know it
  1157. // has not been modified aftter snap_seq either.
  1158. return Status::OK();
  1159. }
  1160. } else {
  1161. snap_seq = db_impl_->GetLatestSequenceNumber();
  1162. }
  1163. // Otherwise we have either
  1164. // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
  1165. // 2: snap_seq < tracked_at_seq: last time we lock the key was via
  1166. // do_validate=false which means we had skipped ValidateSnapshot. In both
  1167. // cases we should do ValidateSnapshot now.
  1168. *tracked_at_seq = snap_seq;
  1169. ColumnFamilyHandle* cfh =
  1170. column_family ? column_family : db_impl_->DefaultColumnFamily();
  1171. assert(cfh);
  1172. const Comparator* const ucmp = cfh->GetComparator();
  1173. assert(ucmp);
  1174. size_t ts_sz = ucmp->timestamp_size();
  1175. std::string ts_buf;
  1176. if (ts_sz > 0 && read_timestamp_ < kMaxTxnTimestamp) {
  1177. assert(ts_sz == sizeof(read_timestamp_));
  1178. PutFixed64(&ts_buf, read_timestamp_);
  1179. }
  1180. return TransactionUtil::CheckKeyForConflicts(
  1181. db_impl_, cfh, key.ToString(), snap_seq, ts_sz == 0 ? nullptr : &ts_buf,
  1182. false /* cache_only */,
  1183. /* snap_checker */ nullptr,
  1184. /* min_uncommitted */ kMaxSequenceNumber,
  1185. txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
  1186. }
  1187. bool PessimisticTransaction::TryStealingLocks() {
  1188. assert(IsExpired());
  1189. TransactionState expected = STARTED;
  1190. return std::atomic_compare_exchange_strong(&txn_state_, &expected,
  1191. LOCKS_STOLEN);
  1192. }
  1193. void PessimisticTransaction::UnlockGetForUpdate(
  1194. ColumnFamilyHandle* column_family, const Slice& key) {
  1195. txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
  1196. }
  1197. Status PessimisticTransaction::SetName(const TransactionName& name) {
  1198. Status s;
  1199. if (txn_state_ == STARTED) {
  1200. if (name_.length()) {
  1201. s = Status::InvalidArgument("Transaction has already been named.");
  1202. } else if (name.length() < 1 || name.length() > 512) {
  1203. s = Status::InvalidArgument(
  1204. "Transaction name length must be between 1 and 512 chars.");
  1205. } else {
  1206. name_ = name;
  1207. s = txn_db_impl_->RegisterTransaction(this);
  1208. if (!s.ok()) {
  1209. name_.clear();
  1210. }
  1211. }
  1212. } else {
  1213. s = Status::InvalidArgument("Transaction is beyond state for naming.");
  1214. }
  1215. return s;
  1216. }
  1217. Status PessimisticTransaction::CollapseKey(const ReadOptions& options,
  1218. const Slice& key,
  1219. ColumnFamilyHandle* column_family) {
  1220. auto* cfh = column_family ? column_family : db_impl_->DefaultColumnFamily();
  1221. std::string value;
  1222. const auto status = GetForUpdate(options, cfh, key, &value, true, true);
  1223. if (!status.ok()) {
  1224. return status;
  1225. }
  1226. return Put(column_family, key, value);
  1227. }
  1228. } // namespace ROCKSDB_NAMESPACE