| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- // Copyright (c) Meta Platforms, Inc. and affiliates.
- //
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "util/coro_utils.h"
- #if defined(WITHOUT_COROUTINES) || \
- (defined(USE_COROUTINES) && defined(WITH_COROUTINES))
- namespace ROCKSDB_NAMESPACE {
- // Lookup a batch of keys in a single SST file
- DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
- (const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
- bool skip_filters, bool skip_range_deletions, FdWithKeyRange* f,
- std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
- TableCache::TypedHandle* table_handle, uint64_t& num_filter_read,
- uint64_t& num_index_read, uint64_t& num_sst_read) {
- bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
- get_perf_context()->per_level_perf_context_enabled;
- Status s;
- StopWatchNano timer(clock_, timer_enabled /* auto_start */);
- s = CO_AWAIT(table_cache_->MultiGet)(
- read_options, *internal_comparator(), *f->file_metadata, &file_range,
- mutable_cf_options_,
- cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters,
- skip_range_deletions, hit_file_level, table_handle);
- // TODO: examine the behavior for corrupted key
- if (timer_enabled) {
- PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
- hit_file_level);
- }
- if (!s.ok()) {
- // TODO: Set status for individual keys appropriately
- for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
- *iter->s = s;
- file_range.MarkKeyDone(iter);
- }
- CO_RETURN s;
- }
- uint64_t batch_size = 0;
- for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
- ++iter) {
- GetContext& get_context = *iter->get_context;
- Status* status = iter->s;
- // The Status in the KeyContext takes precedence over GetContext state
- // Status may be an error if there were any IO errors in the table
- // reader. We never expect Status to be NotFound(), as that is
- // determined by get_context
- assert(!status->IsNotFound());
- if (!status->ok()) {
- file_range.MarkKeyDone(iter);
- continue;
- }
- if (get_context.sample()) {
- sample_file_read_inc(f->file_metadata);
- }
- batch_size++;
- num_index_read += get_context.get_context_stats_.num_index_read;
- num_filter_read += get_context.get_context_stats_.num_filter_read;
- num_sst_read += get_context.get_context_stats_.num_sst_read;
- // Reset these stats since they're specific to a level
- get_context.get_context_stats_.num_index_read = 0;
- get_context.get_context_stats_.num_filter_read = 0;
- get_context.get_context_stats_.num_sst_read = 0;
- // report the counters before returning
- if (get_context.State() != GetContext::kNotFound &&
- get_context.State() != GetContext::kMerge &&
- db_statistics_ != nullptr) {
- get_context.ReportCounters();
- } else {
- if (iter->max_covering_tombstone_seq > 0) {
- // The remaining files we look at will only contain covered keys, so
- // we stop here for this key
- file_range.SkipKey(iter);
- }
- }
- switch (get_context.State()) {
- case GetContext::kNotFound:
- // Keep searching in other files
- break;
- case GetContext::kMerge:
- // TODO: update per-level perfcontext user_key_return_count for kMerge
- break;
- case GetContext::kFound:
- if (hit_file_level == 0) {
- RecordTick(db_statistics_, GET_HIT_L0);
- } else if (hit_file_level == 1) {
- RecordTick(db_statistics_, GET_HIT_L1);
- } else if (hit_file_level >= 2) {
- RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
- }
- PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);
- file_range.MarkKeyDone(iter);
- if (iter->is_blob_index) {
- BlobIndex blob_index;
- Status tmp_s;
- if (iter->value) {
- TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
- &(*iter));
- tmp_s = blob_index.DecodeFrom(*(iter->value));
- } else {
- assert(iter->columns);
- tmp_s = blob_index.DecodeFrom(
- WideColumnsHelper::GetDefaultColumn(iter->columns->columns()));
- }
- if (tmp_s.ok()) {
- const uint64_t blob_file_num = blob_index.file_number();
- blob_ctxs[blob_file_num].emplace_back(blob_index, &*iter);
- } else {
- *(iter->s) = tmp_s;
- }
- } else {
- if (iter->value) {
- file_range.AddValueSize(iter->value->size());
- } else {
- assert(iter->columns);
- file_range.AddValueSize(iter->columns->serialized_size());
- }
- if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
- s = Status::Aborted();
- break;
- }
- }
- continue;
- case GetContext::kDeleted:
- // Use empty error message for speed
- *status = Status::NotFound();
- file_range.MarkKeyDone(iter);
- continue;
- case GetContext::kCorrupt:
- *status =
- Status::Corruption("corrupted key for ", iter->lkey->user_key());
- file_range.MarkKeyDone(iter);
- continue;
- case GetContext::kUnexpectedBlobIndex:
- ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
- *status = Status::NotSupported(
- "Encounter unexpected blob index. Please open DB with "
- "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
- file_range.MarkKeyDone(iter);
- continue;
- case GetContext::kMergeOperatorFailed:
- *status = Status::Corruption(Status::SubCode::kMergeOperatorFailed);
- file_range.MarkKeyDone(iter);
- continue;
- }
- }
- RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
- CO_RETURN s;
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif
|