arena_wrapped_db_iter.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/arena_wrapped_db_iter.h"
  10. #include "memory/arena.h"
  11. #include "rocksdb/env.h"
  12. #include "rocksdb/iterator.h"
  13. #include "rocksdb/options.h"
  14. #include "table/internal_iterator.h"
  15. #include "table/iterator_wrapper.h"
  16. #include "util/user_comparator_wrapper.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. inline static SequenceNumber GetSeqNum(const DBImpl* db, const Snapshot* s) {
  19. if (s) {
  20. return s->GetSequenceNumber();
  21. } else {
  22. return db->GetLatestSequenceNumber();
  23. }
  24. }
  25. Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
  26. std::string* prop) {
  27. if (prop_name == "rocksdb.iterator.super-version-number") {
  28. // First try to pass the value returned from inner iterator.
  29. if (!db_iter_->GetProperty(prop_name, prop).ok()) {
  30. *prop = std::to_string(sv_number_);
  31. }
  32. return Status::OK();
  33. }
  34. return db_iter_->GetProperty(prop_name, prop);
  35. }
  36. void ArenaWrappedDBIter::Init(
  37. Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
  38. const MutableCFOptions& mutable_cf_options, const Version* version,
  39. const SequenceNumber& sequence, uint64_t version_number,
  40. ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
  41. bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem) {
  42. read_options_ = read_options;
  43. if (!CheckFSFeatureSupport(env->GetFileSystem().get(),
  44. FSSupportedOps::kAsyncIO)) {
  45. read_options_.async_io = false;
  46. }
  47. read_options_.total_order_seek |= ioptions.prefix_seek_opt_in_only;
  48. db_iter_ = DBIter::NewIter(
  49. env, read_options_, ioptions, mutable_cf_options,
  50. ioptions.user_comparator, /*internal_iter=*/nullptr, version, sequence,
  51. read_callback, active_mem, cfh, expose_blob_index, &arena_);
  52. sv_number_ = version_number;
  53. allow_refresh_ = allow_refresh;
  54. allow_mark_memtable_for_flush_ = active_mem;
  55. memtable_range_tombstone_iter_ = nullptr;
  56. }
  57. void ArenaWrappedDBIter::MaybeAutoRefresh(bool is_seek,
  58. DBIter::Direction direction) {
  59. if (cfh_ != nullptr && read_options_.snapshot != nullptr && allow_refresh_ &&
  60. read_options_.auto_refresh_iterator_with_snapshot) {
  61. // The intent here is to capture the superversion number change
  62. // reasonably soon from the time it actually happened. As such,
  63. // we're fine with weaker synchronization / ordering guarantees
  64. // provided by relaxed atomic (in favor of less CPU / mem overhead).
  65. uint64_t cur_sv_number = cfh_->cfd()->GetSuperVersionNumberRelaxed();
  66. if ((sv_number_ != cur_sv_number) && status().ok()) {
  67. // Changing iterators' direction is pretty heavy-weight operation and
  68. // could have unintended consequences when it comes to prefix seek.
  69. // Therefore, we need an efficient implementation that does not duplicate
  70. // the effort by doing things like double seek(forprev).
  71. //
  72. // Auto refresh can be triggered on the following groups of operations:
  73. //
  74. // 1. [Seek]: Seek(), SeekForPrev()
  75. // 2. [Non-Seek]: Next(), Prev()
  76. //
  77. // In case of 'Seek' group, procedure is fairly straightforward as we'll
  78. // simply call refresh and then invoke the operation on intended target.
  79. //
  80. // In case of 'Non-Seek' group, we'll first advance the cursor by invoking
  81. // intended user operation (Next() or Prev()), capture the target key T,
  82. // refresh the iterator and then reconcile the refreshed iterator by
  83. // explicitly calling [Seek(T) or SeekForPrev(T)]. Below is an example
  84. // flow for Next(), but same principle applies to Prev():
  85. //
  86. //
  87. // T0: Before the operation T1: Execute Next()
  88. // | |
  89. // | -------------
  90. // | | * capture the key (T)
  91. // DBIter(SV#A) | |
  92. // --------------\ /------\ /---------
  93. // SV #A | ... -> [ X ] -> [ T ] -> ... |
  94. // -----------------------------------
  95. // / |
  96. // / |
  97. // / T2: Refresh iterator
  98. // /
  99. // DBIter(SV#A') /
  100. // ----------------------------------
  101. // SV #A' | ... -> [ T ] -> ... |
  102. // ----------------/ \---------------
  103. // |
  104. // ---- T3: Seek(T)
  105. //
  106. bool valid = false;
  107. std::string key;
  108. if (!is_seek && db_iter_->Valid()) {
  109. // The key() Slice is valid until the iterator state changes.
  110. // Given that refresh is heavy-weight operation it itself,
  111. // we should copy the target key upfront to avoid reading bad value.
  112. valid = true;
  113. key = db_iter_->key().ToString();
  114. }
  115. // It's perfectly fine to unref the corresponding superversion
  116. // as we rely on pinning behavior of snapshot for consistency.
  117. DoRefresh(read_options_.snapshot, cur_sv_number);
  118. if (!is_seek && valid) { // Reconcile new iterator after Next() / Prev()
  119. if (direction == DBIter::kForward) {
  120. db_iter_->Seek(key);
  121. } else {
  122. assert(direction == DBIter::kReverse);
  123. db_iter_->SeekForPrev(key);
  124. }
  125. }
  126. }
  127. }
  128. }
  129. Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }
  130. void ArenaWrappedDBIter::DoRefresh(const Snapshot* snapshot,
  131. [[maybe_unused]] uint64_t sv_number) {
  132. Env* env = db_iter_->env();
  133. // NOTE:
  134. //
  135. // Errors like file deletion (as a part of SV cleanup in ~DBIter) will be
  136. // present in the error log, but won't be reflected in the iterator status.
  137. // This is by design as we expect compaction to clean up those obsolete files
  138. // eventually.
  139. db_iter_->~DBIter();
  140. arena_.~Arena();
  141. new (&arena_) Arena();
  142. auto cfd = cfh_->cfd();
  143. auto db_impl = cfh_->db();
  144. SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl);
  145. assert(sv->version_number >= sv_number);
  146. SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
  147. if (read_callback_) {
  148. read_callback_->Refresh(read_seq);
  149. }
  150. Init(env, read_options_, cfd->ioptions(), sv->mutable_cf_options, sv->current,
  151. read_seq, sv->version_number, read_callback_, cfh_, expose_blob_index_,
  152. allow_refresh_, allow_mark_memtable_for_flush_ ? sv->mem : nullptr);
  153. InternalIterator* internal_iter = db_impl->NewInternalIterator(
  154. read_options_, cfd, sv, &arena_, read_seq,
  155. /* allow_unprepared_value */ true, /* db_iter */ this);
  156. SetIterUnderDBIter(internal_iter);
  157. }
  158. Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
  159. if (cfh_ == nullptr || !allow_refresh_) {
  160. return Status::NotSupported("Creating renew iterator is not allowed.");
  161. }
  162. assert(db_iter_ != nullptr);
  163. auto cfd = cfh_->cfd();
  164. auto db_impl = cfh_->db();
  165. // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
  166. // correct behavior. Will be corrected automatically when we take a snapshot
  167. // here for the case of WritePreparedTxnDB.
  168. uint64_t cur_sv_number = cfd->GetSuperVersionNumber();
  169. // If we recreate a new internal iterator below (NewInternalIterator()),
  170. // we will pass in read_options_. We need to make sure it
  171. // has the right snapshot.
  172. read_options_.snapshot = snapshot;
  173. TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
  174. TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
  175. while (true) {
  176. if (sv_number_ != cur_sv_number) {
  177. DoRefresh(snapshot, cur_sv_number);
  178. break;
  179. } else {
  180. SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
  181. // Refresh range-tombstones in MemTable
  182. if (!read_options_.ignore_range_deletions) {
  183. SuperVersion* sv = cfd->GetThreadLocalSuperVersion(db_impl);
  184. TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr);
  185. auto t = sv->mem->NewRangeTombstoneIterator(
  186. read_options_, read_seq, false /* immutable_memtable */);
  187. if (!t || t->empty()) {
  188. // If memtable_range_tombstone_iter_ points to a non-empty tombstone
  189. // iterator, then it means sv->mem is not the memtable that
  190. // memtable_range_tombstone_iter_ points to, so SV must have changed
  191. // after the sv_number_ != cur_sv_number check above. We will fall
  192. // back to re-init the InternalIterator, and the tombstone iterator
  193. // will be freed during db_iter destruction there.
  194. if (memtable_range_tombstone_iter_) {
  195. assert(!*memtable_range_tombstone_iter_ ||
  196. sv_number_ != cfd->GetSuperVersionNumber());
  197. }
  198. delete t;
  199. } else { // current mutable memtable has range tombstones
  200. if (!memtable_range_tombstone_iter_) {
  201. delete t;
  202. db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
  203. // The memtable under DBIter did not have range tombstone before
  204. // refresh.
  205. DoRefresh(snapshot, cur_sv_number);
  206. break;
  207. } else {
  208. *memtable_range_tombstone_iter_ =
  209. std::make_unique<TruncatedRangeDelIterator>(
  210. std::unique_ptr<FragmentedRangeTombstoneIterator>(t),
  211. &cfd->internal_comparator(), nullptr, nullptr);
  212. }
  213. }
  214. db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
  215. }
  216. // Check again if the latest super version number is changed
  217. uint64_t latest_sv_number = cfd->GetSuperVersionNumber();
  218. if (latest_sv_number != cur_sv_number) {
  219. // If the super version number is changed after refreshing,
  220. // fallback to Re-Init the InternalIterator
  221. cur_sv_number = latest_sv_number;
  222. continue;
  223. }
  224. db_iter_->set_sequence(read_seq);
  225. db_iter_->set_valid(false);
  226. break;
  227. }
  228. }
  229. return Status::OK();
  230. }
  231. ArenaWrappedDBIter* NewArenaWrappedDbIterator(
  232. Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
  233. SuperVersion* sv, const SequenceNumber& sequence,
  234. ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index,
  235. bool allow_refresh, bool allow_mark_memtable_for_flush) {
  236. ArenaWrappedDBIter* db_iter = new ArenaWrappedDBIter();
  237. db_iter->Init(env, read_options, cfh->cfd()->ioptions(),
  238. sv->mutable_cf_options, sv->current, sequence,
  239. sv->version_number, read_callback, cfh, expose_blob_index,
  240. allow_refresh,
  241. allow_mark_memtable_for_flush ? sv->mem : nullptr);
  242. if (cfh != nullptr && allow_refresh) {
  243. db_iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index);
  244. }
  245. InternalIterator* internal_iter = db_impl->NewInternalIterator(
  246. db_iter->GetReadOptions(), cfh->cfd(), sv, db_iter->GetArena(), sequence,
  247. /*allow_unprepared_value=*/true, db_iter);
  248. db_iter->SetIterUnderDBIter(internal_iter);
  249. return db_iter;
  250. }
  251. } // namespace ROCKSDB_NAMESPACE