merge_helper.h 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #pragma once
  7. #include <deque>
  8. #include <string>
  9. #include <vector>
  10. #include "db/dbformat.h"
  11. #include "db/merge_context.h"
  12. #include "db/range_del_aggregator.h"
  13. #include "db/snapshot_checker.h"
  14. #include "rocksdb/compaction_filter.h"
  15. #include "rocksdb/env.h"
  16. #include "rocksdb/slice.h"
  17. #include "util/stop_watch.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. class Comparator;
  20. class Iterator;
  21. class Logger;
  22. class MergeOperator;
  23. class Statistics;
  24. class MergeHelper {
  25. public:
  26. MergeHelper(Env* env, const Comparator* user_comparator,
  27. const MergeOperator* user_merge_operator,
  28. const CompactionFilter* compaction_filter, Logger* logger,
  29. bool assert_valid_internal_key, SequenceNumber latest_snapshot,
  30. const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
  31. Statistics* stats = nullptr,
  32. const std::atomic<bool>* shutting_down = nullptr);
  33. // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
  34. // Result of merge will be written to result if status returned is OK.
  35. // If operands is empty, the value will simply be copied to result.
  36. // Set `update_num_ops_stats` to true if it is from a user read, so that
  37. // the latency is sensitive.
  38. // Returns one of the following statuses:
  39. // - OK: Entries were successfully merged.
  40. // - Corruption: Merge operator reported unsuccessful merge.
  41. static Status TimedFullMerge(const MergeOperator* merge_operator,
  42. const Slice& key, const Slice* value,
  43. const std::vector<Slice>& operands,
  44. std::string* result, Logger* logger,
  45. Statistics* statistics, Env* env,
  46. Slice* result_operand = nullptr,
  47. bool update_num_ops_stats = false);
  48. // Merge entries until we hit
  49. // - a corrupted key
  50. // - a Put/Delete,
  51. // - a different user key,
  52. // - a specific sequence number (snapshot boundary),
  53. // - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
  54. // or - the end of iteration
  55. // iter: (IN) points to the first merge type entry
  56. // (OUT) points to the first entry not included in the merge process
  57. // range_del_agg: (IN) filters merge operands covered by range tombstones.
  58. // stop_before: (IN) a sequence number that merge should not cross.
  59. // 0 means no restriction
  60. // at_bottom: (IN) true if the iterator covers the bottem level, which means
  61. // we could reach the start of the history of this user key.
  62. //
  63. // Returns one of the following statuses:
  64. // - OK: Entries were successfully merged.
  65. // - MergeInProgress: Put/Delete not encountered, and didn't reach the start
  66. // of key's history. Output consists of merge operands only.
  67. // - Corruption: Merge operator reported unsuccessful merge or a corrupted
  68. // key has been encountered and not expected (applies only when compiling
  69. // with asserts removed).
  70. // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
  71. //
  72. // REQUIRED: The first key in the input is not corrupted.
  73. Status MergeUntil(InternalIterator* iter,
  74. CompactionRangeDelAggregator* range_del_agg = nullptr,
  75. const SequenceNumber stop_before = 0,
  76. const bool at_bottom = false);
  77. // Filters a merge operand using the compaction filter specified
  78. // in the constructor. Returns the decision that the filter made.
  79. // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
  80. // optional outputs of compaction filter.
  81. CompactionFilter::Decision FilterMerge(const Slice& user_key,
  82. const Slice& value_slice);
  83. // Query the merge result
  84. // These are valid until the next MergeUntil call
  85. // If the merging was successful:
  86. // - keys() contains a single element with the latest sequence number of
  87. // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
  88. // - values() contains a single element with the result of merging all the
  89. // operands together
  90. //
  91. // IMPORTANT 1: the key type could change after the MergeUntil call.
  92. // Put/Delete + Merge + ... + Merge => Put
  93. // Merge + ... + Merge => Merge
  94. //
  95. // If the merge operator is not associative, and if a Put/Delete is not found
  96. // then the merging will be unsuccessful. In this case:
  97. // - keys() contains the list of internal keys seen in order of iteration.
  98. // - values() contains the list of values (merges) seen in the same order.
  99. // values() is parallel to keys() so that the first entry in
  100. // keys() is the key associated with the first entry in values()
  101. // and so on. These lists will be the same length.
  102. // All of these pairs will be merges over the same user key.
  103. // See IMPORTANT 2 note below.
  104. //
  105. // IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
  106. // So keys().back() was the first key seen by iterator.
  107. // TODO: Re-style this comment to be like the first one
  108. const std::deque<std::string>& keys() const { return keys_; }
  109. const std::vector<Slice>& values() const {
  110. return merge_context_.GetOperands();
  111. }
  112. uint64_t TotalFilterTime() const { return total_filter_time_; }
  113. bool HasOperator() const { return user_merge_operator_ != nullptr; }
  114. // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
  115. // return true and fill *until with the key to which we should skip.
  116. // If true, keys() and values() are empty.
  117. bool FilteredUntil(Slice* skip_until) const {
  118. if (!has_compaction_filter_skip_until_) {
  119. return false;
  120. }
  121. assert(compaction_filter_ != nullptr);
  122. assert(skip_until != nullptr);
  123. assert(compaction_filter_skip_until_.Valid());
  124. *skip_until = compaction_filter_skip_until_.Encode();
  125. return true;
  126. }
  127. private:
  128. Env* env_;
  129. const Comparator* user_comparator_;
  130. const MergeOperator* user_merge_operator_;
  131. const CompactionFilter* compaction_filter_;
  132. const std::atomic<bool>* shutting_down_;
  133. Logger* logger_;
  134. bool assert_valid_internal_key_; // enforce no internal key corruption?
  135. bool allow_single_operand_;
  136. SequenceNumber latest_snapshot_;
  137. const SnapshotChecker* const snapshot_checker_;
  138. int level_;
  139. // the scratch area that holds the result of MergeUntil
  140. // valid up to the next MergeUntil call
  141. // Keeps track of the sequence of keys seen
  142. std::deque<std::string> keys_;
  143. // Parallel with keys_; stores the operands
  144. mutable MergeContext merge_context_;
  145. StopWatchNano filter_timer_;
  146. uint64_t total_filter_time_;
  147. Statistics* stats_;
  148. bool has_compaction_filter_skip_until_ = false;
  149. std::string compaction_filter_value_;
  150. InternalKey compaction_filter_skip_until_;
  151. bool IsShuttingDown() {
  152. // This is a best-effort facility, so memory_order_relaxed is sufficient.
  153. return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
  154. }
  155. };
  156. // MergeOutputIterator can be used to iterate over the result of a merge.
  157. class MergeOutputIterator {
  158. public:
  159. // The MergeOutputIterator is bound to a MergeHelper instance.
  160. explicit MergeOutputIterator(const MergeHelper* merge_helper);
  161. // Seeks to the first record in the output.
  162. void SeekToFirst();
  163. // Advances to the next record in the output.
  164. void Next();
  165. Slice key() { return Slice(*it_keys_); }
  166. Slice value() { return Slice(*it_values_); }
  167. bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
  168. private:
  169. const MergeHelper* merge_helper_;
  170. std::deque<std::string>::const_reverse_iterator it_keys_;
  171. std::vector<Slice>::const_reverse_iterator it_values_;
  172. };
  173. } // namespace ROCKSDB_NAMESPACE