merge_helper.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/merge_helper.h"
  6. #include <string>
  7. #include "db/dbformat.h"
  8. #include "monitoring/perf_context_imp.h"
  9. #include "monitoring/statistics.h"
  10. #include "port/likely.h"
  11. #include "rocksdb/comparator.h"
  12. #include "rocksdb/db.h"
  13. #include "rocksdb/merge_operator.h"
  14. #include "table/format.h"
  15. #include "table/internal_iterator.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
  18. const MergeOperator* user_merge_operator,
  19. const CompactionFilter* compaction_filter,
  20. Logger* logger, bool assert_valid_internal_key,
  21. SequenceNumber latest_snapshot,
  22. const SnapshotChecker* snapshot_checker, int level,
  23. Statistics* stats,
  24. const std::atomic<bool>* shutting_down)
  25. : env_(env),
  26. user_comparator_(user_comparator),
  27. user_merge_operator_(user_merge_operator),
  28. compaction_filter_(compaction_filter),
  29. shutting_down_(shutting_down),
  30. logger_(logger),
  31. assert_valid_internal_key_(assert_valid_internal_key),
  32. allow_single_operand_(false),
  33. latest_snapshot_(latest_snapshot),
  34. snapshot_checker_(snapshot_checker),
  35. level_(level),
  36. keys_(),
  37. filter_timer_(env_),
  38. total_filter_time_(0U),
  39. stats_(stats) {
  40. assert(user_comparator_ != nullptr);
  41. if (user_merge_operator_) {
  42. allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
  43. }
  44. }
  45. Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
  46. const Slice& key, const Slice* value,
  47. const std::vector<Slice>& operands,
  48. std::string* result, Logger* logger,
  49. Statistics* statistics, Env* env,
  50. Slice* result_operand,
  51. bool update_num_ops_stats) {
  52. assert(merge_operator != nullptr);
  53. if (operands.size() == 0) {
  54. assert(value != nullptr && result != nullptr);
  55. result->assign(value->data(), value->size());
  56. return Status::OK();
  57. }
  58. if (update_num_ops_stats) {
  59. RecordInHistogram(statistics, READ_NUM_MERGE_OPERANDS,
  60. static_cast<uint64_t>(operands.size()));
  61. }
  62. bool success;
  63. Slice tmp_result_operand(nullptr, 0);
  64. const MergeOperator::MergeOperationInput merge_in(key, value, operands,
  65. logger);
  66. MergeOperator::MergeOperationOutput merge_out(*result, tmp_result_operand);
  67. {
  68. // Setup to time the merge
  69. StopWatchNano timer(env, statistics != nullptr);
  70. PERF_TIMER_GUARD(merge_operator_time_nanos);
  71. // Do the merge
  72. success = merge_operator->FullMergeV2(merge_in, &merge_out);
  73. if (tmp_result_operand.data()) {
  74. // FullMergeV2 result is an existing operand
  75. if (result_operand != nullptr) {
  76. *result_operand = tmp_result_operand;
  77. } else {
  78. result->assign(tmp_result_operand.data(), tmp_result_operand.size());
  79. }
  80. } else if (result_operand) {
  81. *result_operand = Slice(nullptr, 0);
  82. }
  83. RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
  84. statistics ? timer.ElapsedNanos() : 0);
  85. }
  86. if (!success) {
  87. RecordTick(statistics, NUMBER_MERGE_FAILURES);
  88. return Status::Corruption("Error: Could not perform merge.");
  89. }
  90. return Status::OK();
  91. }
// Collects the merge operands for the user key at `iter` and, where possible,
// collapses them into fewer entries.
//
// PRE: iter points to the first merge type entry
// POST: iter points to the first entry beyond the merge process (or the end)
// keys_ and the operands held in merge_context_ are updated to reflect the
// merge result.
// keys_ stores the list of keys encountered while merging.
// merge_context_ stores the list of merge operands encountered while merging.
// keys_[i] corresponds to the i-th operand for each i.
//
// Parameters:
//   range_del_agg: may be nullptr. When set, a Put it reports as covered is
//                  merged as if there were no base value, and a covered merge
//                  operand is dropped.
//   stop_before:   when > 0, the scan stops at the first entry with
//                  sequence <= stop_before that the snapshot checker (if any)
//                  does not rule out of that snapshot.
//   at_bottom:     caller's indication that the key has no history below
//                  this input (see the longer comment near the end).
//
// Returns:
//   OK                 - a full merge succeeded (keys_ then holds a single
//                        kTypeValue key paired with the merged value), or
//                        there was nothing to merge, or a compaction filter
//                        returned kRemoveAndSkipUntil.
//   MergeInProgress    - the key's history was not fully seen; keys_ and the
//                        operands hold the (possibly partially merged) result.
//   ShutdownInProgress - shutdown was requested during the scan.
//   Corruption / other - key parsing or the merge operator failed.
//
// TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
// and just pass the StripeRep corresponding to the stripe being merged.
Status MergeHelper::MergeUntil(InternalIterator* iter,
                               CompactionRangeDelAggregator* range_del_agg,
                               const SequenceNumber stop_before,
                               const bool at_bottom) {
  // Get a copy of the internal key, before it's invalidated by iter->Next()
  // Also maintain the list of merge operands seen.
  assert(HasOperator());
  keys_.clear();
  merge_context_.Clear();
  has_compaction_filter_skip_until_ = false;
  assert(user_merge_operator_);
  bool first_key = true;

  // We need to parse the internal key again as the parsed key is
  // backed by the internal key!
  // Assume no internal key corruption as it has been successfully parsed
  // by the caller.
  // original_key_is_iter variable is just caching the information:
  // original_key_is_iter == (iter->key().ToString() == original_key)
  bool original_key_is_iter = true;
  std::string original_key = iter->key().ToString();
  // Important:
  // orig_ikey is backed by original_key if keys_.empty()
  // orig_ikey is backed by keys_.back() if !keys_.empty()
  ParsedInternalKey orig_ikey;
  bool succ = ParseInternalKey(original_key, &orig_ikey);
  assert(succ);
  if (!succ) {
    return Status::Corruption("Cannot parse key in MergeUntil");
  }

  Status s;
  // Set when the scan stopped because the iterator reached a different user
  // key; used after the loop to decide whether the full history was seen.
  bool hit_the_next_user_key = false;
  // original_key_is_iter is only true for the first entry; it is cleared
  // every time the iterator advances.
  for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
    if (IsShuttingDown()) {
      return Status::ShutdownInProgress();
    }

    ParsedInternalKey ikey;
    assert(keys_.size() == merge_context_.GetNumOperands());

    if (!ParseInternalKey(iter->key(), &ikey)) {
      // stop at corrupted key
      if (assert_valid_internal_key_) {
        assert(!"Corrupted internal key not expected.");
        return Status::Corruption("Corrupted internal key not expected.");
      }
      break;
    } else if (first_key) {
      assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key));
      first_key = false;
    } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) {
      // hit a different user key, stop right here
      hit_the_next_user_key = true;
      break;
    } else if (stop_before > 0 && ikey.sequence <= stop_before &&
               LIKELY(snapshot_checker_ == nullptr ||
                      snapshot_checker_->CheckInSnapshot(ikey.sequence,
                                                         stop_before) !=
                          SnapshotCheckerResult::kNotInSnapshot)) {
      // hit an entry that's possibly visible by the previous snapshot, can't
      // touch that
      break;
    }

    // At this point we are guaranteed that we need to process this key.

    assert(IsValueType(ikey.type));
    if (ikey.type != kTypeMerge) {
      // hit a put/delete/single delete
      //   => merge the put value or a nullptr with the collected operands
      //   => store the result as the single remaining operand (and update
      //      keys_.back())
      //   => change the entry type to kTypeValue for keys_.back()
      // We are done! Success!

      // If there are no operands, just return the Status::OK(). That will cause
      // the compaction iterator to write out the key we're currently at, which
      // is the put/delete we just encountered.
      if (keys_.empty()) {
        return Status::OK();
      }

      // TODO(noetzli) If the merge operator returns false, we are currently
      // (almost) silently dropping the put/delete. That's probably not what we
      // want. Also if we're in compaction and it's a put, it would be nice to
      // run compaction filter on it.
      const Slice val = iter->value();
      const Slice* val_ptr;
      // Only a Put (kTypeValue) not covered by a range tombstone supplies a
      // base value; any other entry type, or a covered Put, is merged on top
      // of a nullptr base.
      if (kTypeValue == ikey.type &&
          (range_del_agg == nullptr ||
           !range_del_agg->ShouldDelete(
               ikey, RangeDelPositioningMode::kForwardTraversal))) {
        val_ptr = &val;
      } else {
        val_ptr = nullptr;
      }
      std::string merge_result;
      s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
                         merge_context_.GetOperands(), &merge_result, logger_,
                         stats_, env_);

      // We store the result in keys_.back() and the single remaining operand
      // if nothing went wrong (i.e.: no operand corruption on disk)
      if (s.ok()) {
        // The original key encountered
        original_key = std::move(keys_.back());
        // orig_ikey.user_key may point into the moved-from string, but only
        // its sequence and type (plain values) are read below.
        orig_ikey.type = kTypeValue;
        UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
        keys_.clear();
        merge_context_.Clear();
        keys_.emplace_front(std::move(original_key));
        merge_context_.PushOperand(merge_result);
      }

      // move iter to the next entry
      // (the put/delete has been consumed by the merge, on success or not)
      iter->Next();
      return s;
    } else {
      // hit a merge
      //   => if there is a compaction filter, apply it.
      //   => check for range tombstones covering the operand
      //   => merge the operand into the front of the operand list
      //      if not filtered
      //   => then continue because we haven't yet seen a Put/Delete.
      //
      // Keep queuing keys and operands until we either meet a put / delete
      // request or later did a partial merge.

      Slice value_slice = iter->value();
      // add an operand to the list if:
      // 1) it's included in one of the snapshots. in that case we *must* write
      // it out, no matter what compaction filter says
      // 2) it's not filtered by a compaction filter
      CompactionFilter::Decision filter =
          ikey.sequence <= latest_snapshot_
              ? CompactionFilter::Decision::kKeep
              : FilterMerge(orig_ikey.user_key, value_slice);
      // An operand covered by a range tombstone is dropped, unless the filter
      // already asked to skip the whole key.
      if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
          range_del_agg != nullptr &&
          range_del_agg->ShouldDelete(
              iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
        filter = CompactionFilter::Decision::kRemove;
      }
      if (filter == CompactionFilter::Decision::kKeep ||
          filter == CompactionFilter::Decision::kChangeValue) {
        if (original_key_is_iter) {
          // this is just an optimization that saves us one memcpy
          keys_.push_front(std::move(original_key));
        } else {
          keys_.push_front(iter->key().ToString());
        }
        if (keys_.size() == 1) {
          // we need to re-anchor the orig_ikey because it was anchored by
          // original_key before
          // (keys_.back() is the first kept entry, because later entries are
          // pushed to the front)
          ParseInternalKey(keys_.back(), &orig_ikey);
        }
        if (filter == CompactionFilter::Decision::kKeep) {
          merge_context_.PushOperand(
              value_slice, iter->IsValuePinned() /* operand_pinned */);
        } else {  // kChangeValue
          // Compaction filter asked us to change the operand from value_slice
          // to compaction_filter_value_.
          merge_context_.PushOperand(compaction_filter_value_, false);
        }
      } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
        // Compaction filter asked us to remove this key altogether
        // (not just this operand), along with some keys following it.
        keys_.clear();
        merge_context_.Clear();
        has_compaction_filter_skip_until_ = true;
        return Status::OK();
      }
      // Any other decision (e.g. kRemove) simply drops this operand.
    }
  }

  if (merge_context_.GetNumOperands() == 0) {
    // we filtered out all the merge operands
    return Status::OK();
  }

  // We are sure we have seen this key's entire history if:
  // at_bottom == true (this does not necessarily mean it is the bottommost
  // layer, but rather that we are confident the key does not appear on any of
  // the lower layers, at_bottom == false doesn't mean it does appear, just
  // that we can't be sure, see Compaction::IsBottommostLevel for details)
  // AND
  // we have either encountered another key or end of key history on this
  // layer.
  //
  // When these conditions are true we are able to merge all the keys
  // using full merge.
  //
  // For these cases we are not sure about, we simply miss the opportunity
  // to combine the keys. Since VersionSet::SetupOtherInputs() always makes
  // sure that all merge-operands on the same level get compacted together,
  // this will simply lead to these merge operands moving to the next level.
  //
  // Note: a loop exit on a corrupted key or at the snapshot boundary leaves
  // iter valid with hit_the_next_user_key == false, so those cases take the
  // partial-merge path below.
  bool surely_seen_the_beginning =
      (hit_the_next_user_key || !iter->Valid()) && at_bottom;
  if (surely_seen_the_beginning) {
    // do a final merge with nullptr as the existing value and say
    // bye to the merge type (it's now converted to a Put)
    assert(kTypeMerge == orig_ikey.type);
    assert(merge_context_.GetNumOperands() >= 1);
    assert(merge_context_.GetNumOperands() == keys_.size());
    std::string merge_result;
    s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
                       merge_context_.GetOperands(), &merge_result, logger_,
                       stats_, env_);
    if (s.ok()) {
      // The original key encountered
      // We are certain that keys_ is not empty here (see assertions couple of
      // lines before).
      original_key = std::move(keys_.back());
      orig_ikey.type = kTypeValue;
      UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
      keys_.clear();
      merge_context_.Clear();
      keys_.emplace_front(std::move(original_key));
      merge_context_.PushOperand(merge_result);
    }
  } else {
    // We haven't seen the beginning of the key nor a Put/Delete.
    // Attempt to use the user's associative merge function to
    // merge the stacked merge operands into a single operand.
    // The status stays MergeInProgress even if the partial merge succeeds:
    // the result is still an operand, not a final value.
    s = Status::MergeInProgress();
    if (merge_context_.GetNumOperands() >= 2 ||
        (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
      bool merge_success = false;
      std::string merge_result;
      {
        StopWatchNano timer(env_, stats_ != nullptr);
        PERF_TIMER_GUARD(merge_operator_time_nanos);
        merge_success = user_merge_operator_->PartialMergeMulti(
            orig_ikey.user_key,
            std::deque<Slice>(merge_context_.GetOperands().begin(),
                              merge_context_.GetOperands().end()),
            &merge_result, logger_);
        RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME,
                   stats_ ? timer.ElapsedNanosSafe() : 0);
      }
      if (merge_success) {
        // Merging of operands (associative merge) was successful.
        // Replace operands with the merge result
        merge_context_.Clear();
        merge_context_.PushOperand(merge_result);
        // Keep only keys_.back() so keys_ stays paired 1:1 with the single
        // merged operand.
        keys_.erase(keys_.begin(), keys_.end() - 1);
      }
    }
  }

  return s;
}
  339. MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
  340. : merge_helper_(merge_helper) {
  341. it_keys_ = merge_helper_->keys().rend();
  342. it_values_ = merge_helper_->values().rend();
  343. }
  344. void MergeOutputIterator::SeekToFirst() {
  345. const auto& keys = merge_helper_->keys();
  346. const auto& values = merge_helper_->values();
  347. assert(keys.size() == values.size());
  348. it_keys_ = keys.rbegin();
  349. it_values_ = values.rbegin();
  350. }
  351. void MergeOutputIterator::Next() {
  352. ++it_keys_;
  353. ++it_values_;
  354. }
  355. CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
  356. const Slice& value_slice) {
  357. if (compaction_filter_ == nullptr) {
  358. return CompactionFilter::Decision::kKeep;
  359. }
  360. if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
  361. filter_timer_.Start();
  362. }
  363. compaction_filter_value_.clear();
  364. compaction_filter_skip_until_.Clear();
  365. auto ret = compaction_filter_->FilterV2(
  366. level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice,
  367. &compaction_filter_value_, compaction_filter_skip_until_.rep());
  368. if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
  369. if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
  370. user_key) <= 0) {
  371. // Invalid skip_until returned from compaction filter.
  372. // Keep the key as per FilterV2 documentation.
  373. ret = CompactionFilter::Decision::kKeep;
  374. } else {
  375. compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
  376. kValueTypeForSeek);
  377. }
  378. }
  379. total_filter_time_ += filter_timer_.ElapsedNanosSafe();
  380. return ret;
  381. }
  382. } // namespace ROCKSDB_NAMESPACE