// version_set_sync_and_async.h
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
  6. #include "util/coro_utils.h"
  #if defined(WITHOUT_COROUTINES) || \
    (defined(USE_COROUTINES) && defined(WITH_COROUTINES))

namespace ROCKSDB_NAMESPACE {

// Looks up every key of `file_range` in the single SST file `f`. For each key
// it records whether the lookup was settled in this file or has to continue
// into later files.
//
// The enclosing #if compiles this body when WITHOUT_COROUTINES is defined, or
// when both USE_COROUTINES and WITH_COROUTINES are defined.
// DEFINE_SYNC_AND_ASYNC, CO_AWAIT and CO_RETURN come from util/coro_utils.h.
// Presumably they produce a plain function in the first case and a coroutine
// in the second, so the body must exit only through CO_RETURN.
// (NOTE(review): confirm against coro_utils.h.)
//
// Parameters:
//   read_options         - forwarded to TableCache::MultiGet. Its
//                          value_size_soft_limit is compared against the
//                          range's accumulated value size after each inline
//                          value is added.
//   file_range           - the keys to look up, taken by value. MarkKeyDone /
//                          SkipKey / AddValueSize are called on it.
//   hit_file_level       - level of `f`. Selects the file read histogram, the
//                          per-level perf counters and the GET_HIT_* ticker.
//   skip_filters,
//   skip_range_deletions - forwarded unchanged to TableCache::MultiGet.
//   f                    - the file to search (its file_metadata is used).
//   blob_ctxs            - out: for found keys whose value is a blob index,
//                          the decoded BlobIndex and the key context,
//                          grouped by blob file number.
//   table_handle         - forwarded unchanged to TableCache::MultiGet.
//   num_filter_read,
//   num_index_read,
//   num_sst_read         - in/out: incremented by each examined key's
//                          per-level read counters. Those per-key counters
//                          are then reset to zero.
//
// Returns one of:
//   - the TableCache::MultiGet error, after copying it into every key's
//     status and marking every key done;
//   - Status::Aborted() once the value-size soft limit is exceeded (later
//     keys in the batch are not examined);
//   - OK otherwise.
// The number of examined keys is recorded in the SST_BATCH_SIZE histogram.
DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
 bool skip_filters, bool skip_range_deletions, FdWithKeyRange* f,
 std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
 TableCache::TypedHandle* table_handle, uint64_t& num_filter_read,
 uint64_t& num_index_read, uint64_t& num_sst_read) {
  // The table read is timed only when the perf level allows timing and
  // per-level perf context collection is switched on.
  const bool measure_time =
      GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
      get_perf_context()->per_level_perf_context_enabled;

  StopWatchNano table_timer(clock_, measure_time /* auto_start */);
  Status result = CO_AWAIT(table_cache_->MultiGet)(
      read_options, *internal_comparator(), *f->file_metadata, &file_range,
      mutable_cf_options_,
      cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters,
      skip_range_deletions, hit_file_level, table_handle);
  // TODO: examine the behavior for corrupted key
  if (measure_time) {
    PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, table_timer.ElapsedNanos(),
                              hit_file_level);
  }

  if (!result.ok()) {
    // A failure of the whole table read is reported to every key in the
    // batch, and all of them are finished.
    // TODO: Set status for individual keys appropriately
    for (auto it = file_range.begin(); it != file_range.end(); ++it) {
      *it->s = result;
      file_range.MarkKeyDone(it);
    }
    CO_RETURN result;
  }

  uint64_t keys_examined = 0;
  for (auto it = file_range.begin(); it != file_range.end(); ++it) {
    Status* const key_status = it->s;
    // A per-key status left by the table reader (e.g. an I/O error) takes
    // precedence over the GetContext state. NotFound() is never expected
    // here, because "not found" is conveyed through the GetContext state.
    assert(!key_status->IsNotFound());
    if (!key_status->ok()) {
      file_range.MarkKeyDone(it);
      continue;
    }

    GetContext& ctx = *it->get_context;
    if (ctx.sample()) {
      sample_file_read_inc(f->file_metadata);
    }
    ++keys_examined;

    // Add this level's read counters to the caller's totals, then zero them,
    // since they are specific to a single level.
    auto& read_stats = ctx.get_context_stats_;
    num_index_read += read_stats.num_index_read;
    num_filter_read += read_stats.num_filter_read;
    num_sst_read += read_stats.num_sst_read;
    read_stats.num_index_read = 0;
    read_stats.num_filter_read = 0;
    read_stats.num_sst_read = 0;

    // Any state other than kNotFound / kMerge means the lookup ends in this
    // file; its counters are reported now when statistics are enabled.
    const auto state = ctx.State();
    const bool settled_here =
        state != GetContext::kNotFound && state != GetContext::kMerge;
    if (settled_here && db_statistics_ != nullptr) {
      ctx.ReportCounters();
    } else if (it->max_covering_tombstone_seq > 0) {
      // A covering range tombstone was seen: any remaining file would only
      // hold covered versions of this key, so stop searching for it.
      // NOTE(review): this branch is also taken for settled keys when
      // db_statistics_ is null. The switch below marks those keys done
      // anyway, presumably making the SkipKey redundant for them.
      file_range.SkipKey(it);
    }

    switch (ctx.State()) {
      case GetContext::kNotFound:
        // Not in this file; the key stays live for the next file.
        break;
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
        break;
      case GetContext::kFound:
        if (hit_file_level >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        } else if (hit_file_level == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (hit_file_level == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        }
        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);
        file_range.MarkKeyDone(it);
        if (!it->is_blob_index) {
          // Inline value (plain or wide-column): count its size against the
          // soft limit for the whole batch.
          if (it->value) {
            file_range.AddValueSize(it->value->size());
          } else {
            assert(it->columns);
            file_range.AddValueSize(it->columns->serialized_size());
          }
          if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
            result = Status::Aborted();
          }
        } else {
          // The stored value is a blob reference. Decode it and queue the
          // blob read, keyed by blob file number.
          BlobIndex blob_index;
          Status decode_status;
          if (it->value) {
            TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
                                     &(*it));
            decode_status = blob_index.DecodeFrom(*(it->value));
          } else {
            assert(it->columns);
            decode_status = blob_index.DecodeFrom(
                WideColumnsHelper::GetDefaultColumn(it->columns->columns()));
          }
          if (decode_status.ok()) {
            blob_ctxs[blob_index.file_number()].emplace_back(blob_index, &*it);
          } else {
            *key_status = decode_status;
          }
        }
        break;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *key_status = Status::NotFound();
        file_range.MarkKeyDone(it);
        break;
      case GetContext::kCorrupt:
        *key_status =
            Status::Corruption("corrupted key for ", it->lkey->user_key());
        file_range.MarkKeyDone(it);
        break;
      case GetContext::kUnexpectedBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *key_status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        file_range.MarkKeyDone(it);
        break;
      case GetContext::kMergeOperatorFailed:
        *key_status = Status::Corruption(Status::SubCode::kMergeOperatorFailed);
        file_range.MarkKeyDone(it);
        break;
    }

    if (!result.ok()) {
      // The value-size soft limit was exceeded. Stop before examining the
      // remaining keys of the batch.
      break;
    }
  }

  RecordInHistogram(db_statistics_, SST_BATCH_SIZE, keys_examined);
  CO_RETURN result;
}

}  // namespace ROCKSDB_NAMESPACE
#endif