| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/merge_helper.h"
- #include <string>
- #include "db/blob/blob_fetcher.h"
- #include "db/blob/blob_index.h"
- #include "db/blob/prefetch_buffer_collection.h"
- #include "db/compaction/compaction_iteration_stats.h"
- #include "db/dbformat.h"
- #include "db/wide/wide_columns_helper.h"
- #include "logging/logging.h"
- #include "monitoring/perf_context_imp.h"
- #include "monitoring/statistics_impl.h"
- #include "port/likely.h"
- #include "rocksdb/comparator.h"
- #include "rocksdb/db.h"
- #include "rocksdb/merge_operator.h"
- #include "rocksdb/system_clock.h"
- #include "table/format.h"
- #include "table/internal_iterator.h"
- #include "util/overload.h"
- namespace ROCKSDB_NAMESPACE {
- MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
- const MergeOperator* user_merge_operator,
- const CompactionFilter* compaction_filter,
- Logger* logger, bool assert_valid_internal_key,
- SequenceNumber latest_snapshot,
- const SnapshotChecker* snapshot_checker, int level,
- Statistics* stats,
- const std::atomic<bool>* shutting_down)
- : env_(env),
- clock_(env->GetSystemClock().get()),
- user_comparator_(user_comparator),
- user_merge_operator_(user_merge_operator),
- compaction_filter_(compaction_filter),
- shutting_down_(shutting_down),
- logger_(logger),
- assert_valid_internal_key_(assert_valid_internal_key),
- allow_single_operand_(false),
- latest_snapshot_(latest_snapshot),
- snapshot_checker_(snapshot_checker),
- level_(level),
- keys_(),
- filter_timer_(clock_),
- total_filter_time_(0U),
- stats_(stats) {
- assert(user_comparator_ != nullptr);
- if (user_merge_operator_) {
- allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
- }
- }
- template <typename Visitor>
- Status MergeHelper::TimedFullMergeCommonImpl(
- const MergeOperator* merge_operator, const Slice& key,
- MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
- const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
- SystemClock* clock, bool update_num_ops_stats,
- MergeOperator::OpFailureScope* op_failure_scope, Visitor&& visitor) {
- assert(merge_operator);
- assert(!operands.empty());
- if (update_num_ops_stats) {
- RecordInHistogram(statistics, READ_NUM_MERGE_OPERANDS,
- static_cast<uint64_t>(operands.size()));
- }
- const MergeOperator::MergeOperationInputV3 merge_in(
- key, std::move(existing_value), operands, logger);
- MergeOperator::MergeOperationOutputV3 merge_out;
- bool success = false;
- {
- StopWatchNano timer(clock, statistics != nullptr);
- PERF_TIMER_GUARD(merge_operator_time_nanos);
- success = merge_operator->FullMergeV3(merge_in, &merge_out);
- RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
- statistics ? timer.ElapsedNanos() : 0);
- }
- if (!success) {
- RecordTick(statistics, NUMBER_MERGE_FAILURES);
- if (op_failure_scope) {
- *op_failure_scope = merge_out.op_failure_scope;
- // Apply default per merge_operator.h
- if (*op_failure_scope == MergeOperator::OpFailureScope::kDefault) {
- *op_failure_scope = MergeOperator::OpFailureScope::kTryMerge;
- }
- }
- return Status::Corruption(Status::SubCode::kMergeOperatorFailed);
- }
- return std::visit(std::forward<Visitor>(visitor),
- std::move(merge_out.new_value));
- }
- Status MergeHelper::TimedFullMergeImpl(
- const MergeOperator* merge_operator, const Slice& key,
- MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
- const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
- SystemClock* clock, bool update_num_ops_stats,
- MergeOperator::OpFailureScope* op_failure_scope, std::string* result,
- Slice* result_operand, ValueType* result_type) {
- assert(result);
- assert(result_type);
- auto visitor = overload{
- [&](std::string&& new_value) -> Status {
- *result_type = kTypeValue;
- if (result_operand) {
- *result_operand = Slice(nullptr, 0);
- }
- *result = std::move(new_value);
- return Status::OK();
- },
- [&](MergeOperator::MergeOperationOutputV3::NewColumns&& new_columns)
- -> Status {
- *result_type = kTypeWideColumnEntity;
- if (result_operand) {
- *result_operand = Slice(nullptr, 0);
- }
- result->clear();
- WideColumns sorted_columns;
- sorted_columns.reserve(new_columns.size());
- for (const auto& column : new_columns) {
- sorted_columns.emplace_back(column.first, column.second);
- }
- WideColumnsHelper::SortColumns(sorted_columns);
- return WideColumnSerialization::Serialize(sorted_columns, *result);
- },
- [&](Slice&& operand) -> Status {
- *result_type = kTypeValue;
- if (result_operand) {
- *result_operand = operand;
- result->clear();
- } else {
- result->assign(operand.data(), operand.size());
- }
- return Status::OK();
- }};
- return TimedFullMergeCommonImpl(merge_operator, key,
- std::move(existing_value), operands, logger,
- statistics, clock, update_num_ops_stats,
- op_failure_scope, std::move(visitor));
- }
- Status MergeHelper::TimedFullMergeImpl(
- const MergeOperator* merge_operator, const Slice& key,
- MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
- const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
- SystemClock* clock, bool update_num_ops_stats,
- MergeOperator::OpFailureScope* op_failure_scope, std::string* result_value,
- PinnableWideColumns* result_entity) {
- assert(result_value || result_entity);
- assert(!result_value || !result_entity);
- auto visitor = overload{
- [&](std::string&& new_value) -> Status {
- if (result_value) {
- *result_value = std::move(new_value);
- return Status::OK();
- }
- assert(result_entity);
- result_entity->SetPlainValue(std::move(new_value));
- return Status::OK();
- },
- [&](MergeOperator::MergeOperationOutputV3::NewColumns&& new_columns)
- -> Status {
- if (result_value) {
- if (!new_columns.empty() &&
- new_columns.front().first == kDefaultWideColumnName) {
- *result_value = std::move(new_columns.front().second);
- } else {
- result_value->clear();
- }
- return Status::OK();
- }
- assert(result_entity);
- WideColumns sorted_columns;
- sorted_columns.reserve(new_columns.size());
- for (const auto& column : new_columns) {
- sorted_columns.emplace_back(column.first, column.second);
- }
- WideColumnsHelper::SortColumns(sorted_columns);
- std::string result;
- const Status s =
- WideColumnSerialization::Serialize(sorted_columns, result);
- if (!s.ok()) {
- result_entity->Reset();
- return s;
- }
- return result_entity->SetWideColumnValue(std::move(result));
- },
- [&](Slice&& operand) -> Status {
- if (result_value) {
- result_value->assign(operand.data(), operand.size());
- return Status::OK();
- }
- assert(result_entity);
- result_entity->SetPlainValue(operand);
- return Status::OK();
- }};
- return TimedFullMergeCommonImpl(merge_operator, key,
- std::move(existing_value), operands, logger,
- statistics, clock, update_num_ops_stats,
- op_failure_scope, std::move(visitor));
- }
- // PRE: iter points to the first merge type entry
- // POST: iter points to the first entry beyond the merge process (or the end)
- // keys_, operands_ are updated to reflect the merge result.
- // keys_ stores the list of keys encountered while merging.
- // operands_ stores the list of merge operands encountered while merging.
- // keys_[i] corresponds to operands_[i] for each i.
- //
- // TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
- // and just pass the StripeRep corresponding to the stripe being merged.
- Status MergeHelper::MergeUntil(InternalIterator* iter,
- CompactionRangeDelAggregator* range_del_agg,
- const SequenceNumber stop_before,
- const bool at_bottom,
- const bool allow_data_in_errors,
- const BlobFetcher* blob_fetcher,
- const std::string* const full_history_ts_low,
- PrefetchBufferCollection* prefetch_buffers,
- CompactionIterationStats* c_iter_stats) {
- // Get a copy of the internal key, before it's invalidated by iter->Next()
- // Also maintain the list of merge operands seen.
- assert(HasOperator());
- keys_.clear();
- merge_context_.Clear();
- has_compaction_filter_skip_until_ = false;
- assert(user_merge_operator_);
- assert(user_comparator_);
- const size_t ts_sz = user_comparator_->timestamp_size();
- if (full_history_ts_low) {
- assert(ts_sz > 0);
- assert(ts_sz == full_history_ts_low->size());
- }
- bool first_key = true;
- // We need to parse the internal key again as the parsed key is
- // backed by the internal key!
- // Assume no internal key corruption as it has been successfully parsed
- // by the caller.
- // original_key_is_iter variable is just caching the information:
- // original_key_is_iter == (iter->key().ToString() == original_key)
- bool original_key_is_iter = true;
- std::string original_key = iter->key().ToString();
- // Important:
- // orig_ikey is backed by original_key if keys_.empty()
- // orig_ikey is backed by keys_.back() if !keys_.empty()
- ParsedInternalKey orig_ikey;
- Status s = ParseInternalKey(original_key, &orig_ikey, allow_data_in_errors);
- assert(s.ok());
- if (!s.ok()) {
- return s;
- }
- assert(kTypeMerge == orig_ikey.type);
- bool hit_the_next_user_key = false;
- int cmp_with_full_history_ts_low = 0;
- for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
- if (IsShuttingDown()) {
- s = Status::ShutdownInProgress();
- return s;
- }
- // Skip range tombstones emitted by the compaction iterator.
- if (iter->IsDeleteRangeSentinelKey()) {
- continue;
- }
- ParsedInternalKey ikey;
- assert(keys_.size() == merge_context_.GetNumOperands());
- Status pik_status =
- ParseInternalKey(iter->key(), &ikey, allow_data_in_errors);
- Slice ts;
- if (pik_status.ok()) {
- ts = ExtractTimestampFromUserKey(ikey.user_key, ts_sz);
- if (full_history_ts_low) {
- cmp_with_full_history_ts_low =
- user_comparator_->CompareTimestamp(ts, *full_history_ts_low);
- }
- }
- if (!pik_status.ok()) {
- // stop at corrupted key
- if (assert_valid_internal_key_) {
- return pik_status;
- }
- break;
- } else if (first_key) {
- // If user-defined timestamp is enabled, we expect both user key and
- // timestamps are equal, as a sanity check.
- assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key));
- first_key = false;
- } else if (!user_comparator_->EqualWithoutTimestamp(ikey.user_key,
- orig_ikey.user_key) ||
- (ts_sz > 0 &&
- !user_comparator_->Equal(ikey.user_key, orig_ikey.user_key) &&
- cmp_with_full_history_ts_low >= 0)) {
- // 1) hit a different user key, or
- // 2) user-defined timestamp is enabled, and hit a version of user key NOT
- // eligible for GC, then stop right here.
- hit_the_next_user_key = true;
- break;
- } else if (stop_before > 0 && ikey.sequence <= stop_before &&
- LIKELY(snapshot_checker_ == nullptr ||
- snapshot_checker_->CheckInSnapshot(ikey.sequence,
- stop_before) !=
- SnapshotCheckerResult::kNotInSnapshot)) {
- // hit an entry that's possibly visible by the previous snapshot, can't
- // touch that
- break;
- }
- // At this point we are guaranteed that we need to process this key.
- assert(IsValueType(ikey.type));
- if (ikey.type != kTypeMerge) {
- // hit a put/delete/single delete
- // => merge the put value or a nullptr with operands_
- // => store result in operands_.back() (and update keys_.back())
- // => change the entry type for keys_.back()
- // We are done! Success!
- // If there are no operands, just return the Status::OK(). That will cause
- // the compaction iterator to write out the key we're currently at, which
- // is the put/delete we just encountered.
- if (keys_.empty()) {
- return s;
- }
- // TODO: if we're in compaction and it's a put, it would be nice to run
- // compaction filter on it.
- std::string merge_result;
- ValueType merge_result_type;
- MergeOperator::OpFailureScope op_failure_scope;
- if (range_del_agg &&
- range_del_agg->ShouldDelete(
- ikey, RangeDelPositioningMode::kForwardTraversal)) {
- s = TimedFullMerge(user_merge_operator_, ikey.user_key, kNoBaseValue,
- merge_context_.GetOperands(), logger_, stats_,
- clock_, /* update_num_ops_stats */ false,
- &op_failure_scope, &merge_result,
- /* result_operand */ nullptr, &merge_result_type);
- } else if (ikey.type == kTypeValue) {
- s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue,
- iter->value(), merge_context_.GetOperands(), logger_,
- stats_, clock_, /* update_num_ops_stats */ false,
- &op_failure_scope, &merge_result,
- /* result_operand */ nullptr, &merge_result_type);
- } else if (ikey.type == kTypeValuePreferredSeqno) {
- // When a TimedPut is merged with some merge operands, its original
- // write time info is obsolete and removed, and the merge result is a
- // kTypeValue.
- Slice unpacked_value = ParsePackedValueForValue(iter->value());
- s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue,
- unpacked_value, merge_context_.GetOperands(),
- logger_, stats_, clock_,
- /* update_num_ops_stats */ false, &op_failure_scope,
- &merge_result,
- /* result_operand */ nullptr, &merge_result_type);
- } else if (ikey.type == kTypeBlobIndex) {
- BlobIndex blob_index;
- s = blob_index.DecodeFrom(iter->value());
- if (!s.ok()) {
- return s;
- }
- FilePrefetchBuffer* prefetch_buffer =
- prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer(
- blob_index.file_number())
- : nullptr;
- uint64_t bytes_read = 0;
- assert(blob_fetcher);
- PinnableSlice blob_value;
- s = blob_fetcher->FetchBlob(ikey.user_key, blob_index, prefetch_buffer,
- &blob_value, &bytes_read);
- if (!s.ok()) {
- return s;
- }
- if (c_iter_stats) {
- ++c_iter_stats->num_blobs_read;
- c_iter_stats->total_blob_bytes_read += bytes_read;
- }
- s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue,
- blob_value, merge_context_.GetOperands(), logger_,
- stats_, clock_, /* update_num_ops_stats */ false,
- &op_failure_scope, &merge_result,
- /* result_operand */ nullptr, &merge_result_type);
- } else if (ikey.type == kTypeWideColumnEntity) {
- s = TimedFullMerge(user_merge_operator_, ikey.user_key, kWideBaseValue,
- iter->value(), merge_context_.GetOperands(), logger_,
- stats_, clock_, /* update_num_ops_stats */ false,
- &op_failure_scope, &merge_result,
- /* result_operand */ nullptr, &merge_result_type);
- } else {
- s = TimedFullMerge(user_merge_operator_, ikey.user_key, kNoBaseValue,
- merge_context_.GetOperands(), logger_, stats_,
- clock_, /* update_num_ops_stats */ false,
- &op_failure_scope, &merge_result,
- /* result_operand */ nullptr, &merge_result_type);
- }
- // We store the result in keys_.back() and operands_.back()
- // if nothing went wrong (i.e.: no operand corruption on disk)
- if (s.ok()) {
- // The original key encountered
- original_key = std::move(keys_.back());
- assert(merge_result_type == kTypeValue ||
- merge_result_type == kTypeWideColumnEntity);
- orig_ikey.type = merge_result_type;
- UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
- keys_.clear();
- merge_context_.Clear();
- keys_.emplace_front(std::move(original_key));
- merge_context_.PushOperand(merge_result);
- // move iter to the next entry
- iter->Next();
- } else if (op_failure_scope ==
- MergeOperator::OpFailureScope::kMustMerge) {
- // Change to `Status::MergeInProgress()` to denote output consists of
- // merge operands only. Leave `iter` at the non-merge entry so it will
- // be output after.
- s = Status::MergeInProgress();
- }
- return s;
- } else {
- // hit a merge
- // => if there is a compaction filter, apply it.
- // => check for range tombstones covering the operand
- // => merge the operand into the front of the operands_ list
- // if not filtered
- // => then continue because we haven't yet seen a Put/Delete.
- //
- // Keep queuing keys and operands until we either meet a put / delete
- // request or later did a partial merge.
- Slice value_slice = iter->value();
- // add an operand to the list if:
- // 1) it's included in one of the snapshots. in that case we *must* write
- // it out, no matter what compaction filter says
- // 2) it's not filtered by a compaction filter
- CompactionFilter::Decision filter =
- ikey.sequence <= latest_snapshot_
- ? CompactionFilter::Decision::kKeep
- : FilterMerge(orig_ikey.user_key, value_slice);
- // FIXME: should also check for kRemove here
- if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
- range_del_agg != nullptr &&
- range_del_agg->ShouldDelete(
- iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
- filter = CompactionFilter::Decision::kRemove;
- }
- if (filter == CompactionFilter::Decision::kKeep ||
- filter == CompactionFilter::Decision::kChangeValue) {
- if (original_key_is_iter) {
- // this is just an optimization that saves us one memcpy
- keys_.emplace_front(original_key);
- } else {
- keys_.emplace_front(iter->key().ToString());
- }
- if (keys_.size() == 1) {
- // we need to re-anchor the orig_ikey because it was anchored by
- // original_key before
- pik_status =
- ParseInternalKey(keys_.back(), &orig_ikey, allow_data_in_errors);
- pik_status.PermitUncheckedError();
- assert(pik_status.ok());
- }
- if (filter == CompactionFilter::Decision::kKeep) {
- merge_context_.PushOperand(
- value_slice, iter->IsValuePinned() /* operand_pinned */);
- } else {
- assert(filter == CompactionFilter::Decision::kChangeValue);
- // Compaction filter asked us to change the operand from value_slice
- // to compaction_filter_value_.
- merge_context_.PushOperand(compaction_filter_value_, false);
- }
- } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
- // Compaction filter asked us to remove this key altogether
- // (not just this operand), along with some keys following it.
- keys_.clear();
- merge_context_.Clear();
- has_compaction_filter_skip_until_ = true;
- return s;
- }
- }
- }
- if (cmp_with_full_history_ts_low >= 0) {
- size_t num_merge_operands = merge_context_.GetNumOperands();
- if (ts_sz && num_merge_operands > 1) {
- // We do not merge merge operands with different timestamps if they are
- // not eligible for GC.
- ROCKS_LOG_ERROR(logger_, "ts_sz=%d, %d merge oprands",
- static_cast<int>(ts_sz),
- static_cast<int>(num_merge_operands));
- assert(false);
- }
- }
- if (merge_context_.GetNumOperands() == 0) {
- // we filtered out all the merge operands
- return s;
- }
- // We are sure we have seen this key's entire history if:
- // at_bottom == true (this does not necessarily mean it is the bottommost
- // layer, but rather that we are confident the key does not appear on any of
- // the lower layers, at_bottom == false doesn't mean it does appear, just
- // that we can't be sure, see Compaction::IsBottommostLevel for details)
- // AND
- // we have either encountered another key or end of key history on this
- // layer.
- // Note that if user-defined timestamp is enabled, we need some extra caution
- // here: if full_history_ts_low is nullptr, or it's not null but the key's
- // timestamp is greater than or equal to full_history_ts_low, it means this
- // key cannot be dropped. We may not have seen the beginning of the key.
- //
- // When these conditions are true we are able to merge all the keys
- // using full merge.
- //
- // For these cases we are not sure about, we simply miss the opportunity
- // to combine the keys. Since VersionSet::SetupOtherInputs() always makes
- // sure that all merge-operands on the same level get compacted together,
- // this will simply lead to these merge operands moving to the next level.
- bool surely_seen_the_beginning =
- (hit_the_next_user_key || !iter->Valid()) && at_bottom &&
- (ts_sz == 0 || cmp_with_full_history_ts_low < 0);
- if (surely_seen_the_beginning) {
- // do a final merge with nullptr as the existing value and say
- // bye to the merge type (it's now converted to a Put)
- assert(kTypeMerge == orig_ikey.type);
- assert(merge_context_.GetNumOperands() >= 1);
- assert(merge_context_.GetNumOperands() == keys_.size());
- std::string merge_result;
- ValueType merge_result_type;
- MergeOperator::OpFailureScope op_failure_scope;
- s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, kNoBaseValue,
- merge_context_.GetOperands(), logger_, stats_, clock_,
- /* update_num_ops_stats */ false, &op_failure_scope,
- &merge_result,
- /* result_operand */ nullptr, &merge_result_type);
- if (s.ok()) {
- // The original key encountered
- // We are certain that keys_ is not empty here (see assertions couple of
- // lines before).
- original_key = std::move(keys_.back());
- assert(merge_result_type == kTypeValue ||
- merge_result_type == kTypeWideColumnEntity);
- orig_ikey.type = merge_result_type;
- UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
- keys_.clear();
- merge_context_.Clear();
- keys_.emplace_front(std::move(original_key));
- merge_context_.PushOperand(merge_result);
- } else if (op_failure_scope == MergeOperator::OpFailureScope::kMustMerge) {
- // Change to `Status::MergeInProgress()` to denote output consists of
- // merge operands only.
- s = Status::MergeInProgress();
- }
- } else {
- // We haven't seen the beginning of the key nor a Put/Delete.
- // Attempt to use the user's associative merge function to
- // merge the stacked merge operands into a single operand.
- s = Status::MergeInProgress();
- if (merge_context_.GetNumOperands() >= 2 ||
- (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
- bool merge_success = false;
- std::string merge_result;
- {
- StopWatchNano timer(clock_, stats_ != nullptr);
- PERF_TIMER_GUARD(merge_operator_time_nanos);
- merge_success = user_merge_operator_->PartialMergeMulti(
- orig_ikey.user_key,
- std::deque<Slice>(merge_context_.GetOperands().begin(),
- merge_context_.GetOperands().end()),
- &merge_result, logger_);
- RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME,
- stats_ ? timer.ElapsedNanosSafe() : 0);
- }
- if (merge_success) {
- // Merging of operands (associative merge) was successful.
- // Replace operands with the merge result
- merge_context_.Clear();
- merge_context_.PushOperand(merge_result);
- keys_.erase(keys_.begin(), keys_.end() - 1);
- }
- }
- }
- return s;
- }
- MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
- : merge_helper_(merge_helper) {
- it_keys_ = merge_helper_->keys().rend();
- it_values_ = merge_helper_->values().rend();
- }
- void MergeOutputIterator::SeekToFirst() {
- const auto& keys = merge_helper_->keys();
- const auto& values = merge_helper_->values();
- assert(keys.size() == values.size());
- it_keys_ = keys.rbegin();
- it_values_ = values.rbegin();
- }
- void MergeOutputIterator::Next() {
- ++it_keys_;
- ++it_values_;
- }
- CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
- const Slice& value_slice) {
- if (compaction_filter_ == nullptr) {
- return CompactionFilter::Decision::kKeep;
- }
- if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
- filter_timer_.Start();
- }
- compaction_filter_value_.clear();
- compaction_filter_skip_until_.Clear();
- auto ret = compaction_filter_->FilterV3(
- level_, user_key, CompactionFilter::ValueType::kMergeOperand,
- &value_slice, /* existing_columns */ nullptr, &compaction_filter_value_,
- /* new_columns */ nullptr, compaction_filter_skip_until_.rep());
- if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
- if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
- user_key) <= 0) {
- // Invalid skip_until returned from compaction filter.
- // Keep the key as per FilterV2/FilterV3 documentation.
- ret = CompactionFilter::Decision::kKeep;
- } else {
- compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
- kValueTypeForSeek);
- }
- }
- if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
- total_filter_time_ += filter_timer_.ElapsedNanosSafe();
- }
- return ret;
- }
- } // namespace ROCKSDB_NAMESPACE
|