merge_helper.cc 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/merge_helper.h"
  6. #include <string>
  7. #include "db/blob/blob_fetcher.h"
  8. #include "db/blob/blob_index.h"
  9. #include "db/blob/prefetch_buffer_collection.h"
  10. #include "db/compaction/compaction_iteration_stats.h"
  11. #include "db/dbformat.h"
  12. #include "db/wide/wide_columns_helper.h"
  13. #include "logging/logging.h"
  14. #include "monitoring/perf_context_imp.h"
  15. #include "monitoring/statistics_impl.h"
  16. #include "port/likely.h"
  17. #include "rocksdb/comparator.h"
  18. #include "rocksdb/db.h"
  19. #include "rocksdb/merge_operator.h"
  20. #include "rocksdb/system_clock.h"
  21. #include "table/format.h"
  22. #include "table/internal_iterator.h"
  23. #include "util/overload.h"
  24. namespace ROCKSDB_NAMESPACE {
// Constructs a MergeHelper that merges the operand history of a single user
// key during reads and compactions.
//
// env/stats/logger are used for timing and diagnostics. latest_snapshot,
// snapshot_checker and level feed the snapshot and compaction-filter checks
// in MergeUntil()/FilterMerge(). user_comparator must be non-null.
// user_merge_operator may be null here, but MergeUntil() asserts that an
// operator is present before merging. shutting_down, if non-null, lets a
// long-running merge abort with Status::ShutdownInProgress.
MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
                         const MergeOperator* user_merge_operator,
                         const CompactionFilter* compaction_filter,
                         Logger* logger, bool assert_valid_internal_key,
                         SequenceNumber latest_snapshot,
                         const SnapshotChecker* snapshot_checker, int level,
                         Statistics* stats,
                         const std::atomic<bool>* shutting_down)
    : env_(env),
      clock_(env->GetSystemClock().get()),
      user_comparator_(user_comparator),
      user_merge_operator_(user_merge_operator),
      compaction_filter_(compaction_filter),
      shutting_down_(shutting_down),
      logger_(logger),
      assert_valid_internal_key_(assert_valid_internal_key),
      allow_single_operand_(false),
      latest_snapshot_(latest_snapshot),
      snapshot_checker_(snapshot_checker),
      level_(level),
      keys_(),
      filter_timer_(clock_),
      total_filter_time_(0U),
      stats_(stats) {
  assert(user_comparator_ != nullptr);
  if (user_merge_operator_) {
    // Cache the operator's preference so the partial-merge path in
    // MergeUntil() can cheaply decide whether a single operand may be
    // combined on its own.
    allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
  }
}
// Shared implementation behind both TimedFullMergeImpl overloads: runs the
// user's FullMergeV3 under a stats timer and hands the resulting variant to
// `visitor`, which materializes it into the caller's preferred output form.
//
// existing_value: base value the operands apply to (no value, plain value,
//   or wide-column entity); consumed by move.
// operands: merge operand list; must be non-empty.
// update_num_ops_stats: if true, records the operand count in the
//   READ_NUM_MERGE_OPERANDS histogram.
// op_failure_scope: out-parameter; on merge failure receives the operator's
//   reported scope (kDefault is mapped to kTryMerge per merge_operator.h).
// Returns the visitor's status on success, or Status::Corruption with
// subcode kMergeOperatorFailed if the merge operator reports failure.
template <typename Visitor>
Status MergeHelper::TimedFullMergeCommonImpl(
    const MergeOperator* merge_operator, const Slice& key,
    MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
    const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
    SystemClock* clock, bool update_num_ops_stats,
    MergeOperator::OpFailureScope* op_failure_scope, Visitor&& visitor) {
  assert(merge_operator);
  assert(!operands.empty());

  if (update_num_ops_stats) {
    RecordInHistogram(statistics, READ_NUM_MERGE_OPERANDS,
                      static_cast<uint64_t>(operands.size()));
  }

  const MergeOperator::MergeOperationInputV3 merge_in(
      key, std::move(existing_value), operands, logger);
  MergeOperator::MergeOperationOutputV3 merge_out;

  bool success = false;

  {
    // Time only the merge operator call itself. The stopwatch is a no-op
    // when statistics collection is disabled.
    StopWatchNano timer(clock, statistics != nullptr);
    PERF_TIMER_GUARD(merge_operator_time_nanos);

    success = merge_operator->FullMergeV3(merge_in, &merge_out);

    RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
               statistics ? timer.ElapsedNanos() : 0);
  }

  if (!success) {
    RecordTick(statistics, NUMBER_MERGE_FAILURES);

    if (op_failure_scope) {
      *op_failure_scope = merge_out.op_failure_scope;
      // Apply default per merge_operator.h
      if (*op_failure_scope == MergeOperator::OpFailureScope::kDefault) {
        *op_failure_scope = MergeOperator::OpFailureScope::kTryMerge;
      }
    }

    return Status::Corruption(Status::SubCode::kMergeOperatorFailed);
  }

  // Dispatch on which alternative the operator produced: a new string value,
  // a new set of wide columns, or a slice aliasing an existing operand.
  return std::visit(std::forward<Visitor>(visitor),
                    std::move(merge_out.new_value));
}
  92. Status MergeHelper::TimedFullMergeImpl(
  93. const MergeOperator* merge_operator, const Slice& key,
  94. MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
  95. const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
  96. SystemClock* clock, bool update_num_ops_stats,
  97. MergeOperator::OpFailureScope* op_failure_scope, std::string* result,
  98. Slice* result_operand, ValueType* result_type) {
  99. assert(result);
  100. assert(result_type);
  101. auto visitor = overload{
  102. [&](std::string&& new_value) -> Status {
  103. *result_type = kTypeValue;
  104. if (result_operand) {
  105. *result_operand = Slice(nullptr, 0);
  106. }
  107. *result = std::move(new_value);
  108. return Status::OK();
  109. },
  110. [&](MergeOperator::MergeOperationOutputV3::NewColumns&& new_columns)
  111. -> Status {
  112. *result_type = kTypeWideColumnEntity;
  113. if (result_operand) {
  114. *result_operand = Slice(nullptr, 0);
  115. }
  116. result->clear();
  117. WideColumns sorted_columns;
  118. sorted_columns.reserve(new_columns.size());
  119. for (const auto& column : new_columns) {
  120. sorted_columns.emplace_back(column.first, column.second);
  121. }
  122. WideColumnsHelper::SortColumns(sorted_columns);
  123. return WideColumnSerialization::Serialize(sorted_columns, *result);
  124. },
  125. [&](Slice&& operand) -> Status {
  126. *result_type = kTypeValue;
  127. if (result_operand) {
  128. *result_operand = operand;
  129. result->clear();
  130. } else {
  131. result->assign(operand.data(), operand.size());
  132. }
  133. return Status::OK();
  134. }};
  135. return TimedFullMergeCommonImpl(merge_operator, key,
  136. std::move(existing_value), operands, logger,
  137. statistics, clock, update_num_ops_stats,
  138. op_failure_scope, std::move(visitor));
  139. }
// Variant of TimedFullMergeImpl that materializes the merge result either as
// a plain value (*result_value) or as a wide-column entity (*result_entity).
// Exactly one of result_value / result_entity must be non-null; which one is
// set selects the output representation the caller wants.
Status MergeHelper::TimedFullMergeImpl(
    const MergeOperator* merge_operator, const Slice& key,
    MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
    const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
    SystemClock* clock, bool update_num_ops_stats,
    MergeOperator::OpFailureScope* op_failure_scope, std::string* result_value,
    PinnableWideColumns* result_entity) {
  assert(result_value || result_entity);
  assert(!result_value || !result_entity);

  auto visitor = overload{
      // New plain value: store it directly, or wrap it as the entity's
      // plain value depending on the requested output form.
      [&](std::string&& new_value) -> Status {
        if (result_value) {
          *result_value = std::move(new_value);
          return Status::OK();
        }

        assert(result_entity);
        result_entity->SetPlainValue(std::move(new_value));
        return Status::OK();
      },
      // New wide-column result. When the caller asked for a plain value,
      // surface the first column's value only if it is the default column;
      // otherwise produce an empty value.
      [&](MergeOperator::MergeOperationOutputV3::NewColumns&& new_columns)
          -> Status {
        if (result_value) {
          if (!new_columns.empty() &&
              new_columns.front().first == kDefaultWideColumnName) {
            *result_value = std::move(new_columns.front().second);
          } else {
            result_value->clear();
          }

          return Status::OK();
        }

        assert(result_entity);

        WideColumns sorted_columns;
        sorted_columns.reserve(new_columns.size());

        for (const auto& column : new_columns) {
          sorted_columns.emplace_back(column.first, column.second);
        }

        WideColumnsHelper::SortColumns(sorted_columns);

        std::string result;
        const Status s =
            WideColumnSerialization::Serialize(sorted_columns, result);
        if (!s.ok()) {
          // Leave the entity in a well-defined (reset) state on failure.
          result_entity->Reset();
          return s;
        }

        return result_entity->SetWideColumnValue(std::move(result));
      },
      // Result aliases an existing operand: copy it into the requested
      // output representation.
      [&](Slice&& operand) -> Status {
        if (result_value) {
          result_value->assign(operand.data(), operand.size());
          return Status::OK();
        }

        assert(result_entity);
        result_entity->SetPlainValue(operand);
        return Status::OK();
      }};

  return TimedFullMergeCommonImpl(merge_operator, key,
                                  std::move(existing_value), operands, logger,
                                  statistics, clock, update_num_ops_stats,
                                  op_failure_scope, std::move(visitor));
}
  200. // PRE: iter points to the first merge type entry
  201. // POST: iter points to the first entry beyond the merge process (or the end)
  202. // keys_, operands_ are updated to reflect the merge result.
  203. // keys_ stores the list of keys encountered while merging.
  204. // operands_ stores the list of merge operands encountered while merging.
  205. // keys_[i] corresponds to operands_[i] for each i.
  206. //
  207. // TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
  208. // and just pass the StripeRep corresponding to the stripe being merged.
  209. Status MergeHelper::MergeUntil(InternalIterator* iter,
  210. CompactionRangeDelAggregator* range_del_agg,
  211. const SequenceNumber stop_before,
  212. const bool at_bottom,
  213. const bool allow_data_in_errors,
  214. const BlobFetcher* blob_fetcher,
  215. const std::string* const full_history_ts_low,
  216. PrefetchBufferCollection* prefetch_buffers,
  217. CompactionIterationStats* c_iter_stats) {
  218. // Get a copy of the internal key, before it's invalidated by iter->Next()
  219. // Also maintain the list of merge operands seen.
  220. assert(HasOperator());
  221. keys_.clear();
  222. merge_context_.Clear();
  223. has_compaction_filter_skip_until_ = false;
  224. assert(user_merge_operator_);
  225. assert(user_comparator_);
  226. const size_t ts_sz = user_comparator_->timestamp_size();
  227. if (full_history_ts_low) {
  228. assert(ts_sz > 0);
  229. assert(ts_sz == full_history_ts_low->size());
  230. }
  231. bool first_key = true;
  232. // We need to parse the internal key again as the parsed key is
  233. // backed by the internal key!
  234. // Assume no internal key corruption as it has been successfully parsed
  235. // by the caller.
  236. // original_key_is_iter variable is just caching the information:
  237. // original_key_is_iter == (iter->key().ToString() == original_key)
  238. bool original_key_is_iter = true;
  239. std::string original_key = iter->key().ToString();
  240. // Important:
  241. // orig_ikey is backed by original_key if keys_.empty()
  242. // orig_ikey is backed by keys_.back() if !keys_.empty()
  243. ParsedInternalKey orig_ikey;
  244. Status s = ParseInternalKey(original_key, &orig_ikey, allow_data_in_errors);
  245. assert(s.ok());
  246. if (!s.ok()) {
  247. return s;
  248. }
  249. assert(kTypeMerge == orig_ikey.type);
  250. bool hit_the_next_user_key = false;
  251. int cmp_with_full_history_ts_low = 0;
  252. for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
  253. if (IsShuttingDown()) {
  254. s = Status::ShutdownInProgress();
  255. return s;
  256. }
  257. // Skip range tombstones emitted by the compaction iterator.
  258. if (iter->IsDeleteRangeSentinelKey()) {
  259. continue;
  260. }
  261. ParsedInternalKey ikey;
  262. assert(keys_.size() == merge_context_.GetNumOperands());
  263. Status pik_status =
  264. ParseInternalKey(iter->key(), &ikey, allow_data_in_errors);
  265. Slice ts;
  266. if (pik_status.ok()) {
  267. ts = ExtractTimestampFromUserKey(ikey.user_key, ts_sz);
  268. if (full_history_ts_low) {
  269. cmp_with_full_history_ts_low =
  270. user_comparator_->CompareTimestamp(ts, *full_history_ts_low);
  271. }
  272. }
  273. if (!pik_status.ok()) {
  274. // stop at corrupted key
  275. if (assert_valid_internal_key_) {
  276. return pik_status;
  277. }
  278. break;
  279. } else if (first_key) {
  280. // If user-defined timestamp is enabled, we expect both user key and
  281. // timestamps are equal, as a sanity check.
  282. assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key));
  283. first_key = false;
  284. } else if (!user_comparator_->EqualWithoutTimestamp(ikey.user_key,
  285. orig_ikey.user_key) ||
  286. (ts_sz > 0 &&
  287. !user_comparator_->Equal(ikey.user_key, orig_ikey.user_key) &&
  288. cmp_with_full_history_ts_low >= 0)) {
  289. // 1) hit a different user key, or
  290. // 2) user-defined timestamp is enabled, and hit a version of user key NOT
  291. // eligible for GC, then stop right here.
  292. hit_the_next_user_key = true;
  293. break;
  294. } else if (stop_before > 0 && ikey.sequence <= stop_before &&
  295. LIKELY(snapshot_checker_ == nullptr ||
  296. snapshot_checker_->CheckInSnapshot(ikey.sequence,
  297. stop_before) !=
  298. SnapshotCheckerResult::kNotInSnapshot)) {
  299. // hit an entry that's possibly visible by the previous snapshot, can't
  300. // touch that
  301. break;
  302. }
  303. // At this point we are guaranteed that we need to process this key.
  304. assert(IsValueType(ikey.type));
  305. if (ikey.type != kTypeMerge) {
  306. // hit a put/delete/single delete
  307. // => merge the put value or a nullptr with operands_
  308. // => store result in operands_.back() (and update keys_.back())
  309. // => change the entry type for keys_.back()
  310. // We are done! Success!
  311. // If there are no operands, just return the Status::OK(). That will cause
  312. // the compaction iterator to write out the key we're currently at, which
  313. // is the put/delete we just encountered.
  314. if (keys_.empty()) {
  315. return s;
  316. }
  317. // TODO: if we're in compaction and it's a put, it would be nice to run
  318. // compaction filter on it.
  319. std::string merge_result;
  320. ValueType merge_result_type;
  321. MergeOperator::OpFailureScope op_failure_scope;
  322. if (range_del_agg &&
  323. range_del_agg->ShouldDelete(
  324. ikey, RangeDelPositioningMode::kForwardTraversal)) {
  325. s = TimedFullMerge(user_merge_operator_, ikey.user_key, kNoBaseValue,
  326. merge_context_.GetOperands(), logger_, stats_,
  327. clock_, /* update_num_ops_stats */ false,
  328. &op_failure_scope, &merge_result,
  329. /* result_operand */ nullptr, &merge_result_type);
  330. } else if (ikey.type == kTypeValue) {
  331. s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue,
  332. iter->value(), merge_context_.GetOperands(), logger_,
  333. stats_, clock_, /* update_num_ops_stats */ false,
  334. &op_failure_scope, &merge_result,
  335. /* result_operand */ nullptr, &merge_result_type);
  336. } else if (ikey.type == kTypeValuePreferredSeqno) {
  337. // When a TimedPut is merged with some merge operands, its original
  338. // write time info is obsolete and removed, and the merge result is a
  339. // kTypeValue.
  340. Slice unpacked_value = ParsePackedValueForValue(iter->value());
  341. s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue,
  342. unpacked_value, merge_context_.GetOperands(),
  343. logger_, stats_, clock_,
  344. /* update_num_ops_stats */ false, &op_failure_scope,
  345. &merge_result,
  346. /* result_operand */ nullptr, &merge_result_type);
  347. } else if (ikey.type == kTypeBlobIndex) {
  348. BlobIndex blob_index;
  349. s = blob_index.DecodeFrom(iter->value());
  350. if (!s.ok()) {
  351. return s;
  352. }
  353. FilePrefetchBuffer* prefetch_buffer =
  354. prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer(
  355. blob_index.file_number())
  356. : nullptr;
  357. uint64_t bytes_read = 0;
  358. assert(blob_fetcher);
  359. PinnableSlice blob_value;
  360. s = blob_fetcher->FetchBlob(ikey.user_key, blob_index, prefetch_buffer,
  361. &blob_value, &bytes_read);
  362. if (!s.ok()) {
  363. return s;
  364. }
  365. if (c_iter_stats) {
  366. ++c_iter_stats->num_blobs_read;
  367. c_iter_stats->total_blob_bytes_read += bytes_read;
  368. }
  369. s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue,
  370. blob_value, merge_context_.GetOperands(), logger_,
  371. stats_, clock_, /* update_num_ops_stats */ false,
  372. &op_failure_scope, &merge_result,
  373. /* result_operand */ nullptr, &merge_result_type);
  374. } else if (ikey.type == kTypeWideColumnEntity) {
  375. s = TimedFullMerge(user_merge_operator_, ikey.user_key, kWideBaseValue,
  376. iter->value(), merge_context_.GetOperands(), logger_,
  377. stats_, clock_, /* update_num_ops_stats */ false,
  378. &op_failure_scope, &merge_result,
  379. /* result_operand */ nullptr, &merge_result_type);
  380. } else {
  381. s = TimedFullMerge(user_merge_operator_, ikey.user_key, kNoBaseValue,
  382. merge_context_.GetOperands(), logger_, stats_,
  383. clock_, /* update_num_ops_stats */ false,
  384. &op_failure_scope, &merge_result,
  385. /* result_operand */ nullptr, &merge_result_type);
  386. }
  387. // We store the result in keys_.back() and operands_.back()
  388. // if nothing went wrong (i.e.: no operand corruption on disk)
  389. if (s.ok()) {
  390. // The original key encountered
  391. original_key = std::move(keys_.back());
  392. assert(merge_result_type == kTypeValue ||
  393. merge_result_type == kTypeWideColumnEntity);
  394. orig_ikey.type = merge_result_type;
  395. UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
  396. keys_.clear();
  397. merge_context_.Clear();
  398. keys_.emplace_front(std::move(original_key));
  399. merge_context_.PushOperand(merge_result);
  400. // move iter to the next entry
  401. iter->Next();
  402. } else if (op_failure_scope ==
  403. MergeOperator::OpFailureScope::kMustMerge) {
  404. // Change to `Status::MergeInProgress()` to denote output consists of
  405. // merge operands only. Leave `iter` at the non-merge entry so it will
  406. // be output after.
  407. s = Status::MergeInProgress();
  408. }
  409. return s;
  410. } else {
  411. // hit a merge
  412. // => if there is a compaction filter, apply it.
  413. // => check for range tombstones covering the operand
  414. // => merge the operand into the front of the operands_ list
  415. // if not filtered
  416. // => then continue because we haven't yet seen a Put/Delete.
  417. //
  418. // Keep queuing keys and operands until we either meet a put / delete
  419. // request or later did a partial merge.
  420. Slice value_slice = iter->value();
  421. // add an operand to the list if:
  422. // 1) it's included in one of the snapshots. in that case we *must* write
  423. // it out, no matter what compaction filter says
  424. // 2) it's not filtered by a compaction filter
  425. CompactionFilter::Decision filter =
  426. ikey.sequence <= latest_snapshot_
  427. ? CompactionFilter::Decision::kKeep
  428. : FilterMerge(orig_ikey.user_key, value_slice);
  429. // FIXME: should also check for kRemove here
  430. if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
  431. range_del_agg != nullptr &&
  432. range_del_agg->ShouldDelete(
  433. iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
  434. filter = CompactionFilter::Decision::kRemove;
  435. }
  436. if (filter == CompactionFilter::Decision::kKeep ||
  437. filter == CompactionFilter::Decision::kChangeValue) {
  438. if (original_key_is_iter) {
  439. // this is just an optimization that saves us one memcpy
  440. keys_.emplace_front(original_key);
  441. } else {
  442. keys_.emplace_front(iter->key().ToString());
  443. }
  444. if (keys_.size() == 1) {
  445. // we need to re-anchor the orig_ikey because it was anchored by
  446. // original_key before
  447. pik_status =
  448. ParseInternalKey(keys_.back(), &orig_ikey, allow_data_in_errors);
  449. pik_status.PermitUncheckedError();
  450. assert(pik_status.ok());
  451. }
  452. if (filter == CompactionFilter::Decision::kKeep) {
  453. merge_context_.PushOperand(
  454. value_slice, iter->IsValuePinned() /* operand_pinned */);
  455. } else {
  456. assert(filter == CompactionFilter::Decision::kChangeValue);
  457. // Compaction filter asked us to change the operand from value_slice
  458. // to compaction_filter_value_.
  459. merge_context_.PushOperand(compaction_filter_value_, false);
  460. }
  461. } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
  462. // Compaction filter asked us to remove this key altogether
  463. // (not just this operand), along with some keys following it.
  464. keys_.clear();
  465. merge_context_.Clear();
  466. has_compaction_filter_skip_until_ = true;
  467. return s;
  468. }
  469. }
  470. }
  471. if (cmp_with_full_history_ts_low >= 0) {
  472. size_t num_merge_operands = merge_context_.GetNumOperands();
  473. if (ts_sz && num_merge_operands > 1) {
  474. // We do not merge merge operands with different timestamps if they are
  475. // not eligible for GC.
  476. ROCKS_LOG_ERROR(logger_, "ts_sz=%d, %d merge oprands",
  477. static_cast<int>(ts_sz),
  478. static_cast<int>(num_merge_operands));
  479. assert(false);
  480. }
  481. }
  482. if (merge_context_.GetNumOperands() == 0) {
  483. // we filtered out all the merge operands
  484. return s;
  485. }
  486. // We are sure we have seen this key's entire history if:
  487. // at_bottom == true (this does not necessarily mean it is the bottommost
  488. // layer, but rather that we are confident the key does not appear on any of
  489. // the lower layers, at_bottom == false doesn't mean it does appear, just
  490. // that we can't be sure, see Compaction::IsBottommostLevel for details)
  491. // AND
  492. // we have either encountered another key or end of key history on this
  493. // layer.
  494. // Note that if user-defined timestamp is enabled, we need some extra caution
  495. // here: if full_history_ts_low is nullptr, or it's not null but the key's
  496. // timestamp is greater than or equal to full_history_ts_low, it means this
  497. // key cannot be dropped. We may not have seen the beginning of the key.
  498. //
  499. // When these conditions are true we are able to merge all the keys
  500. // using full merge.
  501. //
  502. // For these cases we are not sure about, we simply miss the opportunity
  503. // to combine the keys. Since VersionSet::SetupOtherInputs() always makes
  504. // sure that all merge-operands on the same level get compacted together,
  505. // this will simply lead to these merge operands moving to the next level.
  506. bool surely_seen_the_beginning =
  507. (hit_the_next_user_key || !iter->Valid()) && at_bottom &&
  508. (ts_sz == 0 || cmp_with_full_history_ts_low < 0);
  509. if (surely_seen_the_beginning) {
  510. // do a final merge with nullptr as the existing value and say
  511. // bye to the merge type (it's now converted to a Put)
  512. assert(kTypeMerge == orig_ikey.type);
  513. assert(merge_context_.GetNumOperands() >= 1);
  514. assert(merge_context_.GetNumOperands() == keys_.size());
  515. std::string merge_result;
  516. ValueType merge_result_type;
  517. MergeOperator::OpFailureScope op_failure_scope;
  518. s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, kNoBaseValue,
  519. merge_context_.GetOperands(), logger_, stats_, clock_,
  520. /* update_num_ops_stats */ false, &op_failure_scope,
  521. &merge_result,
  522. /* result_operand */ nullptr, &merge_result_type);
  523. if (s.ok()) {
  524. // The original key encountered
  525. // We are certain that keys_ is not empty here (see assertions couple of
  526. // lines before).
  527. original_key = std::move(keys_.back());
  528. assert(merge_result_type == kTypeValue ||
  529. merge_result_type == kTypeWideColumnEntity);
  530. orig_ikey.type = merge_result_type;
  531. UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
  532. keys_.clear();
  533. merge_context_.Clear();
  534. keys_.emplace_front(std::move(original_key));
  535. merge_context_.PushOperand(merge_result);
  536. } else if (op_failure_scope == MergeOperator::OpFailureScope::kMustMerge) {
  537. // Change to `Status::MergeInProgress()` to denote output consists of
  538. // merge operands only.
  539. s = Status::MergeInProgress();
  540. }
  541. } else {
  542. // We haven't seen the beginning of the key nor a Put/Delete.
  543. // Attempt to use the user's associative merge function to
  544. // merge the stacked merge operands into a single operand.
  545. s = Status::MergeInProgress();
  546. if (merge_context_.GetNumOperands() >= 2 ||
  547. (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
  548. bool merge_success = false;
  549. std::string merge_result;
  550. {
  551. StopWatchNano timer(clock_, stats_ != nullptr);
  552. PERF_TIMER_GUARD(merge_operator_time_nanos);
  553. merge_success = user_merge_operator_->PartialMergeMulti(
  554. orig_ikey.user_key,
  555. std::deque<Slice>(merge_context_.GetOperands().begin(),
  556. merge_context_.GetOperands().end()),
  557. &merge_result, logger_);
  558. RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME,
  559. stats_ ? timer.ElapsedNanosSafe() : 0);
  560. }
  561. if (merge_success) {
  562. // Merging of operands (associative merge) was successful.
  563. // Replace operands with the merge result
  564. merge_context_.Clear();
  565. merge_context_.PushOperand(merge_result);
  566. keys_.erase(keys_.begin(), keys_.end() - 1);
  567. }
  568. }
  569. }
  570. return s;
  571. }
// Creates an output iterator over merge_helper's merged results. Both
// reverse iterators start at rend(), i.e. past-the-end; the iterator is not
// usable until SeekToFirst() is called.
MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
    : merge_helper_(merge_helper) {
  it_keys_ = merge_helper_->keys().rend();
  it_values_ = merge_helper_->values().rend();
}
// Positions the iterator at the first output entry. Keys and values are
// iterated in reverse (rbegin -> rend) over the helper's parallel
// keys()/values() sequences, which must be the same length.
void MergeOutputIterator::SeekToFirst() {
  const auto& keys = merge_helper_->keys();
  const auto& values = merge_helper_->values();
  assert(keys.size() == values.size());
  it_keys_ = keys.rbegin();
  it_values_ = values.rbegin();
}
  584. void MergeOutputIterator::Next() {
  585. ++it_keys_;
  586. ++it_values_;
  587. }
// Applies the compaction filter (if any) to a single merge operand and
// returns its decision. Returns kKeep when no filter is configured. When
// detailed timing is enabled, the filter call's duration is accumulated into
// total_filter_time_.
CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
                                                    const Slice& value_slice) {
  if (compaction_filter_ == nullptr) {
    return CompactionFilter::Decision::kKeep;
  }
  if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
    filter_timer_.Start();
  }
  // Reset the per-call outputs the filter may populate.
  compaction_filter_value_.clear();
  compaction_filter_skip_until_.Clear();
  auto ret = compaction_filter_->FilterV3(
      level_, user_key, CompactionFilter::ValueType::kMergeOperand,
      &value_slice, /* existing_columns */ nullptr, &compaction_filter_value_,
      /* new_columns */ nullptr, compaction_filter_skip_until_.rep());
  if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
    if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
                                  user_key) <= 0) {
      // Invalid skip_until returned from compaction filter.
      // Keep the key as per FilterV2/FilterV3 documentation.
      ret = CompactionFilter::Decision::kKeep;
    } else {
      // Convert the user-key bound into an internal key so it can be used
      // as a seek target.
      compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
                                                       kValueTypeForSeek);
    }
  }
  if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
    total_filter_time_ += filter_timer_.ElapsedNanosSafe();
  }
  return ret;
}
  618. } // namespace ROCKSDB_NAMESPACE