multi_cf_iterator_impl.h 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <functional>
  7. #include <variant>
  8. #include "rocksdb/comparator.h"
  9. #include "rocksdb/iterator.h"
  10. #include "rocksdb/options.h"
  11. #include "util/heap.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. struct MultiCfIteratorInfo {
  14. ColumnFamilyHandle* cfh;
  15. Iterator* iterator;
  16. int order;
  17. };
  18. template <typename ResetFunc, typename PopulateFunc>
  19. class MultiCfIteratorImpl {
  20. public:
  21. MultiCfIteratorImpl(
  22. const ReadOptions& read_options, const Comparator* comparator,
  23. std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>&&
  24. cfh_iter_pairs,
  25. ResetFunc reset_func, PopulateFunc populate_func)
  26. : allow_unprepared_value_(read_options.allow_unprepared_value),
  27. comparator_(comparator),
  28. cfh_iter_pairs_(std::move(cfh_iter_pairs)),
  29. reset_func_(std::move(reset_func)),
  30. populate_func_(std::move(populate_func)),
  31. heap_(MultiCfMinHeap(
  32. MultiCfHeapItemComparator<std::greater<int>>(comparator_))) {}
  33. ~MultiCfIteratorImpl() { status_.PermitUncheckedError(); }
  34. // No copy allowed
  35. MultiCfIteratorImpl(const MultiCfIteratorImpl&) = delete;
  36. MultiCfIteratorImpl& operator=(const MultiCfIteratorImpl&) = delete;
  37. Slice key() const {
  38. assert(Valid());
  39. return current()->key();
  40. }
  41. bool Valid() const {
  42. if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
  43. auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
  44. return !max_heap.empty() && status_.ok();
  45. }
  46. auto& min_heap = std::get<MultiCfMinHeap>(heap_);
  47. return !min_heap.empty() && status_.ok();
  48. }
  49. Status status() const { return status_; }
  50. void SeekToFirst() {
  51. auto& min_heap = GetHeap<MultiCfMinHeap>([this]() { InitMinHeap(); });
  52. SeekCommon(min_heap, [](Iterator* iter) { iter->SeekToFirst(); });
  53. }
  54. void Seek(const Slice& target) {
  55. auto& min_heap = GetHeap<MultiCfMinHeap>([this]() { InitMinHeap(); });
  56. SeekCommon(min_heap, [&target](Iterator* iter) { iter->Seek(target); });
  57. }
  58. void SeekToLast() {
  59. auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() { InitMaxHeap(); });
  60. SeekCommon(max_heap, [](Iterator* iter) { iter->SeekToLast(); });
  61. }
  62. void SeekForPrev(const Slice& target) {
  63. auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() { InitMaxHeap(); });
  64. SeekCommon(max_heap,
  65. [&target](Iterator* iter) { iter->SeekForPrev(target); });
  66. }
  67. void Next() {
  68. assert(Valid());
  69. auto& min_heap = GetHeap<MultiCfMinHeap>([this]() {
  70. std::string target(key().data(), key().size());
  71. InitMinHeap();
  72. Seek(target);
  73. });
  74. AdvanceIterator(min_heap, [](Iterator* iter) { iter->Next(); });
  75. }
  76. void Prev() {
  77. assert(Valid());
  78. auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() {
  79. std::string target(key().data(), key().size());
  80. InitMaxHeap();
  81. SeekForPrev(target);
  82. });
  83. AdvanceIterator(max_heap, [](Iterator* iter) { iter->Prev(); });
  84. }
  85. bool PrepareValue() {
  86. assert(Valid());
  87. if (!allow_unprepared_value_) {
  88. return true;
  89. }
  90. if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
  91. return PopulateIterator(std::get<MultiCfMaxHeap>(heap_));
  92. }
  93. return PopulateIterator(std::get<MultiCfMinHeap>(heap_));
  94. }
  95. private:
  96. Status status_;
  97. bool allow_unprepared_value_;
  98. const Comparator* comparator_;
  99. std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
  100. cfh_iter_pairs_;
  101. ResetFunc reset_func_;
  102. PopulateFunc populate_func_;
  103. template <typename CompareOp>
  104. class MultiCfHeapItemComparator {
  105. public:
  106. explicit MultiCfHeapItemComparator(const Comparator* comparator)
  107. : comparator_(comparator) {}
  108. bool operator()(const MultiCfIteratorInfo& a,
  109. const MultiCfIteratorInfo& b) const {
  110. assert(a.iterator);
  111. assert(b.iterator);
  112. assert(a.iterator->Valid());
  113. assert(b.iterator->Valid());
  114. int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
  115. assert(c != 0 || a.order != b.order);
  116. return c == 0 ? a.order - b.order > 0 : CompareOp()(c, 0);
  117. }
  118. private:
  119. const Comparator* comparator_;
  120. };
  121. using MultiCfMinHeap =
  122. BinaryHeap<MultiCfIteratorInfo,
  123. MultiCfHeapItemComparator<std::greater<int>>>;
  124. using MultiCfMaxHeap = BinaryHeap<MultiCfIteratorInfo,
  125. MultiCfHeapItemComparator<std::less<int>>>;
  126. using MultiCfIterHeap = std::variant<MultiCfMinHeap, MultiCfMaxHeap>;
  127. MultiCfIterHeap heap_;
  128. Iterator* current() const {
  129. if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
  130. auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
  131. return max_heap.top().iterator;
  132. }
  133. auto& min_heap = std::get<MultiCfMinHeap>(heap_);
  134. return min_heap.top().iterator;
  135. }
  136. void considerStatus(Status s) {
  137. if (!s.ok() && status_.ok()) {
  138. status_ = std::move(s);
  139. }
  140. }
  141. template <typename HeapType, typename InitFunc>
  142. HeapType& GetHeap(InitFunc initFunc) {
  143. if (!std::holds_alternative<HeapType>(heap_)) {
  144. initFunc();
  145. }
  146. return std::get<HeapType>(heap_);
  147. }
  148. void InitMinHeap() {
  149. heap_.template emplace<MultiCfMinHeap>(
  150. MultiCfHeapItemComparator<std::greater<int>>(comparator_));
  151. }
  152. void InitMaxHeap() {
  153. heap_.template emplace<MultiCfMaxHeap>(
  154. MultiCfHeapItemComparator<std::less<int>>(comparator_));
  155. }
  156. template <typename BinaryHeap, typename ChildSeekFuncType>
  157. void SeekCommon(BinaryHeap& heap, ChildSeekFuncType child_seek_func) {
  158. reset_func_();
  159. heap.clear();
  160. int i = 0;
  161. for (auto& [cfh, iter] : cfh_iter_pairs_) {
  162. child_seek_func(iter.get());
  163. if (iter->Valid()) {
  164. assert(iter->status().ok());
  165. heap.push(MultiCfIteratorInfo{cfh, iter.get(), i});
  166. } else {
  167. considerStatus(iter->status());
  168. if (!status_.ok()) {
  169. // Non-OK status from the iterator. Bail out early
  170. heap.clear();
  171. return;
  172. }
  173. }
  174. ++i;
  175. }
  176. if (!allow_unprepared_value_ && !heap.empty()) {
  177. [[maybe_unused]] const bool result = PopulateIterator(heap);
  178. assert(result || (!Valid() && !status_.ok()));
  179. }
  180. }
  181. template <typename BinaryHeap, typename AdvanceFuncType>
  182. void AdvanceIterator(BinaryHeap& heap, AdvanceFuncType advance_func) {
  183. reset_func_();
  184. // It is possible for one or more child iters are at invalid keys due to
  185. // manual prefix iteration. For such cases, we consider the result of the
  186. // multi-cf-iter is also undefined.
  187. // https://github.com/facebook/rocksdb/wiki/Prefix-Seek#manual-prefix-iterating
  188. // for details about manual prefix iteration
  189. if (heap.empty()) {
  190. return;
  191. }
  192. // 1. Keep the top iterator (by popping it from the heap)
  193. // 2. Make sure all others have iterated past the top iterator key slice
  194. // 3. Advance the top iterator, and add it back to the heap if valid
  195. auto top = heap.top();
  196. assert(top.iterator);
  197. assert(top.iterator->Valid());
  198. assert(top.iterator->status().ok());
  199. heap.pop();
  200. while (!heap.empty()) {
  201. auto current = heap.top();
  202. assert(current.iterator);
  203. assert(current.iterator->Valid());
  204. assert(current.iterator->status().ok());
  205. if (comparator_->Compare(current.iterator->key(), top.iterator->key()) !=
  206. 0) {
  207. break;
  208. }
  209. advance_func(current.iterator);
  210. if (current.iterator->Valid()) {
  211. assert(current.iterator->status().ok());
  212. heap.replace_top(current);
  213. } else {
  214. considerStatus(current.iterator->status());
  215. if (!status_.ok()) {
  216. heap.clear();
  217. return;
  218. } else {
  219. heap.pop();
  220. }
  221. }
  222. }
  223. advance_func(top.iterator);
  224. if (top.iterator->Valid()) {
  225. assert(top.iterator->status().ok());
  226. heap.push(top);
  227. } else {
  228. considerStatus(top.iterator->status());
  229. if (!status_.ok()) {
  230. heap.clear();
  231. return;
  232. }
  233. }
  234. if (!allow_unprepared_value_ && !heap.empty()) {
  235. [[maybe_unused]] const bool result = PopulateIterator(heap);
  236. assert(result || (!Valid() && !status_.ok()));
  237. }
  238. }
  239. template <typename BinaryHeap>
  240. bool PopulateIterator(BinaryHeap& heap) {
  241. // 1. Keep the top iterator (by popping it from the heap) and add it to list
  242. // to populate
  243. // 2. For all non-top iterators having the same key as top iter popped
  244. // from the previous step, add them to the same list and pop it
  245. // temporarily from the heap
  246. // 3. Once no other iters have the same key as the top iter from step 1,
  247. // populate the value/columns and attribute_groups from the list
  248. // collected in step 1 and 2 and add all the iters back to the heap
  249. assert(!heap.empty());
  250. auto prepare_value = [this, &heap](Iterator* iterator) {
  251. assert(iterator);
  252. assert(iterator->Valid());
  253. assert(iterator->status().ok());
  254. if (!iterator->PrepareValue()) {
  255. assert(!iterator->Valid());
  256. assert(!iterator->status().ok());
  257. considerStatus(iterator->status());
  258. heap.clear();
  259. assert(!Valid());
  260. assert(!status_.ok());
  261. return false;
  262. }
  263. return true;
  264. };
  265. auto top = heap.top();
  266. assert(top.iterator);
  267. assert(top.iterator->Valid());
  268. assert(top.iterator->status().ok());
  269. if (!prepare_value(top.iterator)) {
  270. return false;
  271. }
  272. autovector<MultiCfIteratorInfo> to_populate;
  273. to_populate.push_back(top);
  274. heap.pop();
  275. while (!heap.empty()) {
  276. auto current = heap.top();
  277. assert(current.iterator);
  278. assert(current.iterator->Valid());
  279. assert(current.iterator->status().ok());
  280. if (comparator_->Compare(current.iterator->key(), top.iterator->key()) !=
  281. 0) {
  282. break;
  283. }
  284. if (!prepare_value(current.iterator)) {
  285. return false;
  286. }
  287. to_populate.push_back(current);
  288. heap.pop();
  289. }
  290. // Add the items back to the heap
  291. for (auto& item : to_populate) {
  292. heap.push(item);
  293. }
  294. populate_func_(to_populate);
  295. return true;
  296. }
  297. };
  298. } // namespace ROCKSDB_NAMESPACE