merge_helper.h 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #pragma once
#include <atomic>
#include <deque>
#include <string>
#include <utility>
#include <vector>

#include "db/merge_context.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "db/wide/wide_column_serialization.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "rocksdb/wide_columns.h"
#include "util/stop_watch.h"
  20. namespace ROCKSDB_NAMESPACE {
  // Forward declarations so that this header can name these types through
  // pointers without pulling in their full definitions.
  // NOTE(review): `Iterator` is not referenced anywhere in this header and
  // looks like a candidate for removal.
  class BlobFetcher;
  class Comparator;
  class Iterator;
  class Logger;
  class MergeOperator;
  class PrefetchBufferCollection;
  class Statistics;
  class SystemClock;

  struct CompactionIterationStats;
  30. class MergeHelper {
  31. public:
  32. MergeHelper(Env* env, const Comparator* user_comparator,
  33. const MergeOperator* user_merge_operator,
  34. const CompactionFilter* compaction_filter, Logger* logger,
  35. bool assert_valid_internal_key, SequenceNumber latest_snapshot,
  36. const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
  37. Statistics* stats = nullptr,
  38. const std::atomic<bool>* shutting_down = nullptr);
  39. // Wrappers around MergeOperator::FullMergeV3() that record perf statistics.
  40. // Set `update_num_ops_stats` to true if it is from a user read so that
  41. // the corresponding statistics are updated.
  42. // Returns one of the following statuses:
  43. // - OK: Entries were successfully merged.
  44. // - Corruption: Merge operator reported unsuccessful merge. The scope of the
  45. // damage will be stored in `*op_failure_scope` when `op_failure_scope` is
  46. // not nullptr
  47. // Empty tag types to disambiguate overloads
  48. struct NoBaseValueTag {};
  49. static constexpr NoBaseValueTag kNoBaseValue{};
  50. struct PlainBaseValueTag {};
  51. static constexpr PlainBaseValueTag kPlainBaseValue{};
  52. struct WideBaseValueTag {};
  53. static constexpr WideBaseValueTag kWideBaseValue{};
  54. template <typename... ResultTs>
  55. static Status TimedFullMerge(const MergeOperator* merge_operator,
  56. const Slice& key, NoBaseValueTag,
  57. const std::vector<Slice>& operands,
  58. Logger* logger, Statistics* statistics,
  59. SystemClock* clock, bool update_num_ops_stats,
  60. MergeOperator::OpFailureScope* op_failure_scope,
  61. ResultTs... results) {
  62. MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
  63. return TimedFullMergeImpl(
  64. merge_operator, key, std::move(existing_value), operands, logger,
  65. statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  66. }
  67. template <typename... ResultTs>
  68. static Status TimedFullMerge(
  69. const MergeOperator* merge_operator, const Slice& key, PlainBaseValueTag,
  70. const Slice& value, const std::vector<Slice>& operands, Logger* logger,
  71. Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
  72. MergeOperator::OpFailureScope* op_failure_scope, ResultTs... results) {
  73. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(value);
  74. return TimedFullMergeImpl(
  75. merge_operator, key, std::move(existing_value), operands, logger,
  76. statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  77. }
  78. template <typename... ResultTs>
  79. static Status TimedFullMerge(
  80. const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
  81. const Slice& entity, const std::vector<Slice>& operands, Logger* logger,
  82. Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
  83. MergeOperator::OpFailureScope* op_failure_scope, ResultTs... results) {
  84. MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
  85. Slice entity_copy(entity);
  86. WideColumns existing_columns;
  87. const Status s =
  88. WideColumnSerialization::Deserialize(entity_copy, existing_columns);
  89. if (!s.ok()) {
  90. return s;
  91. }
  92. existing_value = std::move(existing_columns);
  93. return TimedFullMergeImpl(
  94. merge_operator, key, std::move(existing_value), operands, logger,
  95. statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  96. }
  97. template <typename... ResultTs>
  98. static Status TimedFullMerge(const MergeOperator* merge_operator,
  99. const Slice& key, WideBaseValueTag,
  100. const WideColumns& columns,
  101. const std::vector<Slice>& operands,
  102. Logger* logger, Statistics* statistics,
  103. SystemClock* clock, bool update_num_ops_stats,
  104. MergeOperator::OpFailureScope* op_failure_scope,
  105. ResultTs... results) {
  106. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns);
  107. return TimedFullMergeImpl(
  108. merge_operator, key, std::move(existing_value), operands, logger,
  109. statistics, clock, update_num_ops_stats, op_failure_scope, results...);
  110. }
  111. // During compaction, merge entries until we hit
  112. // - a corrupted key
  113. // - a Put/Delete,
  114. // - a different user key,
  115. // - a specific sequence number (snapshot boundary),
  116. // - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
  117. // or - the end of iteration
  118. //
  119. // The result(s) of the merge can be accessed in `MergeHelper::keys()` and
  120. // `MergeHelper::values()`, which are invalidated the next time `MergeUntil()`
  121. // is called. `MergeOutputIterator` is specially designed to iterate the
  122. // results of a `MergeHelper`'s most recent `MergeUntil()`.
  123. //
  124. // iter: (IN) points to the first merge type entry
  125. // (OUT) points to the first entry not included in the merge process
  126. // range_del_agg: (IN) filters merge operands covered by range tombstones.
  127. // stop_before: (IN) a sequence number that merge should not cross.
  128. // 0 means no restriction
  129. // at_bottom: (IN) true if the iterator covers the bottem level, which means
  130. // we could reach the start of the history of this user key.
  131. // allow_data_in_errors: (IN) if true, data details will be displayed in
  132. // error/log messages.
  133. // blob_fetcher: (IN) blob fetcher object for the compaction's input version.
  134. // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers
  135. // used for compaction readahead.
  136. // c_iter_stats: (OUT) compaction iteration statistics.
  137. //
  138. // Returns one of the following statuses:
  139. // - OK: Entries were successfully merged.
  140. // - MergeInProgress: Output consists of merge operands only.
  141. // - Corruption: Merge operator reported unsuccessful merge or a corrupted
  142. // key has been encountered and not expected (applies only when compiling
  143. // with asserts removed).
  144. // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
  145. //
  146. // REQUIRED: The first key in the input is not corrupted.
  147. Status MergeUntil(InternalIterator* iter,
  148. CompactionRangeDelAggregator* range_del_agg,
  149. const SequenceNumber stop_before, const bool at_bottom,
  150. const bool allow_data_in_errors,
  151. const BlobFetcher* blob_fetcher,
  152. const std::string* const full_history_ts_low,
  153. PrefetchBufferCollection* prefetch_buffers,
  154. CompactionIterationStats* c_iter_stats);
  155. // Filters a merge operand using the compaction filter specified
  156. // in the constructor. Returns the decision that the filter made.
  157. // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
  158. // optional outputs of compaction filter.
  159. // user_key includes timestamp if user-defined timestamp is enabled.
  160. CompactionFilter::Decision FilterMerge(const Slice& user_key,
  161. const Slice& value_slice);
  162. // Query the merge result
  163. // These are valid until the next MergeUntil call
  164. // If the merging was successful:
  165. // - keys() contains a single element with the latest sequence number of
  166. // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
  167. // - values() contains a single element with the result of merging all the
  168. // operands together
  169. //
  170. // IMPORTANT 1: the key type could change after the MergeUntil call.
  171. // Put/Delete + Merge + ... + Merge => Put
  172. // Merge + ... + Merge => Merge
  173. //
  174. // If the merge operator is not associative, and if a Put/Delete is not found
  175. // then the merging will be unsuccessful. In this case:
  176. // - keys() contains the list of internal keys seen in order of iteration.
  177. // - values() contains the list of values (merges) seen in the same order.
  178. // values() is parallel to keys() so that the first entry in
  179. // keys() is the key associated with the first entry in values()
  180. // and so on. These lists will be the same length.
  181. // All of these pairs will be merges over the same user key.
  182. // See IMPORTANT 2 note below.
  183. //
  184. // IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
  185. // So keys().back() was the first key seen by iterator.
  186. // TODO: Re-style this comment to be like the first one
  187. const std::deque<std::string>& keys() const { return keys_; }
  188. const std::vector<Slice>& values() const {
  189. return merge_context_.GetOperands();
  190. }
  191. uint64_t TotalFilterTime() const { return total_filter_time_; }
  192. bool HasOperator() const { return user_merge_operator_ != nullptr; }
  193. // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
  194. // return true and fill *until with the key to which we should skip.
  195. // If true, keys() and values() are empty.
  196. bool FilteredUntil(Slice* skip_until) const {
  197. if (!has_compaction_filter_skip_until_) {
  198. return false;
  199. }
  200. assert(compaction_filter_ != nullptr);
  201. assert(skip_until != nullptr);
  202. assert(compaction_filter_skip_until_.Valid());
  203. *skip_until = compaction_filter_skip_until_.Encode();
  204. return true;
  205. }
  206. private:
  207. Env* env_;
  208. SystemClock* clock_;
  209. const Comparator* user_comparator_;
  210. const MergeOperator* user_merge_operator_;
  211. const CompactionFilter* compaction_filter_;
  212. const std::atomic<bool>* shutting_down_;
  213. Logger* logger_;
  214. bool assert_valid_internal_key_; // enforce no internal key corruption?
  215. bool allow_single_operand_;
  216. SequenceNumber latest_snapshot_;
  217. const SnapshotChecker* const snapshot_checker_;
  218. int level_;
  219. // the scratch area that holds the result of MergeUntil
  220. // valid up to the next MergeUntil call
  221. // Keeps track of the sequence of keys seen
  222. std::deque<std::string> keys_;
  223. // Parallel with keys_; stores the operands
  224. mutable MergeContext merge_context_;
  225. StopWatchNano<> filter_timer_;
  226. uint64_t total_filter_time_;
  227. Statistics* stats_;
  228. bool has_compaction_filter_skip_until_ = false;
  229. std::string compaction_filter_value_;
  230. InternalKey compaction_filter_skip_until_;
  231. bool IsShuttingDown() {
  232. // This is a best-effort facility, so memory_order_relaxed is sufficient.
  233. return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
  234. }
  235. template <typename Visitor>
  236. static Status TimedFullMergeCommonImpl(
  237. const MergeOperator* merge_operator, const Slice& key,
  238. MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
  239. const std::vector<Slice>& operands, Logger* logger,
  240. Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
  241. MergeOperator::OpFailureScope* op_failure_scope, Visitor&& visitor);
  242. // Variant that exposes the merge result directly (in serialized form for wide
  243. // columns) as well as its value type. Used by iterator and compaction.
  244. static Status TimedFullMergeImpl(
  245. const MergeOperator* merge_operator, const Slice& key,
  246. MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
  247. const std::vector<Slice>& operands, Logger* logger,
  248. Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
  249. MergeOperator::OpFailureScope* op_failure_scope, std::string* result,
  250. Slice* result_operand, ValueType* result_type);
  251. // Variant that exposes the merge result translated into the form requested by
  252. // the client. (For example, if the result is a wide-column structure but the
  253. // client requested the results in plain-value form, the value of the default
  254. // column is returned.) Used by point lookups.
  255. static Status TimedFullMergeImpl(
  256. const MergeOperator* merge_operator, const Slice& key,
  257. MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
  258. const std::vector<Slice>& operands, Logger* logger,
  259. Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
  260. MergeOperator::OpFailureScope* op_failure_scope,
  261. std::string* result_value, PinnableWideColumns* result_entity);
  262. };
  263. // MergeOutputIterator can be used to iterate over the result of a merge.
  264. class MergeOutputIterator {
  265. public:
  266. // The MergeOutputIterator is bound to a MergeHelper instance.
  267. explicit MergeOutputIterator(const MergeHelper* merge_helper);
  268. // Seeks to the first record in the output.
  269. void SeekToFirst();
  270. // Advances to the next record in the output.
  271. void Next();
  272. Slice key() { return Slice(*it_keys_); }
  273. Slice value() { return Slice(*it_values_); }
  274. bool Valid() const { return it_keys_ != merge_helper_->keys().rend(); }
  275. private:
  276. const MergeHelper* merge_helper_;
  277. std::deque<std::string>::const_reverse_iterator it_keys_;
  278. std::vector<Slice>::const_reverse_iterator it_values_;
  279. };
  280. } // namespace ROCKSDB_NAMESPACE