- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/compaction/compaction_iterator.h"
- #include <iterator>
- #include <limits>
- #include "db/blob/blob_fetcher.h"
- #include "db/blob/blob_file_builder.h"
- #include "db/blob/blob_index.h"
- #include "db/blob/prefetch_buffer_collection.h"
- #include "db/snapshot_checker.h"
- #include "db/wide/wide_column_serialization.h"
- #include "db/wide/wide_columns_helper.h"
- #include "logging/logging.h"
- #include "port/likely.h"
- #include "rocksdb/listener.h"
- #include "table/internal_iterator.h"
- #include "test_util/sync_point.h"
- namespace ROCKSDB_NAMESPACE {
- CompactionIterator::CompactionIterator(
- InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
- SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
- SequenceNumber earliest_snapshot,
- SequenceNumber earliest_write_conflict_snapshot,
- SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
- Env* env, bool report_detailed_time,
- CompactionRangeDelAggregator* range_del_agg,
- BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
- bool enforce_single_del_contracts,
- const std::atomic<bool>& manual_compaction_canceled,
- bool must_count_input_entries, const Compaction* compaction,
- const CompactionFilter* compaction_filter,
- const std::atomic<bool>* shutting_down,
- const std::shared_ptr<Logger> info_log,
- const std::string* full_history_ts_low,
- std::optional<SequenceNumber> preserve_seqno_min)
- : CompactionIterator(
- input, cmp, merge_helper, last_sequence, snapshots, earliest_snapshot,
- earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
- report_detailed_time, range_del_agg, blob_file_builder,
- allow_data_in_errors, enforce_single_del_contracts,
- manual_compaction_canceled,
- compaction ? std::make_unique<RealCompaction>(compaction) : nullptr,
- must_count_input_entries, compaction_filter, shutting_down, info_log,
- full_history_ts_low, preserve_seqno_min) {}
- CompactionIterator::CompactionIterator(
- InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
- SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
- SequenceNumber earliest_snapshot,
- SequenceNumber earliest_write_conflict_snapshot,
- SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
- Env* env, bool report_detailed_time,
- CompactionRangeDelAggregator* range_del_agg,
- BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
- bool enforce_single_del_contracts,
- const std::atomic<bool>& manual_compaction_canceled,
- std::unique_ptr<CompactionProxy> compaction, bool must_count_input_entries,
- const CompactionFilter* compaction_filter,
- const std::atomic<bool>* shutting_down,
- const std::shared_ptr<Logger> info_log,
- const std::string* full_history_ts_low,
- std::optional<SequenceNumber> preserve_seqno_min)
- : input_(input, cmp, must_count_input_entries),
- cmp_(cmp),
- merge_helper_(merge_helper),
- snapshots_(snapshots),
- earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
- job_snapshot_(job_snapshot),
- snapshot_checker_(snapshot_checker),
- env_(env),
- clock_(env_->GetSystemClock().get()),
- report_detailed_time_(report_detailed_time),
- range_del_agg_(range_del_agg),
- blob_file_builder_(blob_file_builder),
- compaction_(std::move(compaction)),
- compaction_filter_(compaction_filter),
- shutting_down_(shutting_down),
- manual_compaction_canceled_(manual_compaction_canceled),
- bottommost_level_(compaction_ && compaction_->bottommost_level() &&
- !compaction_->allow_ingest_behind()),
- // snapshots_ cannot be nullptr, but we will assert later in the body of
- // the constructor.
- visible_at_tip_(snapshots_ ? snapshots_->empty() : false),
- earliest_snapshot_(earliest_snapshot),
- info_log_(info_log),
- allow_data_in_errors_(allow_data_in_errors),
- enforce_single_del_contracts_(enforce_single_del_contracts),
- timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
- full_history_ts_low_(full_history_ts_low),
- current_user_key_sequence_(0),
- current_user_key_snapshot_(0),
- merge_out_iter_(merge_helper_),
- blob_garbage_collection_cutoff_file_number_(
- ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
- blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())),
- prefetch_buffers_(
- CreatePrefetchBufferCollectionIfNeeded(compaction_.get())),
- current_key_committed_(false),
- cmp_with_history_ts_low_(0),
- level_(compaction_ == nullptr ? 0 : compaction_->level()),
- preserve_seqno_after_(preserve_seqno_min.value_or(earliest_snapshot)) {
- assert(snapshots_ != nullptr);
- assert(preserve_seqno_after_ <= earliest_snapshot_);
- if (compaction_ != nullptr) {
- level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
- }
- #ifndef NDEBUG
- // findEarliestVisibleSnapshot assumes this ordering.
- for (size_t i = 1; i < snapshots_->size(); ++i) {
- assert(snapshots_->at(i - 1) < snapshots_->at(i));
- }
- assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
- timestamp_size_ == full_history_ts_low_->size());
- #endif
- input_.SetPinnedItersMgr(&pinned_iters_mgr_);
- // The default `merge_until_status_` does not need to be checked since it is
- // overwritten as soon as `MergeUntil()` is called.
- merge_until_status_.PermitUncheckedError();
- TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
- }
- CompactionIterator::~CompactionIterator() {
- // input_ Iterator lifetime is longer than pinned_iters_mgr_ lifetime
- input_.SetPinnedItersMgr(nullptr);
- }
- void CompactionIterator::ResetRecordCounts() {
- iter_stats_.num_record_drop_user = 0;
- iter_stats_.num_record_drop_hidden = 0;
- iter_stats_.num_record_drop_obsolete = 0;
- iter_stats_.num_record_drop_range_del = 0;
- iter_stats_.num_range_del_drop_obsolete = 0;
- iter_stats_.num_optimized_del_drop_obsolete = 0;
- }
- void CompactionIterator::SeekToFirst() {
- NextFromInput();
- PrepareOutput();
- }
- void CompactionIterator::Next() {
- // If there is a merge output, return it before continuing to process the
- // input.
- if (merge_out_iter_.Valid()) {
- merge_out_iter_.Next();
- // Check if we returned all records of the merge output.
- if (merge_out_iter_.Valid()) {
- key_ = merge_out_iter_.key();
- value_ = merge_out_iter_.value();
- Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
- // MergeUntil stops when it encounters a corrupt key and does not
- // include it in the result, so we expect the keys here to be valid.
- if (!s.ok()) {
- // FIXME: should fail compaction after this fatal logging.
- ROCKS_LOG_FATAL(
- info_log_, "Invalid ikey %s in compaction. %s",
- allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
- s.getState());
- assert(false);
- }
- // Keep current_key_ in sync.
- if (0 == timestamp_size_) {
- current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
- } else {
- Slice ts = ikey_.GetTimestamp(timestamp_size_);
- current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type, &ts);
- }
- key_ = current_key_.GetInternalKey();
- ikey_.user_key = current_key_.GetUserKey();
- validity_info_.SetValid(ValidContext::kMerge1);
- } else {
- if (merge_until_status_.IsMergeInProgress()) {
- // `Status::MergeInProgress()` tells us that the previous `MergeUntil()`
- // produced only merge operands. Those merge operands were accessed and
- // written out using `merge_out_iter_`. Since `merge_out_iter_` is
- // exhausted at this point, all merge operands have been written out.
- //
- // Still, there may be a base value (PUT, DELETE, SINGLEDEL, etc.) that
- // needs to be written out. Normally, `CompactionIterator` would skip it
- // on the basis that it has already output something in the same
- // snapshot stripe. To prevent this, we reset `has_current_user_key_`
- // so that the next iteration does not conclude that the snapshot stripe
- // is unchanged.
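- // Illustrative (hypothetical) sequence: MERGE@30, MERGE@25, PUT@20 on
- // the same key with a snapshot at seqno 22. `MergeUntil()` may only
- // combine the two operands (returning MergeInProgress), and PUT@20
- // must still be emitted on a later iteration instead of being skipped
- // as another version within the same snapshot stripe.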
- has_current_user_key_ = false;
- }
- // We consumed all pinned merge operands, release pinned iterators
- pinned_iters_mgr_.ReleasePinnedData();
- // MergeHelper moves the iterator to the first record after the merged
- // records, so even though we reached the end of the merge output, we do
- // not want to advance the iterator.
- NextFromInput();
- }
- } else {
- // Only advance the input iterator if there is no merge output and the
- // iterator is not already at the next record.
- if (!at_next_) {
- AdvanceInputIter();
- }
- NextFromInput();
- }
- if (Valid()) {
- // Record that we've outputted a record for the current key.
- has_outputted_key_ = true;
- }
- PrepareOutput();
- }
- bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
- Slice* skip_until) {
- if (!compaction_filter_) {
- return true;
- }
- if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
- ikey_.type != kTypeWideColumnEntity) {
- return true;
- }
- CompactionFilter::Decision decision =
- CompactionFilter::Decision::kUndetermined;
- CompactionFilter::ValueType value_type =
- ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
- : ikey_.type == kTypeBlobIndex
- ? CompactionFilter::ValueType::kBlobIndex
- : CompactionFilter::ValueType::kWideColumnEntity;
- // Hack: pass internal key to BlobIndexCompactionFilter since it needs
- // to get the sequence number.
- assert(compaction_filter_);
- const Slice& filter_key =
- (ikey_.type != kTypeBlobIndex ||
- !compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
- ? ikey_.user_key
- : key_;
- compaction_filter_value_.clear();
- compaction_filter_skip_until_.Clear();
- std::vector<std::pair<std::string, std::string>> new_columns;
- {
- StopWatchNano timer(clock_, report_detailed_time_);
- if (ikey_.type == kTypeBlobIndex) {
- decision = compaction_filter_->FilterBlobByKey(
- level_, filter_key, &compaction_filter_value_,
- compaction_filter_skip_until_.rep());
- if (decision == CompactionFilter::Decision::kUndetermined &&
- !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
- if (!compaction_) {
- status_ =
- Status::Corruption("Unexpected blob index outside of compaction");
- validity_info_.Invalidate();
- return false;
- }
- TEST_SYNC_POINT_CALLBACK(
- "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
- &value_);
- // For integrated BlobDB impl, CompactionIterator reads blob value.
- // For Stacked BlobDB impl, the corresponding CompactionFilter's
- // FilterV2 method should read the blob value.
- BlobIndex blob_index;
- Status s = blob_index.DecodeFrom(value_);
- if (!s.ok()) {
- status_ = s;
- validity_info_.Invalidate();
- return false;
- }
- FilePrefetchBuffer* prefetch_buffer =
- prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
- blob_index.file_number())
- : nullptr;
- uint64_t bytes_read = 0;
- assert(blob_fetcher_);
- s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index,
- prefetch_buffer, &blob_value_,
- &bytes_read);
- if (!s.ok()) {
- status_ = s;
- validity_info_.Invalidate();
- return false;
- }
- ++iter_stats_.num_blobs_read;
- iter_stats_.total_blob_bytes_read += bytes_read;
- value_type = CompactionFilter::ValueType::kValue;
- }
- }
- if (decision == CompactionFilter::Decision::kUndetermined) {
- const Slice* existing_val = nullptr;
- const WideColumns* existing_col = nullptr;
- WideColumns existing_columns;
- if (ikey_.type != kTypeWideColumnEntity) {
- if (!blob_value_.empty()) {
- existing_val = &blob_value_;
- } else {
- existing_val = &value_;
- }
- } else {
- Slice value_copy = value_;
- const Status s =
- WideColumnSerialization::Deserialize(value_copy, existing_columns);
- if (!s.ok()) {
- status_ = s;
- validity_info_.Invalidate();
- return false;
- }
- existing_col = &existing_columns;
- }
- decision = compaction_filter_->FilterV3(
- level_, filter_key, value_type, existing_val, existing_col,
- &compaction_filter_value_, &new_columns,
- compaction_filter_skip_until_.rep());
- }
- iter_stats_.total_filter_time +=
- env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
- }
- if (decision == CompactionFilter::Decision::kUndetermined) {
- // Should not reach here, since FilterV2/FilterV3 should never return
- // kUndetermined.
- status_ = Status::NotSupported(
- "FilterV2/FilterV3 should never return kUndetermined");
- validity_info_.Invalidate();
- return false;
- }
- if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil &&
- cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
- 0) {
- // Can't skip to a key smaller than the current one.
- // Keep the key as per FilterV2/FilterV3 documentation.
- decision = CompactionFilter::Decision::kKeep;
- }
- if (decision == CompactionFilter::Decision::kRemove) {
- // convert the current key to a delete; key_ is pointing into
- // current_key_ at this point, so updating current_key_ updates key()
- ikey_.type = kTypeDeletion;
- current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
- // no value associated with delete
- value_.clear();
- iter_stats_.num_record_drop_user++;
- } else if (decision == CompactionFilter::Decision::kPurge) {
- // convert the current key to a single delete; key_ is pointing into
- // current_key_ at this point, so updating current_key_ updates key()
- ikey_.type = kTypeSingleDeletion;
- current_key_.UpdateInternalKey(ikey_.sequence, kTypeSingleDeletion);
- // no value associated with single delete
- value_.clear();
- iter_stats_.num_record_drop_user++;
- } else if (decision == CompactionFilter::Decision::kChangeValue) {
- if (ikey_.type != kTypeValue) {
- ikey_.type = kTypeValue;
- current_key_.UpdateInternalKey(ikey_.sequence, kTypeValue);
- }
- value_ = compaction_filter_value_;
- } else if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil) {
- *need_skip = true;
- compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
- kValueTypeForSeek);
- *skip_until = compaction_filter_skip_until_.Encode();
- } else if (decision == CompactionFilter::Decision::kChangeBlobIndex) {
- // Only the StackableDB-based BlobDB impl's compaction filter should return
- // kChangeBlobIndex. The decision about rewriting the blob and changing
- // the blob index in the integrated BlobDB impl is made in a subsequent
- // call to PrepareOutput() and its callees.
- if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
- status_ = Status::NotSupported(
- "Only stacked BlobDB's internal compaction filter can return "
- "kChangeBlobIndex.");
- validity_info_.Invalidate();
- return false;
- }
- if (ikey_.type != kTypeBlobIndex) {
- ikey_.type = kTypeBlobIndex;
- current_key_.UpdateInternalKey(ikey_.sequence, kTypeBlobIndex);
- }
- value_ = compaction_filter_value_;
- } else if (decision == CompactionFilter::Decision::kIOError) {
- if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
- status_ = Status::NotSupported(
- "CompactionFilter for integrated BlobDB should not return kIOError");
- validity_info_.Invalidate();
- return false;
- }
- status_ = Status::IOError("Failed to access blob during compaction filter");
- validity_info_.Invalidate();
- return false;
- } else if (decision == CompactionFilter::Decision::kChangeWideColumnEntity) {
- WideColumns sorted_columns;
- sorted_columns.reserve(new_columns.size());
- for (const auto& column : new_columns) {
- sorted_columns.emplace_back(column.first, column.second);
- }
- WideColumnsHelper::SortColumns(sorted_columns);
- {
- const Status s = WideColumnSerialization::Serialize(
- sorted_columns, compaction_filter_value_);
- if (!s.ok()) {
- status_ = s;
- validity_info_.Invalidate();
- return false;
- }
- }
- if (ikey_.type != kTypeWideColumnEntity) {
- ikey_.type = kTypeWideColumnEntity;
- current_key_.UpdateInternalKey(ikey_.sequence, kTypeWideColumnEntity);
- }
- value_ = compaction_filter_value_;
- }
- return true;
- }
- void CompactionIterator::NextFromInput() {
- at_next_ = false;
- validity_info_.Invalidate();
- while (!Valid() && input_.Valid() && !IsPausingManualCompaction() &&
- !IsShuttingDown()) {
- key_ = input_.key();
- value_ = input_.value();
- blob_value_.Reset();
- iter_stats_.num_input_records++;
- is_range_del_ = input_.IsDeleteRangeSentinelKey();
- Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
- if (!pik_status.ok()) {
- iter_stats_.num_input_corrupt_records++;
- // Always fail compaction when encountering corrupted internal keys
- status_ = pik_status;
- return;
- }
- TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
- if (is_range_del_) {
- validity_info_.SetValid(kRangeDeletion);
- break;
- }
- // Update input statistics
- if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
- ikey_.type == kTypeDeletionWithTimestamp) {
- iter_stats_.num_input_deletion_records++;
- } else if (ikey_.type == kTypeValuePreferredSeqno) {
- iter_stats_.num_input_timed_put_records++;
- }
- iter_stats_.total_input_raw_key_bytes += key_.size();
- iter_stats_.total_input_raw_value_bytes += value_.size();
- // If need_skip is true, we should seek the input iterator
- // to internal key skip_until and continue from there.
- bool need_skip = false;
- // Points either into compaction_filter_skip_until_ or into
- // merge_helper_->compaction_filter_skip_until_.
- Slice skip_until;
- bool user_key_equal_without_ts = false;
- int cmp_ts = 0;
- if (has_current_user_key_) {
- user_key_equal_without_ts =
- cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_);
- // if timestamp_size_ > 0, then curr_ts_ has been initialized by a
- // previous key.
- cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
- ExtractTimestampFromUserKey(
- ikey_.user_key, timestamp_size_),
- curr_ts_)
- : 0;
- }
- // Check whether the user key changed. After this if statement current_key_
- // is a copy of the current input key (maybe converted to a delete by the
- // compaction filter). ikey_.user_key is pointing to the copy.
- if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) {
- // First occurrence of this user key
- // Copy key for output
- key_ = current_key_.SetInternalKey(key_, &ikey_);
- int prev_cmp_with_ts_low =
- !full_history_ts_low_ ? 0
- : curr_ts_.empty()
- ? 0
- : cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_);
- // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for use in
- // the next iteration to compare with the timestamp of the next key.
- UpdateTimestampAndCompareWithFullHistoryLow();
- // If
- // (1) !has_current_user_key_, OR
- // (2) timestamp is disabled, OR
- // (3) all history will be preserved, OR
- // (4) user key (excluding timestamp) is different from previous key, OR
- // (5) timestamp is NO older than *full_history_ts_low_, OR
- // (6) timestamp is the largest one older than full_history_ts_low_,
- // then current_user_key_ must be treated as a different user key.
- // This means, if a user key (excluding ts) is the same as the previous
- // user key, and its ts is older than *full_history_ts_low_, then we
- // consider this key for GC, e.g. it may be dropped if certain conditions
- // match.
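- // Illustrative (hypothetical) example: with *full_history_ts_low_ = 8
- // and versions of the same user key at timestamps 9, 3, and 2, the
- // versions at ts=9 (not older than the low watermark) and ts=3 (the
- // largest one older than it, rule (6)) each reset the per-key state,
- // while the version at ts=2 keeps the state of the ts=3 version and
- // may be considered for GC.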
- if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
- !user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0 ||
- prev_cmp_with_ts_low >= 0) {
- // Initialize for future comparison for rule (A), etc.
- current_user_key_sequence_ = kMaxSequenceNumber;
- current_user_key_snapshot_ = 0;
- has_current_user_key_ = true;
- }
- current_user_key_ = ikey_.user_key;
- has_outputted_key_ = false;
- last_key_seq_zeroed_ = false;
- current_key_committed_ = KeyCommitted(ikey_.sequence);
- // Apply the compaction filter to the first committed version of the user
- // key.
- if (current_key_committed_ &&
- !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
- break;
- }
- } else {
- // Update the current key to reflect the new sequence number/type without
- // copying the user key.
- // TODO(rven): Compaction filter does not process keys in this path.
- // We need to have the compaction filter process multiple versions
- // if we have versions on both sides of a snapshot.
- current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
- key_ = current_key_.GetInternalKey();
- ikey_.user_key = current_key_.GetUserKey();
- // Note that a newer version of a key is ordered before older versions.
- // If a newer version of a key is committed, so are the older versions.
- // No need to query snapshot_checker_ in that case.
- if (UNLIKELY(!current_key_committed_)) {
- assert(snapshot_checker_ != nullptr);
- current_key_committed_ = KeyCommitted(ikey_.sequence);
- // Apply the compaction filter to the first committed version of the
- // user key.
- if (current_key_committed_ &&
- !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
- break;
- }
- }
- }
- if (UNLIKELY(!current_key_committed_)) {
- assert(snapshot_checker_ != nullptr);
- validity_info_.SetValid(ValidContext::kCurrentKeyUncommitted);
- break;
- }
- // If there are no snapshots, then this kv affects visibility at tip.
- // Otherwise, search through all existing snapshots to find the earliest
- // snapshot that is affected by this kv.
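- // Illustrative (hypothetical) example: with snapshots_ = {10, 20} and
- // ikey_.sequence = 15, the earliest snapshot this entry is visible in
- // is 20, and prev_snapshot is set to 10.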
- SequenceNumber last_sequence = current_user_key_sequence_;
- current_user_key_sequence_ = ikey_.sequence;
- SequenceNumber last_snapshot = current_user_key_snapshot_;
- SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
- current_user_key_snapshot_ =
- visible_at_tip_
- ? earliest_snapshot_
- : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
- if (need_skip) {
- // This case is handled below.
- } else if (clear_and_output_next_key_) {
- // In the previous iteration we encountered a single delete that we could
- // not compact out. We will keep this Put, but can drop its data.
- // (See Optimization 3, below.)
- if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
- ikey_.type != kTypeWideColumnEntity &&
- ikey_.type != kTypeValuePreferredSeqno) {
- ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
- ikey_.DebugString(allow_data_in_errors_, true).c_str());
- assert(false);
- }
- if (current_user_key_snapshot_ < last_snapshot) {
- ROCKS_LOG_FATAL(info_log_,
- "key %s, current_user_key_snapshot_ (%" PRIu64
- ") < last_snapshot (%" PRIu64 ")",
- ikey_.DebugString(allow_data_in_errors_, true).c_str(),
- current_user_key_snapshot_, last_snapshot);
- assert(false);
- }
- if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity ||
- ikey_.type == kTypeValuePreferredSeqno) {
- ikey_.type = kTypeValue;
- current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
- }
- value_.clear();
- validity_info_.SetValid(ValidContext::kKeepSDAndClearPut);
- clear_and_output_next_key_ = false;
- } else if (ikey_.type == kTypeSingleDeletion) {
- // We can compact out a SingleDelete if:
- // 1) We encounter the corresponding PUT -OR- we know that this key
- // doesn't appear past this output level and we are not in
- // ingest_behind mode.
- // =AND=
- // 2) We've already returned a record in this snapshot -OR-
- // there is no snapshot earlier than earliest_write_conflict_snapshot.
- //
- // A note about 2) above:
- // we try to determine whether there is any earlier write conflict
- // checking snapshot by calling DefinitelyInSnapshot() with seq and
- // earliest_write_conflict_snapshot as arguments. For write-prepared
- // and write-unprepared transactions, if earliest_write_conflict_snapshot
- // is evicted from WritePreparedTxnDB::commit_cache, then
- // DefinitelyInSnapshot(seq, earliest_write_conflict_snapshot) returns
- // false, even if the seq is actually visible within
- // earliest_write_conflict_snapshot. Consequently, CompactionIterator
- // may try to zero out its sequence number, thus hitting an assertion
- // error in debug mode or causing DBIter to return incorrect results.
- // We observe that earliest_write_conflict_snapshot >= earliest_snapshot,
- // and the seq zeroing logic depends on
- // DefinitelyInSnapshot(seq, earliest_snapshot). Therefore, if we cannot
- // determine whether seq is **definitely** in
- // earliest_write_conflict_snapshot, then we can additionally check if
- // seq is definitely in earliest_snapshot. If the latter holds, then the
- // former holds too.
- //
- // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
- // allow Transactions to do write-conflict checking (if we compacted away
- // all keys, then we wouldn't know that a write happened in this
- // snapshot). If there is no earlier snapshot, then we know that there
- // are no active transactions that need to know about any writes.
- //
- // Optimization 3:
- // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
- // true, then we must output a SingleDelete. In this case, we will decide
- // to also output the PUT. While we are compacting less by outputting the
- // PUT now, hopefully this will lead to better compaction in the future
- // when Rule 2 is later true (i.e., we are hoping we can later compact out
- // both the SingleDelete and the Put, while we couldn't if we only
- // outputted the SingleDelete now).
- // In this case, we can save space by removing the PUT's value as it will
- // never be read.
- //
- // Deletes and Merges are not supported on the same key that has a
- // SingleDelete as it is not possible to correctly do any partial
- // compaction of such a combination of operations. The result of mixing
- // those operations for a given key is documented as being undefined. So
- // we can choose how to handle such combinations of operations. We will
- // try to compact out as much as we can in these cases.
- // We will report counts on these anomalous cases.
- //
- // Note: If timestamp is enabled, then the record is eligible for
- // deletion only if, in addition to the conditions above (Rule 1 and
- // Rule 2), full_history_ts_low_ is specified and the timestamp for that
- // key is less than *full_history_ts_low_. If it's not eligible for
- // deletion, then we will output the SingleDelete. Likewise,
- // Optimization 3 is applied only if full_history_ts_low_ is specified
- // and the timestamp for the key is less than *full_history_ts_low_.
- // The easiest way to process a SingleDelete during iteration is to peek
- // ahead at the next key.
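- // Illustrative (hypothetical) example: for SingleDelete@15 followed by
- // PUT@12 on the same user key, both records can be dropped when no
- // snapshot separates them and Rule 2 holds. If a snapshot at seqno 13
- // exists, PUT@12 is still visible in it, so the SingleDelete must be
- // output. If instead only Rule 2 fails, the SingleDelete is output and
- // the PUT is kept with its value cleared (Optimization 3).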
- const bool is_timestamp_eligible_for_gc =
- (timestamp_size_ == 0 ||
- (full_history_ts_low_ && cmp_with_history_ts_low_ < 0));
- ParsedInternalKey next_ikey;
- AdvanceInputIter();
- while (input_.Valid() && input_.IsDeleteRangeSentinelKey() &&
- ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
- .ok() &&
- cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
- // skip range tombstone start keys with the same user key
- // since they are not "real" point keys.
- AdvanceInputIter();
- }
- // Check whether the next key exists, is not corrupt, and is the same key
- // as the single delete.
- if (input_.Valid() &&
- ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
- .ok() &&
- cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
- assert(!input_.IsDeleteRangeSentinelKey());
- #ifndef NDEBUG
- const Compaction* c =
- compaction_ ? compaction_->real_compaction() : nullptr;
- #endif
- TEST_SYNC_POINT_CALLBACK(
- "CompactionIterator::NextFromInput:SingleDelete:1",
- const_cast<Compaction*>(c));
- if (last_key_seq_zeroed_) {
- // Drop SD and the next key since they are both in the last
- // snapshot (since last key has seqno zeroed).
- ++iter_stats_.num_record_drop_hidden;
- ++iter_stats_.num_record_drop_obsolete;
- assert(bottommost_level_);
- AdvanceInputIter();
- } else if (prev_snapshot == 0 ||
- DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
- // Check whether the next key belongs to the same snapshot as the
- // SingleDelete.
- TEST_SYNC_POINT_CALLBACK(
- "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
- if (next_ikey.type == kTypeSingleDeletion) {
- // We encountered two SingleDeletes for the same key in a row. This
- // could be due to unexpected user input. If write-(un)prepared
- // transaction is used, this could also be due to releasing an old
- // snapshot between a Put and its matching SingleDelete.
- // Skip the first SingleDelete and let the next iteration decide
- // how to handle the second SingleDelete.
- // First SingleDelete has been skipped since we already called
- // input_.Next().
- ++iter_stats_.num_record_drop_obsolete;
- ++iter_stats_.num_single_del_mismatch;
- } else if (next_ikey.type == kTypeDeletion) {
- std::ostringstream oss;
- oss << "Found SD and type: " << static_cast<int>(next_ikey.type)
- << " on the same key, violating the contract "
- "of SingleDelete. Check your application to make sure the "
- "application does not mix SingleDelete and Delete for "
- "the same key. If you are using "
- "write-prepared/write-unprepared transactions, and use "
- "SingleDelete to delete certain keys, then make sure "
- "TransactionDBOptions::rollback_deletion_type_callback is "
- "configured properly. Mixing SD and DEL can lead to "
- "undefined behaviors";
- ++iter_stats_.num_record_drop_obsolete;
- ++iter_stats_.num_single_del_mismatch;
- if (enforce_single_del_contracts_) {
- ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
- validity_info_.Invalidate();
- status_ = Status::Corruption(oss.str());
- return;
- }
- ROCKS_LOG_WARN(info_log_, "%s", oss.str().c_str());
- } else if (!is_timestamp_eligible_for_gc) {
- // We cannot drop the SingleDelete as timestamp is enabled, and
- // timestamp of this key is greater than or equal to
- // *full_history_ts_low_. We will output the SingleDelete.
- validity_info_.SetValid(ValidContext::kKeepTsHistory);
- } else if (has_outputted_key_ ||
- DefinitelyInSnapshot(ikey_.sequence,
- earliest_write_conflict_snapshot_) ||
- (earliest_snapshot_ < earliest_write_conflict_snapshot_ &&
- DefinitelyInSnapshot(ikey_.sequence,
- earliest_snapshot_))) {
- // Found a matching value, we can drop the single delete and the
- // value. It is safe to drop both records since we've already
- // outputted a key in this snapshot, or there is no earlier
- // snapshot (Rule 2 above).
- // Note: it doesn't matter whether the second key is a Put or if it
- // is an unexpected Merge or Delete. We will compact it out
- // either way. We will maintain counts of how many mismatches
- // happened.
- if (next_ikey.type != kTypeValue &&
- next_ikey.type != kTypeBlobIndex &&
- next_ikey.type != kTypeWideColumnEntity &&
- next_ikey.type != kTypeValuePreferredSeqno) {
- ++iter_stats_.num_single_del_mismatch;
- }
- ++iter_stats_.num_record_drop_hidden;
- ++iter_stats_.num_record_drop_obsolete;
- // Already called input_.Next() once. Call it a second time to
- // skip past the second key.
- AdvanceInputIter();
- } else {
- // Found a matching value, but we cannot drop both keys since
- // there is an earlier snapshot and we need to leave behind a record
- // to know that a write happened in this snapshot (Rule 2 above).
- // Clear the value and output the SingleDelete. (The value will be
- // outputted on the next iteration.)
- // Setting valid_ to true will output the current SingleDelete
- validity_info_.SetValid(ValidContext::kKeepSDForConflictCheck);
- // Set up the Put to be outputted in the next iteration.
- // (Optimization 3).
- clear_and_output_next_key_ = true;
- TEST_SYNC_POINT_CALLBACK(
- "CompactionIterator::NextFromInput:KeepSDForWW",
- /*arg=*/nullptr);
- }
- } else {
- // We hit the next snapshot without hitting a put, so the iterator
- // returns the single delete.
- validity_info_.SetValid(ValidContext::kKeepSDForSnapshot);
- TEST_SYNC_POINT_CALLBACK(
- "CompactionIterator::NextFromInput:SingleDelete:3",
- const_cast<Compaction*>(c));
- }
- } else {
- // We are at the end of the input, could not parse the next key, or hit
- // a different key. The iterator returns the single delete if the key
- // possibly exists beyond the current output level. We set
- // has_current_user_key to false so that if the iterator is at the next
- // key, we do not compare it again against the previous key at the next
- // iteration. If the next key is corrupt, we return before the
- // comparison, so the value of has_current_user_key does not matter.
- has_current_user_key_ = false;
- if (compaction_ != nullptr && !compaction_->allow_ingest_behind() &&
- DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
- compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
- &level_ptrs_) &&
- is_timestamp_eligible_for_gc) {
- // Key doesn't exist outside of this range.
- // Can compact out this SingleDelete.
- ++iter_stats_.num_record_drop_obsolete;
- ++iter_stats_.num_single_del_fallthru;
- if (!bottommost_level_) {
- ++iter_stats_.num_optimized_del_drop_obsolete;
- }
- } else if (last_key_seq_zeroed_) {
- // Sequence number zeroing requires bottommost_level_, which is
- // false with ingest_behind.
- assert(!compaction_->allow_ingest_behind());
- // Skip.
- ++iter_stats_.num_record_drop_hidden;
- ++iter_stats_.num_record_drop_obsolete;
- assert(bottommost_level_);
- } else {
- // Output SingleDelete
- validity_info_.SetValid(ValidContext::kKeepSD);
- }
- }
- if (Valid()) {
- at_next_ = true;
- }
- } else if (last_sequence != kMaxSequenceNumber &&
- (last_snapshot == current_user_key_snapshot_ ||
- last_snapshot < current_user_key_snapshot_)) {
- // rule (A):
- // If the earliest snapshot in which this key is visible is the same
- // as the visibility of a previous instance of the same key, then this
- // kv is not visible in any snapshot: it is hidden by a newer entry for
- // the same user key.
- //
- // Note: Dropping this key will not affect TransactionDB write-conflict
- // checking since there has already been a record returned for this key
- // in this snapshot.
- // When ingest_behind is enabled, it's ok that we drop an overwritten
- // Delete here. The overwriting key still covers whatever will be
- // ingested. Note that we will not drop SingleDelete here, as SingleDelete
- // is handled entirely in its own if clause. This is important; see
- // example: from new to old: SingleDelete_1, PUT_1, SingleDelete_2, PUT_2,
- // where all operations are on the same key and PUT_2 is ingested with
- // ingest_behind=true. If SingleDelete_2 is dropped due to being compacted
- // together with PUT_1, and then PUT_1 is compacted away together with
- // SingleDelete_1, PUT_2 can incorrectly become visible.
- if (last_sequence < current_user_key_sequence_) {
- ROCKS_LOG_FATAL(info_log_,
- "key %s, last_sequence (%" PRIu64
- ") < current_user_key_sequence_ (%" PRIu64 ")",
- ikey_.DebugString(allow_data_in_errors_, true).c_str(),
- last_sequence, current_user_key_sequence_);
- assert(false);
- }
- ++iter_stats_.num_record_drop_hidden;
- AdvanceInputIter();
- } else if (compaction_ != nullptr &&
- (ikey_.type == kTypeDeletion ||
- (ikey_.type == kTypeDeletionWithTimestamp &&
- cmp_with_history_ts_low_ < 0)) &&
- !compaction_->allow_ingest_behind() &&
- DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
- compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
- &level_ptrs_)) {
- // TODO(noetzli): This is the only place where we use compaction_
- // (besides the constructor). We should probably get rid of this
- // dependency and find a way to do similar filtering during flushes.
- //
- // For this user key:
- // (1) there is no data in higher levels
- // (2) data in lower levels will have larger sequence numbers
- // (3) data in layers that are being compacted here and have
- // smaller sequence numbers will be dropped in the next
- // few iterations of this loop (by rule (A) above).
- // Therefore this deletion marker is obsolete and can be dropped.
- //
- // Note: Dropping this Delete will not affect TransactionDB
- // write-conflict checking since it is earlier than any snapshot.
- //
- // It seems that we can also drop a deletion later than the earliest
- // snapshot given that:
- // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
- // (2) No value exists earlier than the deletion.
- //
- // Note also that a deletion marker of type kTypeDeletionWithTimestamp
- // will be treated as a different user key unless the timestamp is older
- // than *full_history_ts_low_.
- ++iter_stats_.num_record_drop_obsolete;
- if (!bottommost_level_) {
- ++iter_stats_.num_optimized_del_drop_obsolete;
- }
- AdvanceInputIter();
- } else if ((ikey_.type == kTypeDeletion ||
- (ikey_.type == kTypeDeletionWithTimestamp &&
- cmp_with_history_ts_low_ < 0)) &&
- bottommost_level_) {
- assert(compaction_);
- assert(!compaction_->allow_ingest_behind()); // bottommost_level_ is true
- // Handle the case where we have a delete key at the bottommost level.
- // We can skip outputting the key iff there are no subsequent puts for
- // this key.
- assert(compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
- &level_ptrs_));
- ParsedInternalKey next_ikey;
- AdvanceInputIter();
- #ifndef NDEBUG
- const Compaction* c =
- compaction_ ? compaction_->real_compaction() : nullptr;
- #endif
- TEST_SYNC_POINT_CALLBACK(
- "CompactionIterator::NextFromInput:BottommostDelete:1",
- const_cast<Compaction*>(c));
- // Skip over all versions of this key that happen to occur in the same
- // snapshot range as the delete.
- //
- // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
- // considered to have a different user key unless the timestamp is older
- // than *full_history_ts_low_.
- //
- // Range tombstone start keys are skipped as they are not "real" keys.
- while (!IsPausingManualCompaction() && !IsShuttingDown() &&
- input_.Valid() &&
- (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
- .ok()) &&
- cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
- (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() ||
- DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
- AdvanceInputIter();
- }
- // If we still need to output a row with this key, we need to output
- // the delete too.
- if (input_.Valid() &&
- (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
- .ok()) &&
- cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
- validity_info_.SetValid(ValidContext::kKeepDel);
- at_next_ = true;
- }
- } else if (ikey_.type == kTypeValuePreferredSeqno &&
- DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
- (bottommost_level_ ||
- (compaction_ != nullptr &&
- compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
- &level_ptrs_)))) {
- // FIXME: it's possible that we are setting sequence number to 0 as
- // preferred sequence number here. If cf_ingest_behind is enabled, this
- // may fail ingestions since they expect all keys above the last level
- // to have non-zero sequence number. We should probably not allow seqno
- // zeroing here.
- //
- // This section that attempts to swap preferred sequence number will not
- // be invoked if this is a CompactionIterator created for flush, since
- // `compaction_` will be nullptr and it's not bottommost either.
- //
- // The entries with the same user key and smaller sequence numbers are
- // all in this earliest snapshot range to be iterated. Since those entries
- // will be hidden by this entry [rule A], it's safe to swap in the
- // preferred seqno now.
- //
- // It's otherwise not safe to swap in the preferred seqno since it's
- // possible for entries in earlier snapshots to have sequence numbers
- // that are smaller than this entry's sequence number but bigger than this
- // entry's preferred sequence number. Swapping in the preferred sequence
- // number will break the internal key ordering invariant for this key.
- //
- // A special case involving range deletion is handled separately below.
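- // Illustrative (hypothetical) example: an entry with seqno 100 and
- // preferred seqno 5 can only be swapped when no snapshot may hold
- // another version of this key with a seqno in (5, 100); otherwise the
- // swapped entry would sort on the wrong side of that version.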
- auto [unpacked_value, preferred_seqno] =
- ParsePackedValueWithSeqno(value_);
- assert(preferred_seqno < ikey_.sequence || ikey_.sequence == 0);
- if (range_del_agg_->ShouldDelete(
- key_, RangeDelPositioningMode::kForwardTraversal)) {
- ++iter_stats_.num_record_drop_hidden;
- ++iter_stats_.num_record_drop_range_del;
- AdvanceInputIter();
- } else {
- InternalKey ikey_after_swap(ikey_.user_key,
- std::min(preferred_seqno, ikey_.sequence),
- kTypeValue);
- Slice ikey_after_swap_slice(*ikey_after_swap.rep());
- if (range_del_agg_->ShouldDelete(
- ikey_after_swap_slice,
- RangeDelPositioningMode::kForwardTraversal)) {
- // A range tombstone that doesn't cover this kTypeValuePreferredSeqno
- // entry at its current seqno would end up covering it after the seqno
- // swap, so it's not safe to swap in the preferred sequence number. In
- // this case, we output the entry as is.
- validity_info_.SetValid(ValidContext::kNewUserKey);
- } else {
- if (ikey_.sequence != 0) {
- iter_stats_.num_timed_put_swap_preferred_seqno++;
- ikey_.sequence = preferred_seqno;
- }
- ikey_.type = kTypeValue;
- current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
- key_ = current_key_.GetInternalKey();
- ikey_.user_key = current_key_.GetUserKey();
- value_ = unpacked_value;
- validity_info_.SetValid(ValidContext::kSwapPreferredSeqno);
- }
- }
- } else if (ikey_.type == kTypeMerge) {
- if (!merge_helper_->HasOperator()) {
- status_ = Status::InvalidArgument(
- "merge_operator is not properly initialized.");
- return;
- }
- pinned_iters_mgr_.StartPinning();
- // We know the merge type entry is not hidden, otherwise we would
- // have hit (A).
- // We encapsulate the merge-related state machine in a different
- // object to minimize change to the existing flow.
- merge_until_status_ = merge_helper_->MergeUntil(
- &input_, range_del_agg_, prev_snapshot, bottommost_level_,
- allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
- prefetch_buffers_.get(), &iter_stats_);
- merge_out_iter_.SeekToFirst();
- if (!merge_until_status_.ok() &&
- !merge_until_status_.IsMergeInProgress()) {
- status_ = merge_until_status_;
- return;
- } else if (merge_out_iter_.Valid()) {
- // NOTE: key, value, and ikey_ refer to old entries.
- // These will be correctly set below.
- key_ = merge_out_iter_.key();
- value_ = merge_out_iter_.value();
- pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
- // MergeUntil stops when it encounters a corrupt key and does not
- // include it in the result, so we expect the keys here to be valid.
- if (!pik_status.ok()) {
- ROCKS_LOG_FATAL(
- info_log_, "Invalid key %s in compaction. %s",
- allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
- pik_status.getState());
- assert(false);
- }
- // Keep current_key_ in sync.
- current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
- key_ = current_key_.GetInternalKey();
- ikey_.user_key = current_key_.GetUserKey();
- validity_info_.SetValid(ValidContext::kMerge2);
- } else {
- // All merge operands were filtered out. Reset the user key, since the
- // batch consumed by the merge operator should not shadow any keys
- // coming after the merges.
- has_current_user_key_ = false;
- pinned_iters_mgr_.ReleasePinnedData();
- if (merge_helper_->FilteredUntil(&skip_until)) {
- need_skip = true;
- }
- }
- } else {
- // 1. new user key -OR-
- // 2. different snapshot stripe
- // If user-defined timestamp is enabled, we consider keys for GC if they
- // are below history_ts_low_. CompactionRangeDelAggregator::ShouldDelete()
- // only considers range deletions that are at or below history_ts_low_ and
- // trim_ts_. We drop keys here that are below history_ts_low_ and are
- // covered by a range tombstone that is at or below history_ts_low_ and
- // trim_ts_.
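- // Illustrative (hypothetical) example: with *full_history_ts_low_ = 5,
- // a key at timestamp 3 covered by a range tombstone at timestamp 4 is
- // dropped here, while a key at timestamp 7 is kept because history at
- // or above the low watermark must be preserved.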
- bool should_delete = false;
- if (!timestamp_size_ || cmp_with_history_ts_low_ < 0) {
- should_delete = range_del_agg_->ShouldDelete(
- key_, RangeDelPositioningMode::kForwardTraversal);
- }
- if (should_delete) {
- ++iter_stats_.num_record_drop_hidden;
- ++iter_stats_.num_record_drop_range_del;
- AdvanceInputIter();
- } else {
- validity_info_.SetValid(ValidContext::kNewUserKey);
- }
- }
- if (need_skip) {
- SkipUntil(skip_until);
- }
- }
- if (status_.ok()) {
- if (!Valid() && IsShuttingDown()) {
- status_ = Status::ShutdownInProgress();
- } else if (IsPausingManualCompaction()) {
- status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
- } else if (!input_.Valid() && input_.status().IsCorruption()) {
- // Propagate corruption status from memtable iterator
- status_ = input_.status();
- }
- }
- }
- bool CompactionIterator::ExtractLargeValueIfNeededImpl() {
- if (!blob_file_builder_) {
- return false;
- }
- blob_index_.clear();
- const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_);
- if (!s.ok()) {
- status_ = s;
- validity_info_.Invalidate();
- return false;
- }
- if (blob_index_.empty()) {
- return false;
- }
- value_ = blob_index_;
- return true;
- }
- void CompactionIterator::ExtractLargeValueIfNeeded() {
- assert(ikey_.type == kTypeValue);
- if (!ExtractLargeValueIfNeededImpl()) {
- return;
- }
- ikey_.type = kTypeBlobIndex;
- current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
- }
- void CompactionIterator::GarbageCollectBlobIfNeeded() {
- assert(ikey_.type == kTypeBlobIndex);
- if (!compaction_) {
- return;
- }
- // GC for integrated BlobDB
- if (compaction_->enable_blob_garbage_collection()) {
- TEST_SYNC_POINT_CALLBACK(
- "CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
- &value_);
- BlobIndex blob_index;
- {
- const Status s = blob_index.DecodeFrom(value_);
- if (!s.ok()) {
- status_ = s;
- validity_info_.Invalidate();
- return;
- }
- }
- if (blob_index.file_number() >=
- blob_garbage_collection_cutoff_file_number_) {
- return;
- }
- FilePrefetchBuffer* prefetch_buffer =
- prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
- blob_index.file_number())
- : nullptr;
- uint64_t bytes_read = 0;
- {
- assert(blob_fetcher_);
- const Status s = blob_fetcher_->FetchBlob(
- user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read);
- if (!s.ok()) {
- status_ = s;
- validity_info_.Invalidate();
- return;
- }
- }
- ++iter_stats_.num_blobs_read;
- iter_stats_.total_blob_bytes_read += bytes_read;
- ++iter_stats_.num_blobs_relocated;
- iter_stats_.total_blob_bytes_relocated += blob_index.size();
- value_ = blob_value_;
- if (ExtractLargeValueIfNeededImpl()) {
- return;
- }
- ikey_.type = kTypeValue;
- current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
- return;
- }
- // GC for stacked BlobDB
- if (compaction_filter_ &&
- compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
- const auto blob_decision = compaction_filter_->PrepareBlobOutput(
- user_key(), value_, &compaction_filter_value_);
- if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
- status_ =
- Status::Corruption("Corrupted blob reference encountered during GC");
- validity_info_.Invalidate();
- return;
- }
- if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
- status_ = Status::IOError("Could not relocate blob during GC");
- validity_info_.Invalidate();
- return;
- }
- if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) {
- value_ = compaction_filter_value_;
- return;
- }
- }
- }
- void CompactionIterator::PrepareOutput() {
- if (Valid()) {
- if (LIKELY(!is_range_del_)) {
- if (ikey_.type == kTypeValue) {
- ExtractLargeValueIfNeeded();
- } else if (ikey_.type == kTypeBlobIndex) {
- GarbageCollectBlobIfNeeded();
- }
- }
- // Zeroing out the sequence number leads to better compression.
- // If this is the bottommost level (no files in lower levels),
- // the earliest snapshot is larger than this seqno,
- // and the userkey differs from the last userkey in the compaction,
- // then we can squash the seqno to zero.
- //
- // This is safe for TransactionDB write-conflict checking since
- // transactions only care about sequence numbers larger than any active
- // snapshots.
- //
- // Can we do the same for levels above the bottom level as long as
- // KeyNotExistsBeyondOutputLevel() returns true?
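- // Illustrative (hypothetical) example: at the bottommost level with
- // earliest_snapshot_ = 50, a PUT@42 on a new user key can be rewritten
- // as PUT@0, since no snapshot or conflict check can distinguish seqno
- // 42 from 0 here, and the zeroed field compresses better.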
- if (Valid() && bottommost_level_ &&
- DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
- ikey_.type != kTypeMerge && current_key_committed_ &&
- ikey_.sequence <= preserve_seqno_after_ && !is_range_del_) {
- assert(compaction_ != nullptr && !compaction_->allow_ingest_behind());
- if (ikey_.type == kTypeDeletion ||
- (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
- ROCKS_LOG_FATAL(
- info_log_,
- "Unexpected key %s for seq-zero optimization. "
- "earliest_snapshot %" PRIu64
- ", earliest_write_conflict_snapshot %" PRIu64
- " job_snapshot %" PRIu64
- ". timestamp_size: %d full_history_ts_low_ %s. validity %x",
- ikey_.DebugString(allow_data_in_errors_, true).c_str(),
- earliest_snapshot_, earliest_write_conflict_snapshot_,
- job_snapshot_, static_cast<int>(timestamp_size_),
- full_history_ts_low_ != nullptr
- ? Slice(*full_history_ts_low_).ToString(true).c_str()
- : "null",
- validity_info_.rep);
- assert(false);
- }
- ikey_.sequence = 0;
- last_key_seq_zeroed_ = true;
- TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq",
- &ikey_);
- if (!timestamp_size_) {
- current_key_.UpdateInternalKey(0, ikey_.type);
- } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
- // We can also zero out timestamp for better compression.
- // For the same user key (excluding timestamp), the timestamp-based
- // history can be collapsed to save some space if the timestamp is
- // older than *full_history_ts_low_.
- const std::string kTsMin(timestamp_size_, static_cast<char>(0));
- const Slice ts_slice = kTsMin;
- ikey_.SetTimestamp(ts_slice);
- current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
- }
- }
- }
- }
- inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
- SequenceNumber in, SequenceNumber* prev_snapshot) {
- assert(snapshots_->size());
- if (snapshots_->size() == 0) {
- ROCKS_LOG_FATAL(info_log_,
- "No snapshot left in findEarliestVisibleSnapshot");
- }
- auto snapshots_iter =
- std::lower_bound(snapshots_->begin(), snapshots_->end(), in);
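- // Illustrative (hypothetical) example: with *snapshots_ = {5, 10, 15}
- // and in = 7, lower_bound points at 10 and *prev_snapshot becomes 5;
- // absent a snapshot_checker_, 10 is returned as the earliest snapshot
- // that can see seqno 7.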
- assert(prev_snapshot != nullptr);
- if (snapshots_iter == snapshots_->begin()) {
- *prev_snapshot = 0;
- } else {
- *prev_snapshot = *std::prev(snapshots_iter);
- if (*prev_snapshot >= in) {
- ROCKS_LOG_FATAL(info_log_,
- "*prev_snapshot (%" PRIu64 ") >= in (%" PRIu64
- ") in findEarliestVisibleSnapshot",
- *prev_snapshot, in);
- assert(false);
- }
- }
- if (snapshot_checker_ == nullptr) {
- return snapshots_iter != snapshots_->end() ? *snapshots_iter
- : kMaxSequenceNumber;
- }
- bool has_released_snapshot = !released_snapshots_.empty();
- for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
- auto cur = *snapshots_iter;
- if (in > cur) {
- ROCKS_LOG_FATAL(info_log_,
- "in (%" PRIu64 ") > cur (%" PRIu64
- ") in findEarliestVisibleSnapshot",
- in, cur);
- assert(false);
- }
- // Skip if cur is in released_snapshots.
- if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
- continue;
- }
- auto res = snapshot_checker_->CheckInSnapshot(in, cur);
- if (res == SnapshotCheckerResult::kInSnapshot) {
- return cur;
- } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
- released_snapshots_.insert(cur);
- }
- *prev_snapshot = cur;
- }
- return kMaxSequenceNumber;
- }
- uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
- const CompactionProxy* compaction) {
- if (!compaction) {
- return 0;
- }
- if (!compaction->enable_blob_garbage_collection()) {
- return 0;
- }
- const Version* const version = compaction->input_version();
- assert(version);
- const VersionStorageInfo* const storage_info = version->storage_info();
- assert(storage_info);
- const auto& blob_files = storage_info->GetBlobFiles();
- const size_t cutoff_index = static_cast<size_t>(
- compaction->blob_garbage_collection_age_cutoff() * blob_files.size());
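- // Illustrative (hypothetical) example: with 10 blob files (sorted from
- // oldest to newest) and an age cutoff of 0.3, cutoff_index is 3, so the
- // three oldest files fall below the cutoff file number and their blobs
- // become eligible for relocation.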
- if (cutoff_index >= blob_files.size()) {
- return std::numeric_limits<uint64_t>::max();
- }
- const auto& meta = blob_files[cutoff_index];
- assert(meta);
- return meta->GetBlobFileNumber();
- }
- std::unique_ptr<BlobFetcher> CompactionIterator::CreateBlobFetcherIfNeeded(
- const CompactionProxy* compaction) {
- if (!compaction) {
- return nullptr;
- }
- const Version* const version = compaction->input_version();
- if (!version) {
- return nullptr;
- }
- ReadOptions read_options;
- read_options.io_activity = Env::IOActivity::kCompaction;
- read_options.fill_cache = false;
- return std::unique_ptr<BlobFetcher>(new BlobFetcher(version, read_options));
- }
- std::unique_ptr<PrefetchBufferCollection>
- CompactionIterator::CreatePrefetchBufferCollectionIfNeeded(
- const CompactionProxy* compaction) {
- if (!compaction) {
- return nullptr;
- }
- if (!compaction->input_version()) {
- return nullptr;
- }
- if (compaction->allow_mmap_reads()) {
- return nullptr;
- }
- const uint64_t readahead_size = compaction->blob_compaction_readahead_size();
- if (!readahead_size) {
- return nullptr;
- }
- return std::unique_ptr<PrefetchBufferCollection>(
- new PrefetchBufferCollection(readahead_size));
- }
- } // namespace ROCKSDB_NAMESPACE