// transaction_base.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/transactions/transaction_base.h"
  6. #include <cinttypes>
  7. #include "db/attribute_group_iterator_impl.h"
  8. #include "db/coalescing_iterator.h"
  9. #include "db/column_family.h"
  10. #include "db/db_impl/db_impl.h"
  11. #include "logging/logging.h"
  12. #include "rocksdb/comparator.h"
  13. #include "rocksdb/db.h"
  14. #include "rocksdb/status.h"
  15. #include "util/cast_util.h"
  16. #include "util/string_util.h"
  17. #include "utilities/transactions/lock/lock_tracker.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. Status Transaction::CommitAndTryCreateSnapshot(
  20. std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts,
  21. std::shared_ptr<const Snapshot>* snapshot) {
  22. if (snapshot) {
  23. snapshot->reset();
  24. }
  25. TxnTimestamp commit_ts = GetCommitTimestamp();
  26. if (commit_ts == kMaxTxnTimestamp) {
  27. if (ts == kMaxTxnTimestamp) {
  28. return Status::InvalidArgument("Commit timestamp unset");
  29. } else {
  30. const Status s = SetCommitTimestamp(ts);
  31. if (!s.ok()) {
  32. return s;
  33. }
  34. }
  35. } else if (ts != kMaxTxnTimestamp) {
  36. if (ts != commit_ts) {
  37. // For now we treat this as error.
  38. return Status::InvalidArgument("Different commit ts specified");
  39. }
  40. }
  41. SetSnapshotOnNextOperation(notifier);
  42. Status s = Commit();
  43. if (!s.ok()) {
  44. return s;
  45. }
  46. assert(s.ok());
  47. // If we reach here, we must return ok status for this function.
  48. std::shared_ptr<const Snapshot> new_snapshot = GetTimestampedSnapshot();
  49. if (snapshot) {
  50. *snapshot = new_snapshot;
  51. }
  52. return Status::OK();
  53. }
  54. TransactionBaseImpl::TransactionBaseImpl(
  55. DB* db, const WriteOptions& write_options,
  56. const LockTrackerFactory& lock_tracker_factory)
  57. : db_(db),
  58. dbimpl_(static_cast_with_check<DBImpl>(db)),
  59. write_options_(write_options),
  60. cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
  61. lock_tracker_factory_(lock_tracker_factory),
  62. start_time_(dbimpl_->GetSystemClock()->NowMicros()),
  63. write_batch_(cmp_, 0, true, 0, write_options.protection_bytes_per_key),
  64. tracked_locks_(lock_tracker_factory_.Create()),
  65. commit_time_batch_(0 /* reserved_bytes */, 0 /* max_bytes */,
  66. write_options.protection_bytes_per_key,
  67. 0 /* default_cf_ts_sz */),
  68. indexing_enabled_(true) {
  69. assert(dynamic_cast<DBImpl*>(db_) != nullptr);
  70. log_number_ = 0;
  71. if (dbimpl_->allow_2pc()) {
  72. InitWriteBatch();
  73. }
  74. }
  75. TransactionBaseImpl::~TransactionBaseImpl() {
  76. // Release snapshot if snapshot is set
  77. SetSnapshotInternal(nullptr);
  78. }
  79. void TransactionBaseImpl::Clear() {
  80. save_points_.reset(nullptr);
  81. write_batch_.Clear();
  82. commit_time_batch_.Clear();
  83. tracked_locks_->Clear();
  84. num_puts_ = 0;
  85. num_put_entities_ = 0;
  86. num_deletes_ = 0;
  87. num_merges_ = 0;
  88. if (dbimpl_->allow_2pc()) {
  89. InitWriteBatch();
  90. }
  91. }
  92. void TransactionBaseImpl::Reinitialize(DB* db,
  93. const WriteOptions& write_options) {
  94. Clear();
  95. ClearSnapshot();
  96. id_ = 0;
  97. db_ = db;
  98. name_.clear();
  99. log_number_ = 0;
  100. write_options_ = write_options;
  101. start_time_ = dbimpl_->GetSystemClock()->NowMicros();
  102. indexing_enabled_ = true;
  103. cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
  104. WriteBatchInternal::SetDefaultColumnFamilyTimestampSize(
  105. write_batch_.GetWriteBatch(), cmp_->timestamp_size());
  106. WriteBatchInternal::UpdateProtectionInfo(
  107. write_batch_.GetWriteBatch(), write_options_.protection_bytes_per_key)
  108. .PermitUncheckedError();
  109. WriteBatchInternal::UpdateProtectionInfo(
  110. &commit_time_batch_, write_options_.protection_bytes_per_key)
  111. .PermitUncheckedError();
  112. }
  113. void TransactionBaseImpl::SetSnapshot() {
  114. const Snapshot* snapshot = dbimpl_->GetSnapshotForWriteConflictBoundary();
  115. SetSnapshotInternal(snapshot);
  116. }
  117. void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) {
  118. // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to
  119. // be released, not deleted when it is no longer referenced.
  120. snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot,
  121. this, std::placeholders::_1, db_));
  122. snapshot_needed_ = false;
  123. snapshot_notifier_ = nullptr;
  124. }
  125. void TransactionBaseImpl::SetSnapshotOnNextOperation(
  126. std::shared_ptr<TransactionNotifier> notifier) {
  127. snapshot_needed_ = true;
  128. snapshot_notifier_ = notifier;
  129. }
  130. void TransactionBaseImpl::SetSnapshotIfNeeded() {
  131. if (snapshot_needed_) {
  132. std::shared_ptr<TransactionNotifier> notifier = snapshot_notifier_;
  133. SetSnapshot();
  134. if (notifier != nullptr) {
  135. notifier->SnapshotCreated(GetSnapshot());
  136. }
  137. }
  138. }
  139. Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
  140. const SliceParts& key, bool read_only,
  141. bool exclusive, const bool do_validate,
  142. const bool assume_tracked) {
  143. size_t key_size = 0;
  144. for (int i = 0; i < key.num_parts; ++i) {
  145. key_size += key.parts[i].size();
  146. }
  147. std::string str;
  148. str.reserve(key_size);
  149. for (int i = 0; i < key.num_parts; ++i) {
  150. str.append(key.parts[i].data(), key.parts[i].size());
  151. }
  152. return TryLock(column_family, str, read_only, exclusive, do_validate,
  153. assume_tracked);
  154. }
  155. void TransactionBaseImpl::SetSavePoint() {
  156. if (save_points_ == nullptr) {
  157. save_points_.reset(
  158. new std::stack<TransactionBaseImpl::SavePoint,
  159. autovector<TransactionBaseImpl::SavePoint>>());
  160. }
  161. save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
  162. num_puts_, num_put_entities_, num_deletes_, num_merges_,
  163. lock_tracker_factory_);
  164. write_batch_.SetSavePoint();
  165. }
  166. Status TransactionBaseImpl::RollbackToSavePoint() {
  167. if (save_points_ != nullptr && save_points_->size() > 0) {
  168. // Restore saved SavePoint
  169. TransactionBaseImpl::SavePoint& save_point = save_points_->top();
  170. snapshot_ = save_point.snapshot_;
  171. snapshot_needed_ = save_point.snapshot_needed_;
  172. snapshot_notifier_ = save_point.snapshot_notifier_;
  173. num_puts_ = save_point.num_puts_;
  174. num_put_entities_ = save_point.num_put_entities_;
  175. num_deletes_ = save_point.num_deletes_;
  176. num_merges_ = save_point.num_merges_;
  177. // Rollback batch
  178. Status s = write_batch_.RollbackToSavePoint();
  179. assert(s.ok());
  180. // Rollback any keys that were tracked since the last savepoint
  181. tracked_locks_->Subtract(*save_point.new_locks_);
  182. save_points_->pop();
  183. return s;
  184. } else {
  185. assert(write_batch_.RollbackToSavePoint().IsNotFound());
  186. return Status::NotFound();
  187. }
  188. }
  189. Status TransactionBaseImpl::PopSavePoint() {
  190. if (save_points_ == nullptr || save_points_->empty()) {
  191. // No SavePoint yet.
  192. assert(write_batch_.PopSavePoint().IsNotFound());
  193. return Status::NotFound();
  194. }
  195. assert(!save_points_->empty());
  196. // If there is another savepoint A below the current savepoint B, then A needs
  197. // to inherit tracked_keys in B so that if we rollback to savepoint A, we
  198. // remember to unlock keys in B. If there is no other savepoint below, then we
  199. // can safely discard savepoint info.
  200. if (save_points_->size() == 1) {
  201. save_points_->pop();
  202. } else {
  203. TransactionBaseImpl::SavePoint top(lock_tracker_factory_);
  204. std::swap(top, save_points_->top());
  205. save_points_->pop();
  206. save_points_->top().new_locks_->Merge(*top.new_locks_);
  207. }
  208. return write_batch_.PopSavePoint();
  209. }
  210. Status TransactionBaseImpl::Get(const ReadOptions& _read_options,
  211. ColumnFamilyHandle* column_family,
  212. const Slice& key, std::string* value) {
  213. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  214. _read_options.io_activity != Env::IOActivity::kGet) {
  215. return Status::InvalidArgument(
  216. "Can only call Get with `ReadOptions::io_activity` is "
  217. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  218. }
  219. ReadOptions read_options(_read_options);
  220. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  221. read_options.io_activity = Env::IOActivity::kGet;
  222. }
  223. auto s = GetImpl(read_options, column_family, key, value);
  224. return s;
  225. }
  226. Status TransactionBaseImpl::GetImpl(const ReadOptions& read_options,
  227. ColumnFamilyHandle* column_family,
  228. const Slice& key, std::string* value) {
  229. assert(value != nullptr);
  230. PinnableSlice pinnable_val(value);
  231. assert(!pinnable_val.IsPinned());
  232. auto s = GetImpl(read_options, column_family, key, &pinnable_val);
  233. if (s.ok() && pinnable_val.IsPinned()) {
  234. value->assign(pinnable_val.data(), pinnable_val.size());
  235. } // else value is already assigned
  236. return s;
  237. }
  238. Status TransactionBaseImpl::Get(const ReadOptions& _read_options,
  239. ColumnFamilyHandle* column_family,
  240. const Slice& key, PinnableSlice* pinnable_val) {
  241. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  242. _read_options.io_activity != Env::IOActivity::kGet) {
  243. return Status::InvalidArgument(
  244. "Can only call Get with `ReadOptions::io_activity` is "
  245. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  246. }
  247. ReadOptions read_options(_read_options);
  248. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  249. read_options.io_activity = Env::IOActivity::kGet;
  250. }
  251. return GetImpl(read_options, column_family, key, pinnable_val);
  252. }
  253. Status TransactionBaseImpl::GetImpl(const ReadOptions& read_options,
  254. ColumnFamilyHandle* column_family,
  255. const Slice& key,
  256. PinnableSlice* pinnable_val) {
  257. return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
  258. pinnable_val);
  259. }
  260. Status TransactionBaseImpl::GetEntity(const ReadOptions& read_options,
  261. ColumnFamilyHandle* column_family,
  262. const Slice& key,
  263. PinnableWideColumns* columns) {
  264. return GetEntityImpl(read_options, column_family, key, columns);
  265. }
  266. Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
  267. ColumnFamilyHandle* column_family,
  268. const Slice& key, std::string* value,
  269. bool exclusive,
  270. const bool do_validate) {
  271. if (!do_validate && read_options.snapshot != nullptr) {
  272. return Status::InvalidArgument(
  273. "If do_validate is false then GetForUpdate with snapshot is not "
  274. "defined.");
  275. }
  276. if (read_options.io_activity != Env::IOActivity::kUnknown) {
  277. return Status::InvalidArgument(
  278. "Cannot call GetForUpdate with `ReadOptions::io_activity` != "
  279. "`Env::IOActivity::kUnknown`");
  280. }
  281. Status s =
  282. TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
  283. if (s.ok() && value != nullptr) {
  284. assert(value != nullptr);
  285. PinnableSlice pinnable_val(value);
  286. assert(!pinnable_val.IsPinned());
  287. s = GetImpl(read_options, column_family, key, &pinnable_val);
  288. if (s.ok() && pinnable_val.IsPinned()) {
  289. value->assign(pinnable_val.data(), pinnable_val.size());
  290. } // else value is already assigned
  291. }
  292. return s;
  293. }
  294. Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
  295. ColumnFamilyHandle* column_family,
  296. const Slice& key,
  297. PinnableSlice* pinnable_val,
  298. bool exclusive,
  299. const bool do_validate) {
  300. if (!do_validate && read_options.snapshot != nullptr) {
  301. return Status::InvalidArgument(
  302. "If do_validate is false then GetForUpdate with snapshot is not "
  303. "defined.");
  304. }
  305. if (read_options.io_activity != Env::IOActivity::kUnknown) {
  306. return Status::InvalidArgument(
  307. "Cannot call GetForUpdate with `ReadOptions::io_activity` != "
  308. "`Env::IOActivity::kUnknown`");
  309. }
  310. Status s =
  311. TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
  312. if (s.ok() && pinnable_val != nullptr) {
  313. s = GetImpl(read_options, column_family, key, pinnable_val);
  314. }
  315. return s;
  316. }
  317. Status TransactionBaseImpl::GetEntityForUpdate(
  318. const ReadOptions& read_options, ColumnFamilyHandle* column_family,
  319. const Slice& key, PinnableWideColumns* columns, bool exclusive,
  320. bool do_validate) {
  321. if (!do_validate && read_options.snapshot != nullptr) {
  322. return Status::InvalidArgument(
  323. "Snapshot must not be set if validation is disabled");
  324. }
  325. const Status s =
  326. TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
  327. if (!s.ok()) {
  328. return s;
  329. }
  330. return GetEntityImpl(read_options, column_family, key, columns);
  331. }
  332. std::vector<Status> TransactionBaseImpl::MultiGet(
  333. const ReadOptions& _read_options,
  334. const std::vector<ColumnFamilyHandle*>& column_family,
  335. const std::vector<Slice>& keys, std::vector<std::string>* values) {
  336. size_t num_keys = keys.size();
  337. std::vector<Status> stat_list(num_keys);
  338. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  339. _read_options.io_activity != Env::IOActivity::kMultiGet) {
  340. Status s = Status::InvalidArgument(
  341. "Can only call MultiGet with `ReadOptions::io_activity` is "
  342. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
  343. for (size_t i = 0; i < num_keys; ++i) {
  344. stat_list[i] = s;
  345. }
  346. return stat_list;
  347. }
  348. ReadOptions read_options(_read_options);
  349. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  350. read_options.io_activity = Env::IOActivity::kMultiGet;
  351. }
  352. values->resize(num_keys);
  353. for (size_t i = 0; i < num_keys; ++i) {
  354. stat_list[i] =
  355. GetImpl(read_options, column_family[i], keys[i], &(*values)[i]);
  356. }
  357. return stat_list;
  358. }
  359. void TransactionBaseImpl::MultiGet(const ReadOptions& _read_options,
  360. ColumnFamilyHandle* column_family,
  361. const size_t num_keys, const Slice* keys,
  362. PinnableSlice* values, Status* statuses,
  363. const bool sorted_input) {
  364. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  365. _read_options.io_activity != Env::IOActivity::kMultiGet) {
  366. Status s = Status::InvalidArgument(
  367. "Can only call MultiGet with `ReadOptions::io_activity` is "
  368. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
  369. for (size_t i = 0; i < num_keys; ++i) {
  370. if (statuses[i].ok()) {
  371. statuses[i] = s;
  372. }
  373. }
  374. return;
  375. }
  376. ReadOptions read_options(_read_options);
  377. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  378. read_options.io_activity = Env::IOActivity::kMultiGet;
  379. }
  380. write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
  381. num_keys, keys, values, statuses,
  382. sorted_input);
  383. }
  384. void TransactionBaseImpl::MultiGetEntity(const ReadOptions& read_options,
  385. ColumnFamilyHandle* column_family,
  386. size_t num_keys, const Slice* keys,
  387. PinnableWideColumns* results,
  388. Status* statuses, bool sorted_input) {
  389. MultiGetEntityImpl(read_options, column_family, num_keys, keys, results,
  390. statuses, sorted_input);
  391. }
  392. std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
  393. const ReadOptions& read_options,
  394. const std::vector<ColumnFamilyHandle*>& column_family,
  395. const std::vector<Slice>& keys, std::vector<std::string>* values) {
  396. size_t num_keys = keys.size();
  397. if (read_options.io_activity != Env::IOActivity::kUnknown) {
  398. Status s = Status::InvalidArgument(
  399. "Cannot call MultiGetForUpdate with `ReadOptions::io_activity` != "
  400. "`Env::IOActivity::kUnknown`");
  401. return std::vector<Status>(num_keys, s);
  402. }
  403. // Regardless of whether the MultiGet succeeded, track these keys.
  404. values->resize(num_keys);
  405. // Lock all keys
  406. for (size_t i = 0; i < num_keys; ++i) {
  407. Status s = TryLock(column_family[i], keys[i], true /* read_only */,
  408. true /* exclusive */);
  409. if (!s.ok()) {
  410. // Fail entire multiget if we cannot lock all keys
  411. return std::vector<Status>(num_keys, s);
  412. }
  413. }
  414. // TODO(agiardullo): optimize multiget?
  415. std::vector<Status> stat_list(num_keys);
  416. for (size_t i = 0; i < num_keys; ++i) {
  417. stat_list[i] =
  418. GetImpl(read_options, column_family[i], keys[i], &(*values)[i]);
  419. }
  420. return stat_list;
  421. }
  422. Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
  423. Iterator* db_iter = db_->NewIterator(read_options);
  424. assert(db_iter);
  425. return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
  426. &read_options);
  427. }
  428. Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
  429. ColumnFamilyHandle* column_family) {
  430. Iterator* db_iter = db_->NewIterator(read_options, column_family);
  431. assert(db_iter);
  432. return write_batch_.NewIteratorWithBase(column_family, db_iter,
  433. &read_options);
  434. }
  435. template <typename IterType, typename ImplType, typename ErrorIteratorFuncType>
  436. std::unique_ptr<IterType> TransactionBaseImpl::NewMultiCfIterator(
  437. const ReadOptions& read_options,
  438. const std::vector<ColumnFamilyHandle*>& column_families,
  439. ErrorIteratorFuncType error_iterator_func) {
  440. if (column_families.empty()) {
  441. return error_iterator_func(
  442. Status::InvalidArgument("No Column Family was provided"));
  443. }
  444. const Comparator* const first_comparator =
  445. column_families[0]->GetComparator();
  446. assert(first_comparator);
  447. for (size_t i = 1; i < column_families.size(); ++i) {
  448. const Comparator* cf_comparator = column_families[i]->GetComparator();
  449. assert(cf_comparator);
  450. if (first_comparator != cf_comparator &&
  451. first_comparator->GetId() != cf_comparator->GetId()) {
  452. return error_iterator_func(Status::InvalidArgument(
  453. "Different comparators are being used across CFs"));
  454. }
  455. }
  456. std::vector<Iterator*> child_iterators;
  457. const Status s =
  458. db_->NewIterators(read_options, column_families, &child_iterators);
  459. if (!s.ok()) {
  460. return error_iterator_func(s);
  461. }
  462. assert(column_families.size() == child_iterators.size());
  463. std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
  464. cfh_iter_pairs;
  465. cfh_iter_pairs.reserve(column_families.size());
  466. for (size_t i = 0; i < column_families.size(); ++i) {
  467. cfh_iter_pairs.emplace_back(
  468. column_families[i],
  469. write_batch_.NewIteratorWithBase(column_families[i], child_iterators[i],
  470. &read_options));
  471. }
  472. return std::make_unique<ImplType>(read_options,
  473. column_families[0]->GetComparator(),
  474. std::move(cfh_iter_pairs));
  475. }
  476. std::unique_ptr<Iterator> TransactionBaseImpl::GetCoalescingIterator(
  477. const ReadOptions& read_options,
  478. const std::vector<ColumnFamilyHandle*>& column_families) {
  479. return NewMultiCfIterator<Iterator, CoalescingIterator>(
  480. read_options, column_families, [](const Status& s) {
  481. return std::unique_ptr<Iterator>(NewErrorIterator(s));
  482. });
  483. }
  484. std::unique_ptr<AttributeGroupIterator>
  485. TransactionBaseImpl::GetAttributeGroupIterator(
  486. const ReadOptions& read_options,
  487. const std::vector<ColumnFamilyHandle*>& column_families) {
  488. return NewMultiCfIterator<AttributeGroupIterator, AttributeGroupIteratorImpl>(
  489. read_options, column_families,
  490. [](const Status& s) { return NewAttributeGroupErrorIterator(s); });
  491. }
  492. Status TransactionBaseImpl::PutEntityImpl(ColumnFamilyHandle* column_family,
  493. const Slice& key,
  494. const WideColumns& columns,
  495. bool do_validate,
  496. bool assume_tracked) {
  497. {
  498. constexpr bool read_only = false;
  499. constexpr bool exclusive = true;
  500. const Status s = TryLock(column_family, key, read_only, exclusive,
  501. do_validate, assume_tracked);
  502. if (!s.ok()) {
  503. return s;
  504. }
  505. }
  506. {
  507. const Status s = GetBatchForWrite()->PutEntity(column_family, key, columns);
  508. if (!s.ok()) {
  509. return s;
  510. }
  511. }
  512. ++num_put_entities_;
  513. return Status::OK();
  514. }
  515. Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
  516. const Slice& key, const Slice& value,
  517. const bool assume_tracked) {
  518. const bool do_validate = !assume_tracked;
  519. Status s = TryLock(column_family, key, false /* read_only */,
  520. true /* exclusive */, do_validate, assume_tracked);
  521. if (s.ok()) {
  522. s = GetBatchForWrite()->Put(column_family, key, value);
  523. if (s.ok()) {
  524. num_puts_++;
  525. }
  526. }
  527. return s;
  528. }
  529. Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
  530. const SliceParts& key, const SliceParts& value,
  531. const bool assume_tracked) {
  532. const bool do_validate = !assume_tracked;
  533. Status s = TryLock(column_family, key, false /* read_only */,
  534. true /* exclusive */, do_validate, assume_tracked);
  535. if (s.ok()) {
  536. s = GetBatchForWrite()->Put(column_family, key, value);
  537. if (s.ok()) {
  538. num_puts_++;
  539. }
  540. }
  541. return s;
  542. }
  543. Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
  544. const Slice& key, const Slice& value,
  545. const bool assume_tracked) {
  546. const bool do_validate = !assume_tracked;
  547. Status s = TryLock(column_family, key, false /* read_only */,
  548. true /* exclusive */, do_validate, assume_tracked);
  549. if (s.ok()) {
  550. s = GetBatchForWrite()->Merge(column_family, key, value);
  551. if (s.ok()) {
  552. num_merges_++;
  553. }
  554. }
  555. return s;
  556. }
  557. Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
  558. const Slice& key,
  559. const bool assume_tracked) {
  560. const bool do_validate = !assume_tracked;
  561. Status s = TryLock(column_family, key, false /* read_only */,
  562. true /* exclusive */, do_validate, assume_tracked);
  563. if (s.ok()) {
  564. s = GetBatchForWrite()->Delete(column_family, key);
  565. if (s.ok()) {
  566. num_deletes_++;
  567. }
  568. }
  569. return s;
  570. }
  571. Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
  572. const SliceParts& key,
  573. const bool assume_tracked) {
  574. const bool do_validate = !assume_tracked;
  575. Status s = TryLock(column_family, key, false /* read_only */,
  576. true /* exclusive */, do_validate, assume_tracked);
  577. if (s.ok()) {
  578. s = GetBatchForWrite()->Delete(column_family, key);
  579. if (s.ok()) {
  580. num_deletes_++;
  581. }
  582. }
  583. return s;
  584. }
  585. Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
  586. const Slice& key,
  587. const bool assume_tracked) {
  588. const bool do_validate = !assume_tracked;
  589. Status s = TryLock(column_family, key, false /* read_only */,
  590. true /* exclusive */, do_validate, assume_tracked);
  591. if (s.ok()) {
  592. s = GetBatchForWrite()->SingleDelete(column_family, key);
  593. if (s.ok()) {
  594. num_deletes_++;
  595. }
  596. }
  597. return s;
  598. }
  599. Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
  600. const SliceParts& key,
  601. const bool assume_tracked) {
  602. const bool do_validate = !assume_tracked;
  603. Status s = TryLock(column_family, key, false /* read_only */,
  604. true /* exclusive */, do_validate, assume_tracked);
  605. if (s.ok()) {
  606. s = GetBatchForWrite()->SingleDelete(column_family, key);
  607. if (s.ok()) {
  608. num_deletes_++;
  609. }
  610. }
  611. return s;
  612. }
  613. Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
  614. const Slice& key, const Slice& value) {
  615. Status s = TryLock(column_family, key, false /* read_only */,
  616. true /* exclusive */, false /* do_validate */);
  617. if (s.ok()) {
  618. s = GetBatchForWrite()->Put(column_family, key, value);
  619. if (s.ok()) {
  620. num_puts_++;
  621. }
  622. }
  623. return s;
  624. }
  625. Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
  626. const SliceParts& key,
  627. const SliceParts& value) {
  628. Status s = TryLock(column_family, key, false /* read_only */,
  629. true /* exclusive */, false /* do_validate */);
  630. if (s.ok()) {
  631. s = GetBatchForWrite()->Put(column_family, key, value);
  632. if (s.ok()) {
  633. num_puts_++;
  634. }
  635. }
  636. return s;
  637. }
  638. Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
  639. const Slice& key,
  640. const Slice& value) {
  641. Status s = TryLock(column_family, key, false /* read_only */,
  642. true /* exclusive */, false /* do_validate */);
  643. if (s.ok()) {
  644. s = GetBatchForWrite()->Merge(column_family, key, value);
  645. if (s.ok()) {
  646. num_merges_++;
  647. }
  648. }
  649. return s;
  650. }
  651. Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
  652. const Slice& key) {
  653. Status s = TryLock(column_family, key, false /* read_only */,
  654. true /* exclusive */, false /* do_validate */);
  655. if (s.ok()) {
  656. s = GetBatchForWrite()->Delete(column_family, key);
  657. if (s.ok()) {
  658. num_deletes_++;
  659. }
  660. }
  661. return s;
  662. }
  663. Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
  664. const SliceParts& key) {
  665. Status s = TryLock(column_family, key, false /* read_only */,
  666. true /* exclusive */, false /* do_validate */);
  667. if (s.ok()) {
  668. s = GetBatchForWrite()->Delete(column_family, key);
  669. if (s.ok()) {
  670. num_deletes_++;
  671. }
  672. }
  673. return s;
  674. }
  675. Status TransactionBaseImpl::SingleDeleteUntracked(
  676. ColumnFamilyHandle* column_family, const Slice& key) {
  677. Status s = TryLock(column_family, key, false /* read_only */,
  678. true /* exclusive */, false /* do_validate */);
  679. if (s.ok()) {
  680. s = GetBatchForWrite()->SingleDelete(column_family, key);
  681. if (s.ok()) {
  682. num_deletes_++;
  683. }
  684. }
  685. return s;
  686. }
  687. void TransactionBaseImpl::PutLogData(const Slice& blob) {
  688. auto s = write_batch_.PutLogData(blob);
  689. (void)s;
  690. assert(s.ok());
  691. }
  692. WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
  693. return &write_batch_;
  694. }
  695. uint64_t TransactionBaseImpl::GetElapsedTime() const {
  696. return (dbimpl_->GetSystemClock()->NowMicros() - start_time_) / 1000;
  697. }
  698. uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
  699. uint64_t TransactionBaseImpl::GetNumPutEntities() const {
  700. return num_put_entities_;
  701. }
  702. uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
  703. uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
  704. uint64_t TransactionBaseImpl::GetNumKeys() const {
  705. return tracked_locks_->GetNumPointLocks();
  706. }
  707. void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
  708. SequenceNumber seq, bool read_only,
  709. bool exclusive) {
  710. PointLockRequest r;
  711. r.column_family_id = cfh_id;
  712. r.key = key;
  713. r.seq = seq;
  714. r.read_only = read_only;
  715. r.exclusive = exclusive;
  716. // Update map of all tracked keys for this transaction
  717. tracked_locks_->Track(r);
  718. if (save_points_ != nullptr && !save_points_->empty()) {
  719. // Update map of tracked keys in this SavePoint
  720. save_points_->top().new_locks_->Track(r);
  721. }
  722. }
  723. // Gets the write batch that should be used for Put/PutEntity/Merge/Delete
  724. // operations.
  725. //
  726. // Returns either a WriteBatch or WriteBatchWithIndex depending on whether
  727. // DisableIndexing() has been called.
  728. WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
  729. if (indexing_enabled_) {
  730. // Use WriteBatchWithIndex
  731. return &write_batch_;
  732. } else {
  733. // Don't use WriteBatchWithIndex. Return base WriteBatch.
  734. return write_batch_.GetWriteBatch();
  735. }
  736. }
  737. void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
  738. if (snapshot != nullptr) {
  739. ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log,
  740. "ReleaseSnapshot %" PRIu64 " Set",
  741. snapshot->GetSequenceNumber());
  742. db->ReleaseSnapshot(snapshot);
  743. }
  744. }
  745. void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
  746. const Slice& key) {
  747. PointLockRequest r;
  748. r.column_family_id = GetColumnFamilyID(column_family);
  749. r.key = key.ToString();
  750. r.read_only = true;
  751. bool can_untrack = false;
  752. if (save_points_ != nullptr && !save_points_->empty()) {
  753. // If there is no GetForUpdate of the key in this save point,
  754. // then cannot untrack from the global lock tracker.
  755. UntrackStatus s = save_points_->top().new_locks_->Untrack(r);
  756. can_untrack = (s != UntrackStatus::NOT_TRACKED);
  757. } else {
  758. // No save point, so can untrack from the global lock tracker.
  759. can_untrack = true;
  760. }
  761. if (can_untrack) {
  762. // If erased from the global tracker, then can unlock the key.
  763. UntrackStatus s = tracked_locks_->Untrack(r);
  764. bool can_unlock = (s == UntrackStatus::REMOVED);
  765. if (can_unlock) {
  766. UnlockGetForUpdate(column_family, key);
  767. }
  768. }
  769. }
  770. Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
  771. struct IndexedWriteBatchBuilder : public WriteBatch::Handler {
  772. Transaction* txn_;
  773. DBImpl* db_;
  774. IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db)
  775. : txn_(txn), db_(db) {
  776. assert(dynamic_cast<TransactionBaseImpl*>(txn_) != nullptr);
  777. }
  778. Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
  779. Slice user_key = GetUserKey(cf, key);
  780. return txn_->Put(db_->GetColumnFamilyHandle(cf), user_key, val);
  781. }
  782. Status PutEntityCF(uint32_t cf, const Slice& key,
  783. const Slice& entity) override {
  784. Slice user_key = GetUserKey(cf, key);
  785. Slice entity_copy = entity;
  786. WideColumns columns;
  787. const Status s =
  788. WideColumnSerialization::Deserialize(entity_copy, columns);
  789. if (!s.ok()) {
  790. return s;
  791. }
  792. return txn_->PutEntity(db_->GetColumnFamilyHandle(cf), user_key, columns);
  793. }
  794. Status DeleteCF(uint32_t cf, const Slice& key) override {
  795. Slice user_key = GetUserKey(cf, key);
  796. return txn_->Delete(db_->GetColumnFamilyHandle(cf), user_key);
  797. }
  798. Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
  799. Slice user_key = GetUserKey(cf, key);
  800. return txn_->SingleDelete(db_->GetColumnFamilyHandle(cf), user_key);
  801. }
  802. Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
  803. Slice user_key = GetUserKey(cf, key);
  804. return txn_->Merge(db_->GetColumnFamilyHandle(cf), user_key, val);
  805. }
  806. // this is used for reconstructing prepared transactions upon
  807. // recovery. there should not be any meta markers in the batches
  808. // we are processing.
  809. Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
  810. Status MarkEndPrepare(const Slice&) override {
  811. return Status::InvalidArgument();
  812. }
  813. Status MarkCommit(const Slice&) override {
  814. return Status::InvalidArgument();
  815. }
  816. Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
  817. return Status::InvalidArgument();
  818. }
  819. Status MarkRollback(const Slice&) override {
  820. return Status::InvalidArgument();
  821. }
  822. size_t GetTimestampSize(uint32_t cf_id) {
  823. auto cfd = db_->versions_->GetColumnFamilySet()->GetColumnFamily(cf_id);
  824. const Comparator* ucmp = cfd->user_comparator();
  825. assert(ucmp);
  826. return ucmp->timestamp_size();
  827. }
  828. Slice GetUserKey(uint32_t cf_id, const Slice& key) {
  829. size_t ts_sz = GetTimestampSize(cf_id);
  830. if (ts_sz == 0) {
  831. return key;
  832. }
  833. assert(key.size() >= ts_sz);
  834. return Slice(key.data(), key.size() - ts_sz);
  835. }
  836. };
  837. IndexedWriteBatchBuilder copycat(this, dbimpl_);
  838. return src_batch->Iterate(&copycat);
  839. }
  840. WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() {
  841. return &commit_time_batch_;
  842. }
  843. } // namespace ROCKSDB_NAMESPACE