compaction_iterator.cc

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/compaction/compaction_iterator.h"
  6. #include <iterator>
  7. #include <limits>
  8. #include "db/blob/blob_fetcher.h"
  9. #include "db/blob/blob_file_builder.h"
  10. #include "db/blob/blob_index.h"
  11. #include "db/blob/prefetch_buffer_collection.h"
  12. #include "db/snapshot_checker.h"
  13. #include "db/wide/wide_column_serialization.h"
  14. #include "db/wide/wide_columns_helper.h"
  15. #include "logging/logging.h"
  16. #include "port/likely.h"
  17. #include "rocksdb/listener.h"
  18. #include "table/internal_iterator.h"
  19. #include "test_util/sync_point.h"
  20. namespace ROCKSDB_NAMESPACE {
  21. CompactionIterator::CompactionIterator(
  22. InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
  23. SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
  24. SequenceNumber earliest_snapshot,
  25. SequenceNumber earliest_write_conflict_snapshot,
  26. SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
  27. Env* env, bool report_detailed_time,
  28. CompactionRangeDelAggregator* range_del_agg,
  29. BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
  30. bool enforce_single_del_contracts,
  31. const std::atomic<bool>& manual_compaction_canceled,
  32. bool must_count_input_entries, const Compaction* compaction,
  33. const CompactionFilter* compaction_filter,
  34. const std::atomic<bool>* shutting_down,
  35. const std::shared_ptr<Logger> info_log,
  36. const std::string* full_history_ts_low,
  37. std::optional<SequenceNumber> preserve_seqno_min)
  38. : CompactionIterator(
  39. input, cmp, merge_helper, last_sequence, snapshots, earliest_snapshot,
  40. earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
  41. report_detailed_time, range_del_agg, blob_file_builder,
  42. allow_data_in_errors, enforce_single_del_contracts,
  43. manual_compaction_canceled,
  44. compaction ? std::make_unique<RealCompaction>(compaction) : nullptr,
  45. must_count_input_entries, compaction_filter, shutting_down, info_log,
  46. full_history_ts_low, preserve_seqno_min) {}
  47. CompactionIterator::CompactionIterator(
  48. InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
  49. SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
  50. SequenceNumber earliest_snapshot,
  51. SequenceNumber earliest_write_conflict_snapshot,
  52. SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
  53. Env* env, bool report_detailed_time,
  54. CompactionRangeDelAggregator* range_del_agg,
  55. BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
  56. bool enforce_single_del_contracts,
  57. const std::atomic<bool>& manual_compaction_canceled,
  58. std::unique_ptr<CompactionProxy> compaction, bool must_count_input_entries,
  59. const CompactionFilter* compaction_filter,
  60. const std::atomic<bool>* shutting_down,
  61. const std::shared_ptr<Logger> info_log,
  62. const std::string* full_history_ts_low,
  63. std::optional<SequenceNumber> preserve_seqno_min)
  64. : input_(input, cmp, must_count_input_entries),
  65. cmp_(cmp),
  66. merge_helper_(merge_helper),
  67. snapshots_(snapshots),
  68. earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
  69. job_snapshot_(job_snapshot),
  70. snapshot_checker_(snapshot_checker),
  71. env_(env),
  72. clock_(env_->GetSystemClock().get()),
  73. report_detailed_time_(report_detailed_time),
  74. range_del_agg_(range_del_agg),
  75. blob_file_builder_(blob_file_builder),
  76. compaction_(std::move(compaction)),
  77. compaction_filter_(compaction_filter),
  78. shutting_down_(shutting_down),
  79. manual_compaction_canceled_(manual_compaction_canceled),
  80. bottommost_level_(compaction_ && compaction_->bottommost_level() &&
  81. !compaction_->allow_ingest_behind()),
  82. // snapshots_ cannot be nullptr, but we will assert later in the body of
  83. // the constructor.
  84. visible_at_tip_(snapshots_ ? snapshots_->empty() : false),
  85. earliest_snapshot_(earliest_snapshot),
  86. info_log_(info_log),
  87. allow_data_in_errors_(allow_data_in_errors),
  88. enforce_single_del_contracts_(enforce_single_del_contracts),
  89. timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
  90. full_history_ts_low_(full_history_ts_low),
  91. current_user_key_sequence_(0),
  92. current_user_key_snapshot_(0),
  93. merge_out_iter_(merge_helper_),
  94. blob_garbage_collection_cutoff_file_number_(
  95. ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
  96. blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())),
  97. prefetch_buffers_(
  98. CreatePrefetchBufferCollectionIfNeeded(compaction_.get())),
  99. current_key_committed_(false),
  100. cmp_with_history_ts_low_(0),
  101. level_(compaction_ == nullptr ? 0 : compaction_->level()),
  102. preserve_seqno_after_(preserve_seqno_min.value_or(earliest_snapshot)) {
  103. assert(snapshots_ != nullptr);
  104. assert(preserve_seqno_after_ <= earliest_snapshot_);
  105. if (compaction_ != nullptr) {
  106. level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  107. }
  108. #ifndef NDEBUG
  109. // findEarliestVisibleSnapshot assumes this ordering.
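// For example (illustrative values only), snapshots_ = {5, 12, 30} satisfies
// this: ignoring snapshot_checker_, a key with sequence number 10 is first
// visible in snapshot 12, which is what findEarliestVisibleSnapshot can
// report only because the list is sorted in ascending order.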
  110. for (size_t i = 1; i < snapshots_->size(); ++i) {
  111. assert(snapshots_->at(i - 1) < snapshots_->at(i));
  112. }
  113. assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
  114. timestamp_size_ == full_history_ts_low_->size());
  115. #endif
  116. input_.SetPinnedItersMgr(&pinned_iters_mgr_);
  117. // The default `merge_until_status_` does not need to be checked since it is
  118. // overwritten as soon as `MergeUntil()` is called.
  119. merge_until_status_.PermitUncheckedError();
  120. TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
  121. }
  122. CompactionIterator::~CompactionIterator() {
  123. // input_ Iterator lifetime is longer than pinned_iters_mgr_ lifetime
  124. input_.SetPinnedItersMgr(nullptr);
  125. }
  126. void CompactionIterator::ResetRecordCounts() {
  127. iter_stats_.num_record_drop_user = 0;
  128. iter_stats_.num_record_drop_hidden = 0;
  129. iter_stats_.num_record_drop_obsolete = 0;
  130. iter_stats_.num_record_drop_range_del = 0;
  131. iter_stats_.num_range_del_drop_obsolete = 0;
  132. iter_stats_.num_optimized_del_drop_obsolete = 0;
  133. }
  134. void CompactionIterator::SeekToFirst() {
  135. NextFromInput();
  136. PrepareOutput();
  137. }
  138. void CompactionIterator::Next() {
  139. // If there is a merge output, return it before continuing to process the
  140. // input.
  141. if (merge_out_iter_.Valid()) {
  142. merge_out_iter_.Next();
  143. // Check if we returned all records of the merge output.
  144. if (merge_out_iter_.Valid()) {
  145. key_ = merge_out_iter_.key();
  146. value_ = merge_out_iter_.value();
  147. Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
  148. // MergeUntil stops when it encounters a corrupt key and does not
  149. // include it in the result, so we expect the keys here to be valid.
  150. if (!s.ok()) {
  151. // FIXME: should fail compaction after this fatal logging.
  152. ROCKS_LOG_FATAL(
  153. info_log_, "Invalid ikey %s in compaction. %s",
  154. allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
  155. s.getState());
  156. assert(false);
  157. }
  158. // Keep current_key_ in sync.
  159. if (0 == timestamp_size_) {
  160. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
  161. } else {
  162. Slice ts = ikey_.GetTimestamp(timestamp_size_);
  163. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type, &ts);
  164. }
  165. key_ = current_key_.GetInternalKey();
  166. ikey_.user_key = current_key_.GetUserKey();
  167. validity_info_.SetValid(ValidContext::kMerge1);
  168. } else {
  169. if (merge_until_status_.IsMergeInProgress()) {
  170. // `Status::MergeInProgress()` tells us that the previous `MergeUntil()`
  171. // produced only merge operands. Those merge operands were accessed and
  172. // written out using `merge_out_iter_`. Since `merge_out_iter_` is
  173. // exhausted at this point, all merge operands have been written out.
  174. //
  175. // Still, there may be a base value (PUT, DELETE, SINGLEDEL, etc.) that
  176. // needs to be written out. Normally, `CompactionIterator` would skip it
  177. // on the basis that it has already output something in the same
  178. // snapshot stripe. To prevent this, we reset `has_current_user_key_`
  179. // so that the next iteration does not treat the snapshot stripe as
  180. // unchanged.
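// For instance (illustrative): with a snapshot at seq 3 and input
// MERGE(k, seq=5), MERGE(k, seq=4), PUT(k, seq=2), MergeUntil() cannot merge
// across the snapshot boundary, returns MergeInProgress, and emits only the
// two operands. The PUT at seq 2 still has to be written out, so it must not
// be treated as hidden by the operands we just returned.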
  181. has_current_user_key_ = false;
  182. }
  183. // We consumed all pinned merge operands, release pinned iterators
  184. pinned_iters_mgr_.ReleasePinnedData();
  185. // MergeHelper moves the iterator to the first record after the merged
  186. // records, so even though we reached the end of the merge output, we do
  187. // not want to advance the iterator.
  188. NextFromInput();
  189. }
  190. } else {
  191. // Only advance the input iterator if there is no merge output and the
  192. // iterator is not already at the next record.
  193. if (!at_next_) {
  194. AdvanceInputIter();
  195. }
  196. NextFromInput();
  197. }
  198. if (Valid()) {
  199. // Record that we've outputted a record for the current key.
  200. has_outputted_key_ = true;
  201. }
  202. PrepareOutput();
  203. }
  204. bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
  205. Slice* skip_until) {
  206. if (!compaction_filter_) {
  207. return true;
  208. }
  209. if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
  210. ikey_.type != kTypeWideColumnEntity) {
  211. return true;
  212. }
  213. CompactionFilter::Decision decision =
  214. CompactionFilter::Decision::kUndetermined;
  215. CompactionFilter::ValueType value_type =
  216. ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
  217. : ikey_.type == kTypeBlobIndex
  218. ? CompactionFilter::ValueType::kBlobIndex
  219. : CompactionFilter::ValueType::kWideColumnEntity;
  220. // Hack: pass internal key to BlobIndexCompactionFilter since it needs
  221. // to get sequence number.
  222. assert(compaction_filter_);
  223. const Slice& filter_key =
  224. (ikey_.type != kTypeBlobIndex ||
  225. !compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
  226. ? ikey_.user_key
  227. : key_;
  228. compaction_filter_value_.clear();
  229. compaction_filter_skip_until_.Clear();
  230. std::vector<std::pair<std::string, std::string>> new_columns;
  231. {
  232. StopWatchNano timer(clock_, report_detailed_time_);
  233. if (ikey_.type == kTypeBlobIndex) {
  234. decision = compaction_filter_->FilterBlobByKey(
  235. level_, filter_key, &compaction_filter_value_,
  236. compaction_filter_skip_until_.rep());
  237. if (decision == CompactionFilter::Decision::kUndetermined &&
  238. !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
  239. if (!compaction_) {
  240. status_ =
  241. Status::Corruption("Unexpected blob index outside of compaction");
  242. validity_info_.Invalidate();
  243. return false;
  244. }
  245. TEST_SYNC_POINT_CALLBACK(
  246. "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
  247. &value_);
  248. // For integrated BlobDB impl, CompactionIterator reads blob value.
  249. // For Stacked BlobDB impl, the corresponding CompactionFilter's
  250. // FilterV2 method should read the blob value.
  251. BlobIndex blob_index;
  252. Status s = blob_index.DecodeFrom(value_);
  253. if (!s.ok()) {
  254. status_ = s;
  255. validity_info_.Invalidate();
  256. return false;
  257. }
  258. FilePrefetchBuffer* prefetch_buffer =
  259. prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
  260. blob_index.file_number())
  261. : nullptr;
  262. uint64_t bytes_read = 0;
  263. assert(blob_fetcher_);
  264. s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index,
  265. prefetch_buffer, &blob_value_,
  266. &bytes_read);
  267. if (!s.ok()) {
  268. status_ = s;
  269. validity_info_.Invalidate();
  270. return false;
  271. }
  272. ++iter_stats_.num_blobs_read;
  273. iter_stats_.total_blob_bytes_read += bytes_read;
  274. value_type = CompactionFilter::ValueType::kValue;
  275. }
  276. }
  277. if (decision == CompactionFilter::Decision::kUndetermined) {
  278. const Slice* existing_val = nullptr;
  279. const WideColumns* existing_col = nullptr;
  280. WideColumns existing_columns;
  281. if (ikey_.type != kTypeWideColumnEntity) {
  282. if (!blob_value_.empty()) {
  283. existing_val = &blob_value_;
  284. } else {
  285. existing_val = &value_;
  286. }
  287. } else {
  288. Slice value_copy = value_;
  289. const Status s =
  290. WideColumnSerialization::Deserialize(value_copy, existing_columns);
  291. if (!s.ok()) {
  292. status_ = s;
  293. validity_info_.Invalidate();
  294. return false;
  295. }
  296. existing_col = &existing_columns;
  297. }
  298. decision = compaction_filter_->FilterV3(
  299. level_, filter_key, value_type, existing_val, existing_col,
  300. &compaction_filter_value_, &new_columns,
  301. compaction_filter_skip_until_.rep());
  302. }
  303. iter_stats_.total_filter_time +=
  304. env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
  305. }
  306. if (decision == CompactionFilter::Decision::kUndetermined) {
  307. // Should not reach here, since FilterV2/FilterV3 should never return
  308. // kUndetermined.
  309. status_ = Status::NotSupported(
  310. "FilterV2/FilterV3 should never return kUndetermined");
  311. validity_info_.Invalidate();
  312. return false;
  313. }
  314. if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil &&
  315. cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
  316. 0) {
  317. // Can't skip to a key smaller than the current one.
  318. // Keep the key as per FilterV2/FilterV3 documentation.
  319. decision = CompactionFilter::Decision::kKeep;
  320. }
  321. if (decision == CompactionFilter::Decision::kRemove) {
  322. // convert the current key to a delete; key_ is pointing into
  323. // current_key_ at this point, so updating current_key_ updates key()
  324. ikey_.type = kTypeDeletion;
  325. current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
  326. // no value associated with delete
  327. value_.clear();
  328. iter_stats_.num_record_drop_user++;
  329. } else if (decision == CompactionFilter::Decision::kPurge) {
  330. // convert the current key to a single delete; key_ is pointing into
  331. // current_key_ at this point, so updating current_key_ updates key()
  332. ikey_.type = kTypeSingleDeletion;
  333. current_key_.UpdateInternalKey(ikey_.sequence, kTypeSingleDeletion);
  334. // no value associated with single delete
  335. value_.clear();
  336. iter_stats_.num_record_drop_user++;
  337. } else if (decision == CompactionFilter::Decision::kChangeValue) {
  338. if (ikey_.type != kTypeValue) {
  339. ikey_.type = kTypeValue;
  340. current_key_.UpdateInternalKey(ikey_.sequence, kTypeValue);
  341. }
  342. value_ = compaction_filter_value_;
  343. } else if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil) {
  344. *need_skip = true;
  345. compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
  346. kValueTypeForSeek);
  347. *skip_until = compaction_filter_skip_until_.Encode();
  348. } else if (decision == CompactionFilter::Decision::kChangeBlobIndex) {
  349. // Only the StackableDB-based BlobDB impl's compaction filter should return
  350. // kChangeBlobIndex. Decision about rewriting blob and changing blob index
  351. // in the integrated BlobDB impl is made in a subsequent call to
  352. // PrepareOutput() and its callees.
  353. if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
  354. status_ = Status::NotSupported(
  355. "Only stacked BlobDB's internal compaction filter can return "
  356. "kChangeBlobIndex.");
  357. validity_info_.Invalidate();
  358. return false;
  359. }
  360. if (ikey_.type != kTypeBlobIndex) {
  361. ikey_.type = kTypeBlobIndex;
  362. current_key_.UpdateInternalKey(ikey_.sequence, kTypeBlobIndex);
  363. }
  364. value_ = compaction_filter_value_;
  365. } else if (decision == CompactionFilter::Decision::kIOError) {
  366. if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
  367. status_ = Status::NotSupported(
  368. "CompactionFilter for integrated BlobDB should not return kIOError");
  369. validity_info_.Invalidate();
  370. return false;
  371. }
  372. status_ = Status::IOError("Failed to access blob during compaction filter");
  373. validity_info_.Invalidate();
  374. return false;
  375. } else if (decision == CompactionFilter::Decision::kChangeWideColumnEntity) {
  376. WideColumns sorted_columns;
  377. sorted_columns.reserve(new_columns.size());
  378. for (const auto& column : new_columns) {
  379. sorted_columns.emplace_back(column.first, column.second);
  380. }
  381. WideColumnsHelper::SortColumns(sorted_columns);
  382. {
  383. const Status s = WideColumnSerialization::Serialize(
  384. sorted_columns, compaction_filter_value_);
  385. if (!s.ok()) {
  386. status_ = s;
  387. validity_info_.Invalidate();
  388. return false;
  389. }
  390. }
  391. if (ikey_.type != kTypeWideColumnEntity) {
  392. ikey_.type = kTypeWideColumnEntity;
  393. current_key_.UpdateInternalKey(ikey_.sequence, kTypeWideColumnEntity);
  394. }
  395. value_ = compaction_filter_value_;
  396. }
  397. return true;
  398. }
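// Illustrative sketch of a user-side CompactionFilter whose decisions feed
// InvokeFilterIfNeeded() above (hypothetical example, not part of this file):
// returning kRemoveAndSkipUntil is what drives the *need_skip / *skip_until
// plumbing, while kRemove and kChangeValue are rewritten in place.
//
//   class DropExpiredPrefixFilter : public CompactionFilter {
//    public:
//     Decision FilterV2(int /*level*/, const Slice& key,
//                       ValueType /*value_type*/, const Slice& /*value*/,
//                       std::string* /*new_value*/,
//                       std::string* skip_until) const override {
//       if (key.starts_with("expired/")) {
//         // Skip the whole "expired/" range in one seek.
//         skip_until->assign("expired0");
//         return Decision::kRemoveAndSkipUntil;
//       }
//       return Decision::kKeep;
//     }
//     const char* Name() const override { return "DropExpiredPrefixFilter"; }
//   };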
  399. void CompactionIterator::NextFromInput() {
  400. at_next_ = false;
  401. validity_info_.Invalidate();
  402. while (!Valid() && input_.Valid() && !IsPausingManualCompaction() &&
  403. !IsShuttingDown()) {
  404. key_ = input_.key();
  405. value_ = input_.value();
  406. blob_value_.Reset();
  407. iter_stats_.num_input_records++;
  408. is_range_del_ = input_.IsDeleteRangeSentinelKey();
  409. Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
  410. if (!pik_status.ok()) {
  411. iter_stats_.num_input_corrupt_records++;
  412. // Always fail compaction when encountering corrupted internal keys
  413. status_ = pik_status;
  414. return;
  415. }
  416. TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
  417. if (is_range_del_) {
  418. validity_info_.SetValid(kRangeDeletion);
  419. break;
  420. }
  421. // Update input statistics
  422. if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
  423. ikey_.type == kTypeDeletionWithTimestamp) {
  424. iter_stats_.num_input_deletion_records++;
  425. } else if (ikey_.type == kTypeValuePreferredSeqno) {
  426. iter_stats_.num_input_timed_put_records++;
  427. }
  428. iter_stats_.total_input_raw_key_bytes += key_.size();
  429. iter_stats_.total_input_raw_value_bytes += value_.size();
  430. // If need_skip is true, we should seek the input iterator
  431. // to internal key skip_until and continue from there.
  432. bool need_skip = false;
  433. // Points either into compaction_filter_skip_until_ or into
  434. // merge_helper_->compaction_filter_skip_until_.
  435. Slice skip_until;
  436. bool user_key_equal_without_ts = false;
  437. int cmp_ts = 0;
  438. if (has_current_user_key_) {
  439. user_key_equal_without_ts =
  440. cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_);
  441. // if timestamp_size_ > 0, then curr_ts_ has been initialized by a
  442. // previous key.
  443. cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
  444. ExtractTimestampFromUserKey(
  445. ikey_.user_key, timestamp_size_),
  446. curr_ts_)
  447. : 0;
  448. }
  449. // Check whether the user key changed. After this if statement current_key_
  450. // is a copy of the current input key (maybe converted to a delete by the
  451. // compaction filter). ikey_.user_key is pointing to the copy.
  452. if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) {
  453. // First occurrence of this user key
  454. // Copy key for output
  455. key_ = current_key_.SetInternalKey(key_, &ikey_);
  456. int prev_cmp_with_ts_low =
  457. !full_history_ts_low_ ? 0
  458. : curr_ts_.empty()
  459. ? 0
  460. : cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_);
  461. // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for use in
  462. // the next iteration to compare with the timestamp of the next key.
  463. UpdateTimestampAndCompareWithFullHistoryLow();
  464. // If
  465. // (1) !has_current_user_key_, OR
  466. // (2) timestamp is disabled, OR
  467. // (3) all history will be preserved, OR
  468. // (4) user key (excluding timestamp) is different from previous key, OR
  469. // (5) timestamp is NO older than *full_history_ts_low_, OR
  470. // (6) timestamp is the largest one older than full_history_ts_low_,
  471. // then current_user_key_ must be treated as a different user key.
  472. // This means, if a user key (excluding ts) is the same as the previous
  473. // user key, and its ts is older than *full_history_ts_low_, then we
  474. // consider this key for GC, e.g. it may be dropped if certain conditions
  475. // match.
  476. if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
  477. !user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0 ||
  478. prev_cmp_with_ts_low >= 0) {
  479. // Initialize for future comparison for rule (A), etc.
  480. current_user_key_sequence_ = kMaxSequenceNumber;
  481. current_user_key_snapshot_ = 0;
  482. has_current_user_key_ = true;
  483. }
  484. current_user_key_ = ikey_.user_key;
  485. has_outputted_key_ = false;
  486. last_key_seq_zeroed_ = false;
  487. current_key_committed_ = KeyCommitted(ikey_.sequence);
  488. // Apply the compaction filter to the first committed version of the user
  489. // key.
  490. if (current_key_committed_ &&
  491. !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
  492. break;
  493. }
  494. } else {
  495. // Update the current key to reflect the new sequence number/type without
  496. // copying the user key.
  497. // TODO(rven): Compaction filter does not process keys in this path.
  498. // Need to have the compaction filter process multiple versions
  499. // if we have versions on both sides of a snapshot.
  500. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
  501. key_ = current_key_.GetInternalKey();
  502. ikey_.user_key = current_key_.GetUserKey();
  503. // Note that a newer version of a key is ordered before older versions.
  504. // If a newer version of a key is committed, so are the older versions.
  505. // No need to query snapshot_checker_ in that case.
  506. if (UNLIKELY(!current_key_committed_)) {
  507. assert(snapshot_checker_ != nullptr);
  508. current_key_committed_ = KeyCommitted(ikey_.sequence);
  509. // Apply the compaction filter to the first committed version of the
  510. // user key.
  511. if (current_key_committed_ &&
  512. !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
  513. break;
  514. }
  515. }
  516. }
  517. if (UNLIKELY(!current_key_committed_)) {
  518. assert(snapshot_checker_ != nullptr);
  519. validity_info_.SetValid(ValidContext::kCurrentKeyUncommitted);
  520. break;
  521. }
  522. // If there are no snapshots, then this kv affects visibility at tip.
  523. // Otherwise, search through all existing snapshots to find the earliest
  524. // snapshot that is affected by this kv.
  525. SequenceNumber last_sequence = current_user_key_sequence_;
  526. current_user_key_sequence_ = ikey_.sequence;
  527. SequenceNumber last_snapshot = current_user_key_snapshot_;
  528. SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
  529. current_user_key_snapshot_ =
  530. visible_at_tip_
  531. ? earliest_snapshot_
  532. : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
  533. if (need_skip) {
  534. // This case is handled below.
  535. } else if (clear_and_output_next_key_) {
  536. // In the previous iteration we encountered a single delete that we could
  537. // not compact out. We will keep this Put, but can drop its data.
  538. // (See Optimization 3, below.)
  539. if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
  540. ikey_.type != kTypeWideColumnEntity &&
  541. ikey_.type != kTypeValuePreferredSeqno) {
  542. ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
  543. ikey_.DebugString(allow_data_in_errors_, true).c_str());
  544. assert(false);
  545. }
  546. if (current_user_key_snapshot_ < last_snapshot) {
  547. ROCKS_LOG_FATAL(info_log_,
  548. "key %s, current_user_key_snapshot_ (%" PRIu64
  549. ") < last_snapshot (%" PRIu64 ")",
  550. ikey_.DebugString(allow_data_in_errors_, true).c_str(),
  551. current_user_key_snapshot_, last_snapshot);
  552. assert(false);
  553. }
  554. if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity ||
  555. ikey_.type == kTypeValuePreferredSeqno) {
  556. ikey_.type = kTypeValue;
  557. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
  558. }
  559. value_.clear();
  560. validity_info_.SetValid(ValidContext::kKeepSDAndClearPut);
  561. clear_and_output_next_key_ = false;
  562. } else if (ikey_.type == kTypeSingleDeletion) {
  563. // We can compact out a SingleDelete if:
  564. // 1) We encounter the corresponding PUT -OR- we know that this key
  565. // doesn't appear past this output level and we are not in
  566. // ingest_behind mode.
  567. // =AND=
  568. // 2) We've already returned a record in this snapshot -OR-
  569. // there is no earlier earliest_write_conflict_snapshot.
  570. //
  571. // A note about 2) above:
  572. // we try to determine whether there is any earlier write conflict
  573. // checking snapshot by calling DefinitelyInSnapshot() with seq and
  574. // earliest_write_conflict_snapshot as arguments. For write-prepared
  575. // and write-unprepared transactions, if earliest_write_conflict_snapshot
  576. // is evicted from WritePreparedTxnDB::commit_cache, then
  577. // DefinitelyInSnapshot(seq, earliest_write_conflict_snapshot) returns
  578. // false, even if the seq is actually visible within
  579. // earliest_write_conflict_snapshot. Consequently, CompactionIterator
  580. // may try to zero out its sequence number, thus hitting assertion error
  581. // in debug mode or cause incorrect DBIter return result.
  582. // We observe that earliest_write_conflict_snapshot >= earliest_snapshot,
  583. // and the seq zeroing logic depends on
  584. // DefinitelyInSnapshot(seq, earliest_snapshot). Therefore, if we cannot
  585. // determine whether seq is **definitely** in
  586. // earliest_write_conflict_snapshot, then we can additionally check if
  587. // seq is definitely in earliest_snapshot. If the latter holds, then the
  588. // former holds too.
  589. //
  590. // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
  591. // allow Transactions to do write-conflict checking (if we compacted away
  592. // all keys, then we wouldn't know that a write happened in this
  593. // snapshot). If there is no earlier snapshot, then we know that there
  594. // are no active transactions that need to know about any writes.
  595. //
  596. // Optimization 3:
  597. // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
  598. // true, then we must output a SingleDelete. In this case, we will decide
  599. // to also output the PUT. While we are compacting less by outputting the
  600. // PUT now, hopefully this will lead to better compaction in the future
  601. // when Rule 2 is later true (i.e., we are hoping we can later compact out
  602. // both the SingleDelete and the Put, while we couldn't if we only
  603. // outputted the SingleDelete now).
  604. // In this case, we can save space by removing the PUT's value as it will
  605. // never be read.
  606. //
  607. // Deletes and Merges are not supported on the same key that has a
  608. // SingleDelete as it is not possible to correctly do any partial
  609. // compaction of such a combination of operations. The result of mixing
  610. // those operations for a given key is documented as being undefined. So
  611. // we can choose how to handle such combinations of operations. We will
  612. // try to compact out as much as we can in these cases.
  613. // We will report counts on these anomalous cases.
  614. //
  615. // Note: If timestamp is enabled, the record is eligible for deletion
  616. // only if, in addition to the conditions above (Rule 1 and Rule 2),
  617. // full_history_ts_low_ is specified and the key's timestamp is less
  618. // than *full_history_ts_low_. If it's not eligible for deletion, we
  619. // output the SingleDelete. Likewise, Optimization 3 is applied only if
  620. // full_history_ts_low_ is specified and the key's timestamp is less
  621. // than *full_history_ts_low_.
  622. // The easiest way to process a SingleDelete during iteration is to peek
  623. // ahead at the next key.
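// Worked example (illustrative): input for user key k, newest first:
// SD(k, seq=15) followed by PUT(k, seq=10), with no snapshot separating
// them. If we have already output a record for k in this stripe, or seq 15
// is definitely visible in earliest_write_conflict_snapshot_, both records
// are dropped (Rules 1 and 2). Otherwise the SD is output, and the PUT is
// output with its value cleared on the next iteration (Optimization 3).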
  624. const bool is_timestamp_eligible_for_gc =
  625. (timestamp_size_ == 0 ||
  626. (full_history_ts_low_ && cmp_with_history_ts_low_ < 0));
  627. ParsedInternalKey next_ikey;
  628. AdvanceInputIter();
  629. while (input_.Valid() && input_.IsDeleteRangeSentinelKey() &&
  630. ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
  631. .ok() &&
  632. cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
  633. // skip range tombstone start keys with the same user key
  634. // since they are not "real" point keys.
  635. AdvanceInputIter();
  636. }
  637. // Check whether the next key exists, is not corrupt, and is the same key
  638. // as the single delete.
  639. if (input_.Valid() &&
  640. ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
  641. .ok() &&
  642. cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
  643. assert(!input_.IsDeleteRangeSentinelKey());
  644. #ifndef NDEBUG
  645. const Compaction* c =
  646. compaction_ ? compaction_->real_compaction() : nullptr;
  647. #endif
  648. TEST_SYNC_POINT_CALLBACK(
  649. "CompactionIterator::NextFromInput:SingleDelete:1",
  650. const_cast<Compaction*>(c));
  651. if (last_key_seq_zeroed_) {
  652. // Drop SD and the next key since they are both in the last
  653. // snapshot (since last key has seqno zeroed).
  654. ++iter_stats_.num_record_drop_hidden;
  655. ++iter_stats_.num_record_drop_obsolete;
  656. assert(bottommost_level_);
  657. AdvanceInputIter();
  658. } else if (prev_snapshot == 0 ||
  659. DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
  660. // Check whether the next key belongs to the same snapshot as the
  661. // SingleDelete.
  662. TEST_SYNC_POINT_CALLBACK(
  663. "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
  664. if (next_ikey.type == kTypeSingleDeletion) {
  665. // We encountered two SingleDeletes for the same key in a row. This
  666. // could be due to unexpected user input. If a write-(un)prepared
  667. // transaction is used, this could also be due to releasing an old
  668. // snapshot between a Put and its matching SingleDelete.
  669. // Skip the first SingleDelete and let the next iteration decide
  670. // how to handle the second SingleDelete.
  671. // First SingleDelete has been skipped since we already called
  672. // input_.Next().
  673. ++iter_stats_.num_record_drop_obsolete;
  674. ++iter_stats_.num_single_del_mismatch;
  675. } else if (next_ikey.type == kTypeDeletion) {
  676. std::ostringstream oss;
  677. oss << "Found SD and type: " << static_cast<int>(next_ikey.type)
  678. << " on the same key, violating the contract "
  679. "of SingleDelete. Check your application to make sure the "
  680. "application does not mix SingleDelete and Delete for "
  681. "the same key. If you are using "
  682. "write-prepared/write-unprepared transactions, and use "
  683. "SingleDelete to delete certain keys, then make sure "
  684. "TransactionDBOptions::rollback_deletion_type_callback is "
  685. "configured properly. Mixing SD and DEL can lead to "
  686. "undefined behaviors";
  687. ++iter_stats_.num_record_drop_obsolete;
  688. ++iter_stats_.num_single_del_mismatch;
  689. if (enforce_single_del_contracts_) {
  690. ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
  691. validity_info_.Invalidate();
  692. status_ = Status::Corruption(oss.str());
  693. return;
  694. }
  695. ROCKS_LOG_WARN(info_log_, "%s", oss.str().c_str());
  696. } else if (!is_timestamp_eligible_for_gc) {
  697. // We cannot drop the SingleDelete as timestamp is enabled, and
  698. // timestamp of this key is greater than or equal to
  699. // *full_history_ts_low_. We will output the SingleDelete.
  700. validity_info_.SetValid(ValidContext::kKeepTsHistory);
  701. } else if (has_outputted_key_ ||
  702. DefinitelyInSnapshot(ikey_.sequence,
  703. earliest_write_conflict_snapshot_) ||
  704. (earliest_snapshot_ < earliest_write_conflict_snapshot_ &&
  705. DefinitelyInSnapshot(ikey_.sequence,
  706. earliest_snapshot_))) {
  707. // Found a matching value, we can drop the single delete and the
  708. // value. It is safe to drop both records since we've already
  709. // outputted a key in this snapshot, or there is no earlier
  710. // snapshot (Rule 2 above).
  711. // Note: it doesn't matter whether the second key is a Put or if it
  712. // is an unexpected Merge or Delete. We will compact it out
  713. // either way. We will maintain counts of how many mismatches
  714. // happened.
  715. if (next_ikey.type != kTypeValue &&
  716. next_ikey.type != kTypeBlobIndex &&
  717. next_ikey.type != kTypeWideColumnEntity &&
  718. next_ikey.type != kTypeValuePreferredSeqno) {
  719. ++iter_stats_.num_single_del_mismatch;
  720. }
  721. ++iter_stats_.num_record_drop_hidden;
  722. ++iter_stats_.num_record_drop_obsolete;
  723. // Already called input_.Next() once. Call it a second time to
  724. // skip past the second key.
  725. AdvanceInputIter();
  726. } else {
  727. // Found a matching value, but we cannot drop both keys since
  728. // there is an earlier snapshot and we need to leave behind a record
  729. // to know that a write happened in this snapshot (Rule 2 above).
  730. // Clear the value and output the SingleDelete. (The value will be
  731. // outputted on the next iteration.)
  732. // Setting valid_ to true will output the current SingleDelete.
  733. validity_info_.SetValid(ValidContext::kKeepSDForConflictCheck);
  734. // Set up the Put to be outputted in the next iteration.
  735. // (Optimization 3).
  736. clear_and_output_next_key_ = true;
  737. TEST_SYNC_POINT_CALLBACK(
  738. "CompactionIterator::NextFromInput:KeepSDForWW",
  739. /*arg=*/nullptr);
  740. }
  741. } else {
  742. // We hit the next snapshot without hitting a put, so the iterator
  743. // returns the single delete.
  744. validity_info_.SetValid(ValidContext::kKeepSDForSnapshot);
  745. TEST_SYNC_POINT_CALLBACK(
  746. "CompactionIterator::NextFromInput:SingleDelete:3",
  747. const_cast<Compaction*>(c));
  748. }
  749. } else {
  750. // We are at the end of the input, could not parse the next key, or hit
  751. // a different key. The iterator returns the single delete if the key
  752. // possibly exists beyond the current output level. We set
  753. // has_current_user_key to false so that if the iterator is at the next
  754. // key, we do not compare it again against the previous key at the next
  755. // iteration. If the next key is corrupt, we return before the
  756. // comparison, so the value of has_current_user_key does not matter.
  757. has_current_user_key_ = false;
  758. if (compaction_ != nullptr && !compaction_->allow_ingest_behind() &&
  759. DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
  760. compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
  761. &level_ptrs_) &&
  762. is_timestamp_eligible_for_gc) {
  763. // Key doesn't exist outside of this range.
  764. // Can compact out this SingleDelete.
  765. ++iter_stats_.num_record_drop_obsolete;
  766. ++iter_stats_.num_single_del_fallthru;
  767. if (!bottommost_level_) {
  768. ++iter_stats_.num_optimized_del_drop_obsolete;
  769. }
  770. } else if (last_key_seq_zeroed_) {
  771. // Sequence number zeroing requires bottommost_level_, which is
  772. // false with ingest_behind.
  773. assert(!compaction_->allow_ingest_behind());
  774. // Skip.
  775. ++iter_stats_.num_record_drop_hidden;
  776. ++iter_stats_.num_record_drop_obsolete;
  777. assert(bottommost_level_);
  778. } else {
  779. // Output SingleDelete
  780. validity_info_.SetValid(ValidContext::kKeepSD);
  781. }
  782. }
  783. if (Valid()) {
  784. at_next_ = true;
  785. }
  786. } else if (last_sequence != kMaxSequenceNumber &&
  787. (last_snapshot == current_user_key_snapshot_ ||
  788. last_snapshot < current_user_key_snapshot_)) {
  789. // rule (A):
  790. // If the earliest snapshot in which this key is visible
  791. // is the same as that of a previous instance of the
  792. // same key, then this kv is not visible in any snapshot.
  793. // It is hidden by a newer entry for the same user key.
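// For instance (illustrative): PUT(k, seq=9) and PUT(k, seq=7) with the
// earliest snapshot at 20. Both versions are first visible in the same
// snapshot, so the seq=7 version can never be observed and is dropped here
// as num_record_drop_hidden.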
  794. //
  795. // Note: Dropping this key will not affect TransactionDB write-conflict
  796. // checking since there has already been a record returned for this key
  797. // in this snapshot.
  798. // When ingest_behind is enabled, it's ok that we drop an overwritten
  799. // Delete here. The overwriting key still covers whatever will be
  800. // ingested. Note that we will not drop SingleDelete here as SingleDelete
  801. // is handled entirely in its own if clause. This is important; see
  802. // example: from new to old: SingleDelete_1, PUT_1, SingleDelete_2, PUT_2,
  803. // where all operations are on the same key and PUT_2 is ingested with
  804. // ingest_behind=true. If SingleDelete_2 is dropped due to being compacted
  805. // together with PUT_1, and then PUT_1 is compacted away together with
  806. // SingleDelete_1, PUT_2 can incorrectly become visible.
  807. if (last_sequence < current_user_key_sequence_) {
  808. ROCKS_LOG_FATAL(info_log_,
  809. "key %s, last_sequence (%" PRIu64
  810. ") < current_user_key_sequence_ (%" PRIu64 ")",
  811. ikey_.DebugString(allow_data_in_errors_, true).c_str(),
  812. last_sequence, current_user_key_sequence_);
  813. assert(false);
  814. }
  815. ++iter_stats_.num_record_drop_hidden;
  816. AdvanceInputIter();
  817. } else if (compaction_ != nullptr &&
  818. (ikey_.type == kTypeDeletion ||
  819. (ikey_.type == kTypeDeletionWithTimestamp &&
  820. cmp_with_history_ts_low_ < 0)) &&
  821. !compaction_->allow_ingest_behind() &&
  822. DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
  823. compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
  824. &level_ptrs_)) {
  825. // TODO(noetzli): This is the only place where we use compaction_
  826. // (besides the constructor). We should probably get rid of this
  827. // dependency and find a way to do similar filtering during flushes.
  828. //
  829. // For this user key:
  830. // (1) there is no data in higher levels
  831. // (2) data in lower levels will have larger sequence numbers
  832. // (3) data in layers that are being compacted here and have
  833. // smaller sequence numbers will be dropped in the next
  834. // few iterations of this loop (by rule (A) above).
  835. // Therefore this deletion marker is obsolete and can be dropped.
  836. //
  837. // Note: Dropping this Delete will not affect TransactionDB
  838. // write-conflict checking since it is earlier than any snapshot.
  839. //
  840. // It seems that we can also drop deletions later than the earliest
  841. // snapshot, given that:
  842. // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
  843. // (2) No value exists earlier than the deletion.
  844. //
  845. // Note also that a deletion marker of type kTypeDeletionWithTimestamp
  846. // will be treated as a different user key unless the timestamp is older
  847. // than *full_history_ts_low_.
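// For example (illustrative): DEL(k, seq=4) with the earliest snapshot at 10,
// where k does not exist beyond the output level and ingest_behind is off.
// No snapshot or lower level can observe k anymore, so the tombstone itself
// is dropped here, and older versions of k within this compaction are
// dropped by rule (A) in later iterations.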
  848. ++iter_stats_.num_record_drop_obsolete;
  849. if (!bottommost_level_) {
  850. ++iter_stats_.num_optimized_del_drop_obsolete;
  851. }
  852. AdvanceInputIter();
  853. } else if ((ikey_.type == kTypeDeletion ||
  854. (ikey_.type == kTypeDeletionWithTimestamp &&
  855. cmp_with_history_ts_low_ < 0)) &&
  856. bottommost_level_) {
  857. assert(compaction_);
  858. assert(!compaction_->allow_ingest_behind()); // bottommost_level_ is true
  859. // Handle the case where we have a delete key at the bottommost level.
  860. // We can skip outputting the key iff there are no subsequent puts for
  861. // this key.
  862. assert(compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
  863. &level_ptrs_));
  864. ParsedInternalKey next_ikey;
  865. AdvanceInputIter();
  866. #ifndef NDEBUG
  867. const Compaction* c =
  868. compaction_ ? compaction_->real_compaction() : nullptr;
  869. #endif
  870. TEST_SYNC_POINT_CALLBACK(
  871. "CompactionIterator::NextFromInput:BottommostDelete:1",
  872. const_cast<Compaction*>(c));
  873. // Skip over all versions of this key that happen to occur in the same
  874. // snapshot range as the delete.
  875. //
  876. // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
  877. // considered to have a different user key unless the timestamp is older
  878. // than *full_history_ts_low_.
  879. //
  880. // Range tombstone start keys are skipped as they are not "real" keys.
  881. while (!IsPausingManualCompaction() && !IsShuttingDown() &&
  882. input_.Valid() &&
  883. (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
  884. .ok()) &&
  885. cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
  886. (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() ||
  887. DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
  888. AdvanceInputIter();
  889. }
  890. // If we still need to output a record with this key, we need to
  891. // output the delete too.
  892. if (input_.Valid() &&
  893. (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
  894. .ok()) &&
  895. cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
  896. validity_info_.SetValid(ValidContext::kKeepDel);
  897. at_next_ = true;
  898. }
  899. } else if (ikey_.type == kTypeValuePreferredSeqno &&
  900. DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
  901. (bottommost_level_ ||
  902. (compaction_ != nullptr &&
  903. compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
  904. &level_ptrs_)))) {
  905. // FIXME: it's possible that we are setting sequence number to 0 as
  906. // preferred sequence number here. If cf_ingest_behind is enabled, this
  907. // may fail ingestions since they expect all keys above the last level
  908. // to have non-zero sequence number. We should probably not allow seqno
  909. // zeroing here.
  910. //
  911. // This section that attempts to swap preferred sequence number will not
  912. // be invoked if this is a CompactionIterator created for flush, since
  913. // `compaction_` will be nullptr and it's not bottommost either.
  914. //
  915. // The entries with the same user key and smaller sequence numbers are
  916. // all in this earliest snapshot range to be iterated. Since those entries
  917. // will be hidden by this entry [rule A], it's safe to swap in the
  918. // preferred seqno now.
  919. //
  920. // It's otherwise not safe to swap in the preferred seqno since it's
  921. // possible for entries in earlier snapshots to have sequence number that
  922. // is smaller than this entry's sequence number but bigger than this
  923. // entry's preferred sequence number. Swapping in the preferred sequence
  924. // number will break the internal key ordering invariant for this key.
  925. //
  926. // A special case involving range deletion is handled separately below.
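// For instance (illustrative): entry (k, seq=100) carrying preferred_seqno 5.
// If a snapshot at 50 can still see an older version of k at seq=40,
// rewriting this entry with seq=5 would order it below that version and
// break the descending-seqno invariant for k; hence the
// DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) guard above.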
  927. auto [unpacked_value, preferred_seqno] =
  928. ParsePackedValueWithSeqno(value_);
  929. assert(preferred_seqno < ikey_.sequence || ikey_.sequence == 0);
  930. if (range_del_agg_->ShouldDelete(
  931. key_, RangeDelPositioningMode::kForwardTraversal)) {
  932. ++iter_stats_.num_record_drop_hidden;
  933. ++iter_stats_.num_record_drop_range_del;
  934. AdvanceInputIter();
  935. } else {
  936. InternalKey ikey_after_swap(ikey_.user_key,
  937. std::min(preferred_seqno, ikey_.sequence),
  938. kTypeValue);
  939. Slice ikey_after_swap_slice(*ikey_after_swap.rep());
  940. if (range_del_agg_->ShouldDelete(
  941. ikey_after_swap_slice,
  942. RangeDelPositioningMode::kForwardTraversal)) {
  943. // A range tombstone that doesn't cover this kTypeValuePreferredSeqno
  944. // entry will end up covering the entry, so it's not safe to swap
  945. // preferred sequence number. In this case, we output the entry as is.
  946. validity_info_.SetValid(ValidContext::kNewUserKey);
  947. } else {
  948. if (ikey_.sequence != 0) {
  949. iter_stats_.num_timed_put_swap_preferred_seqno++;
  950. ikey_.sequence = preferred_seqno;
  951. }
  952. ikey_.type = kTypeValue;
  953. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
  954. key_ = current_key_.GetInternalKey();
  955. ikey_.user_key = current_key_.GetUserKey();
  956. value_ = unpacked_value;
  957. validity_info_.SetValid(ValidContext::kSwapPreferredSeqno);
  958. }
  959. }
  960. } else if (ikey_.type == kTypeMerge) {
  961. if (!merge_helper_->HasOperator()) {
  962. status_ = Status::InvalidArgument(
  963. "merge_operator is not properly initialized.");
  964. return;
  965. }
  966. pinned_iters_mgr_.StartPinning();
  967. // We know the merge type entry is not hidden, otherwise we would
  968. // have hit (A).
  969. // We encapsulate the merge-related state machine in a different
  970. // object to minimize changes to the existing flow.
  971. merge_until_status_ = merge_helper_->MergeUntil(
  972. &input_, range_del_agg_, prev_snapshot, bottommost_level_,
  973. allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
  974. prefetch_buffers_.get(), &iter_stats_);
  975. merge_out_iter_.SeekToFirst();
  976. if (!merge_until_status_.ok() &&
  977. !merge_until_status_.IsMergeInProgress()) {
  978. status_ = merge_until_status_;
  979. return;
  980. } else if (merge_out_iter_.Valid()) {
  981. // NOTE: key, value, and ikey_ refer to old entries.
  982. // These will be correctly set below.
  983. key_ = merge_out_iter_.key();
  984. value_ = merge_out_iter_.value();
  985. pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
  986. // MergeUntil stops when it encounters a corrupt key and does not
  987. // include it in the result, so we expect the keys here to be valid.
  988. if (!pik_status.ok()) {
  989. ROCKS_LOG_FATAL(
  990. info_log_, "Invalid key %s in compaction. %s",
  991. allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
  992. pik_status.getState());
  993. assert(false);
  994. }
  995. // Keep current_key_ in sync.
  996. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
  997. key_ = current_key_.GetInternalKey();
  998. ikey_.user_key = current_key_.GetUserKey();
  999. validity_info_.SetValid(ValidContext::kMerge2);
  1000. } else {
  1001. // All merge operands were filtered out. Reset the user key, since the
  1002. // batch consumed by the merge operator should not shadow any keys
  1003. // coming after the merges.
  1004. has_current_user_key_ = false;
  1005. pinned_iters_mgr_.ReleasePinnedData();
  1006. if (merge_helper_->FilteredUntil(&skip_until)) {
  1007. need_skip = true;
  1008. }
  1009. }
  1010. } else {
  1011. // 1. new user key -OR-
  1012. // 2. different snapshot stripe
  1013. // If user-defined timestamp is enabled, we consider keys for GC if they
  1014. // are below history_ts_low_. CompactionRangeDelAggregator::ShouldDelete()
  1015. // only considers range deletions that are at or below history_ts_low_ and
  1016. // trim_ts_. We drop keys here that are below history_ts_low_ and are
  1017. // covered by a range tombstone that is at or below history_ts_low_ and
  1018. // trim_ts_.
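// For example (illustrative): with *full_history_ts_low_ = 100, a point key
// at ts=50 covered by a range tombstone at ts=60 (also below the cutoff) is
// dropped here, while a point key at ts=120 is above the cutoff and is kept
// as-is.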
  1019. bool should_delete = false;
  1020. if (!timestamp_size_ || cmp_with_history_ts_low_ < 0) {
  1021. should_delete = range_del_agg_->ShouldDelete(
  1022. key_, RangeDelPositioningMode::kForwardTraversal);
  1023. }
  1024. if (should_delete) {
  1025. ++iter_stats_.num_record_drop_hidden;
  1026. ++iter_stats_.num_record_drop_range_del;
  1027. AdvanceInputIter();
  1028. } else {
  1029. validity_info_.SetValid(ValidContext::kNewUserKey);
  1030. }
  1031. }
  1032. if (need_skip) {
  1033. SkipUntil(skip_until);
  1034. }
  1035. }
  1036. if (status_.ok()) {
  1037. if (!Valid() && IsShuttingDown()) {
  1038. status_ = Status::ShutdownInProgress();
  1039. } else if (IsPausingManualCompaction()) {
  1040. status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  1041. } else if (!input_.Valid() && input_.status().IsCorruption()) {
  1042. // Propagate corruption status from memtable iterator
  1043. status_ = input_.status();
  1044. }
  1045. }
  1046. }
  1047. bool CompactionIterator::ExtractLargeValueIfNeededImpl() {
  1048. if (!blob_file_builder_) {
  1049. return false;
  1050. }
  1051. blob_index_.clear();
  1052. const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_);
  1053. if (!s.ok()) {
  1054. status_ = s;
  1055. validity_info_.Invalidate();
  1056. return false;
  1057. }
  1058. if (blob_index_.empty()) {
  1059. return false;
  1060. }
  1061. value_ = blob_index_;
  1062. return true;
  1063. }
  1064. void CompactionIterator::ExtractLargeValueIfNeeded() {
  1065. assert(ikey_.type == kTypeValue);
  1066. if (!ExtractLargeValueIfNeededImpl()) {
  1067. return;
  1068. }
  1069. ikey_.type = kTypeBlobIndex;
  1070. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
  1071. }
void CompactionIterator::GarbageCollectBlobIfNeeded() {
  assert(ikey_.type == kTypeBlobIndex);

  if (!compaction_) {
    return;
  }

  // GC for integrated BlobDB
  if (compaction_->enable_blob_garbage_collection()) {
    TEST_SYNC_POINT_CALLBACK(
        "CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
        &value_);

    BlobIndex blob_index;

    {
      const Status s = blob_index.DecodeFrom(value_);

      if (!s.ok()) {
        status_ = s;
        validity_info_.Invalidate();

        return;
      }
    }

    if (blob_index.file_number() >=
        blob_garbage_collection_cutoff_file_number_) {
      return;
    }

    FilePrefetchBuffer* prefetch_buffer =
        prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
                                blob_index.file_number())
                          : nullptr;

    uint64_t bytes_read = 0;

    {
      assert(blob_fetcher_);

      const Status s = blob_fetcher_->FetchBlob(
          user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read);

      if (!s.ok()) {
        status_ = s;
        validity_info_.Invalidate();

        return;
      }
    }

    ++iter_stats_.num_blobs_read;
    iter_stats_.total_blob_bytes_read += bytes_read;

    ++iter_stats_.num_blobs_relocated;
    iter_stats_.total_blob_bytes_relocated += blob_index.size();

    value_ = blob_value_;

    if (ExtractLargeValueIfNeededImpl()) {
      return;
    }

    ikey_.type = kTypeValue;
    current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);

    return;
  }

  // GC for stacked BlobDB
  if (compaction_filter_ &&
      compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
    const auto blob_decision = compaction_filter_->PrepareBlobOutput(
        user_key(), value_, &compaction_filter_value_);

    if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
      status_ =
          Status::Corruption("Corrupted blob reference encountered during GC");
      validity_info_.Invalidate();

      return;
    }

    if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
      status_ = Status::IOError("Could not relocate blob during GC");
      validity_info_.Invalidate();

      return;
    }

    if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) {
      value_ = compaction_filter_value_;

      return;
    }
  }
}
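
// Finalize the current entry before it is emitted: extract large values to a
// blob file or garbage-collect an existing blob reference, and, on the
// bottommost level, zero out sequence numbers (and sufficiently old
// timestamps) that no snapshot still needs, which helps the output compress
// better.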
void CompactionIterator::PrepareOutput() {
  if (Valid()) {
    if (LIKELY(!is_range_del_)) {
      if (ikey_.type == kTypeValue) {
        ExtractLargeValueIfNeeded();
      } else if (ikey_.type == kTypeBlobIndex) {
        GarbageCollectBlobIfNeeded();
      }
    }

    // Zeroing out the sequence number leads to better compression.
    // If this is the bottommost level (no files in lower levels),
    // the earliest snapshot is larger than this seqno,
    // and the userkey differs from the last userkey in compaction,
    // then we can squash the seqno to zero.
    //
    // This is safe for TransactionDB write-conflict checking since
    // transactions only care about sequence numbers larger than any active
    // snapshot.
    //
    // Can we do the same for levels above the bottom level as long as
    // KeyNotExistsBeyondOutputLevel() returns true?
    if (Valid() && bottommost_level_ &&
        DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
        ikey_.type != kTypeMerge && current_key_committed_ &&
        ikey_.sequence <= preserve_seqno_after_ && !is_range_del_) {
      assert(compaction_ != nullptr && !compaction_->allow_ingest_behind());
      if (ikey_.type == kTypeDeletion ||
          (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
        ROCKS_LOG_FATAL(
            info_log_,
            "Unexpected key %s for seq-zero optimization. "
            "earliest_snapshot %" PRIu64
            ", earliest_write_conflict_snapshot %" PRIu64
            " job_snapshot %" PRIu64
            ". timestamp_size: %d full_history_ts_low_ %s. validity %x",
            ikey_.DebugString(allow_data_in_errors_, true).c_str(),
            earliest_snapshot_, earliest_write_conflict_snapshot_,
            job_snapshot_, static_cast<int>(timestamp_size_),
            full_history_ts_low_ != nullptr
                ? Slice(*full_history_ts_low_).ToString(true).c_str()
                : "null",
            validity_info_.rep);
        assert(false);
      }

      ikey_.sequence = 0;
      last_key_seq_zeroed_ = true;
      TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq",
                               &ikey_);
      if (!timestamp_size_) {
        current_key_.UpdateInternalKey(0, ikey_.type);
      } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
        // We can also zero out the timestamp for better compression.
        // For the same user key (excluding timestamp), the timestamp-based
        // history can be collapsed to save some space if the timestamp is
        // older than *full_history_ts_low_.
        const std::string kTsMin(timestamp_size_, static_cast<char>(0));
        const Slice ts_slice = kTsMin;
        ikey_.SetTimestamp(ts_slice);
        current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
      }
    }
  }
}
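
// Given a sequence number `in`, find the earliest snapshot that `in` is
// visible to: the smallest snapshot sequence number >= `in`, additionally
// confirmed by the SnapshotChecker when one is set. Released snapshots are
// skipped. *prev_snapshot is set to the latest snapshot known not to see `in`
// (0 if there is none). Returns kMaxSequenceNumber when no existing snapshot
// sees `in`.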
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  if (snapshots_->size() == 0) {
    ROCKS_LOG_FATAL(info_log_,
                    "No snapshot left in findEarliestVisibleSnapshot");
  }
  auto snapshots_iter =
      std::lower_bound(snapshots_->begin(), snapshots_->end(), in);
  assert(prev_snapshot != nullptr);
  if (snapshots_iter == snapshots_->begin()) {
    *prev_snapshot = 0;
  } else {
    *prev_snapshot = *std::prev(snapshots_iter);
    if (*prev_snapshot >= in) {
      ROCKS_LOG_FATAL(info_log_,
                      "*prev_snapshot (%" PRIu64 ") >= in (%" PRIu64
                      ") in findEarliestVisibleSnapshot",
                      *prev_snapshot, in);
      assert(false);
    }
  }
  if (snapshot_checker_ == nullptr) {
    return snapshots_iter != snapshots_->end() ? *snapshots_iter
                                               : kMaxSequenceNumber;
  }
  bool has_released_snapshot = !released_snapshots_.empty();
  for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
    auto cur = *snapshots_iter;
    if (in > cur) {
      ROCKS_LOG_FATAL(info_log_,
                      "in (%" PRIu64 ") > cur (%" PRIu64
                      ") in findEarliestVisibleSnapshot",
                      in, cur);
      assert(false);
    }
    // Skip if cur is in released_snapshots.
    if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
      continue;
    }
    auto res = snapshot_checker_->CheckInSnapshot(in, cur);
    if (res == SnapshotCheckerResult::kInSnapshot) {
      return cur;
    } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
      released_snapshots_.insert(cur);
    }
    *prev_snapshot = cur;
  }
  return kMaxSequenceNumber;
}
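
// Turn the blob_garbage_collection_age_cutoff fraction into a concrete blob
// file number: blobs stored in files with a smaller number are eligible for
// relocation during this compaction. Returns 0 when there is no compaction or
// blob GC is disabled, and the maximum file number when the cutoff covers
// every blob file.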
uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
    const CompactionProxy* compaction) {
  if (!compaction) {
    return 0;
  }

  if (!compaction->enable_blob_garbage_collection()) {
    return 0;
  }

  const Version* const version = compaction->input_version();
  assert(version);

  const VersionStorageInfo* const storage_info = version->storage_info();
  assert(storage_info);

  const auto& blob_files = storage_info->GetBlobFiles();

  const size_t cutoff_index = static_cast<size_t>(
      compaction->blob_garbage_collection_age_cutoff() * blob_files.size());

  if (cutoff_index >= blob_files.size()) {
    return std::numeric_limits<uint64_t>::max();
  }

  const auto& meta = blob_files[cutoff_index];
  assert(meta);

  return meta->GetBlobFileNumber();
}
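
// Construct a BlobFetcher over the compaction's input version so blob values
// can be read back during GC. Reads are tagged as compaction I/O and do not
// populate the block cache. Returns nullptr if there is no compaction or no
// input version.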
std::unique_ptr<BlobFetcher> CompactionIterator::CreateBlobFetcherIfNeeded(
    const CompactionProxy* compaction) {
  if (!compaction) {
    return nullptr;
  }

  const Version* const version = compaction->input_version();
  if (!version) {
    return nullptr;
  }

  ReadOptions read_options;
  read_options.io_activity = Env::IOActivity::kCompaction;
  read_options.fill_cache = false;

  return std::unique_ptr<BlobFetcher>(new BlobFetcher(version, read_options));
}
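
// Create the per-blob-file prefetch buffers used for readahead while fetching
// blobs during GC. Returns nullptr when there is no compaction or input
// version, when mmap reads are enabled, or when
// blob_compaction_readahead_size is zero.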
std::unique_ptr<PrefetchBufferCollection>
CompactionIterator::CreatePrefetchBufferCollectionIfNeeded(
    const CompactionProxy* compaction) {
  if (!compaction) {
    return nullptr;
  }

  if (!compaction->input_version()) {
    return nullptr;
  }

  if (compaction->allow_mmap_reads()) {
    return nullptr;
  }

  const uint64_t readahead_size = compaction->blob_compaction_readahead_size();
  if (!readahead_size) {
    return nullptr;
  }

  return std::unique_ptr<PrefetchBufferCollection>(
      new PrefetchBufferCollection(readahead_size));
}

}  // namespace ROCKSDB_NAMESPACE