| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "file/random_access_file_reader.h"
- #include <algorithm>
- #include <mutex>
- #include "file/file_util.h"
- #include "monitoring/histogram.h"
- #include "monitoring/iostats_context_imp.h"
- #include "port/port.h"
- #include "table/format.h"
- #include "test_util/sync_point.h"
- #include "util/random.h"
- #include "util/rate_limiter_impl.h"
- namespace ROCKSDB_NAMESPACE {
- inline Histograms GetFileReadHistograms(Statistics* stats,
- Env::IOActivity io_activity) {
- switch (io_activity) {
- case Env::IOActivity::kFlush:
- return Histograms::FILE_READ_FLUSH_MICROS;
- case Env::IOActivity::kCompaction:
- return Histograms::FILE_READ_COMPACTION_MICROS;
- case Env::IOActivity::kDBOpen:
- return Histograms::FILE_READ_DB_OPEN_MICROS;
- default:
- break;
- }
- if (stats && stats->get_stats_level() > StatsLevel::kExceptDetailedTimers) {
- switch (io_activity) {
- case Env::IOActivity::kGet:
- return Histograms::FILE_READ_GET_MICROS;
- case Env::IOActivity::kMultiGet:
- return Histograms::FILE_READ_MULTIGET_MICROS;
- case Env::IOActivity::kDBIterator:
- return Histograms::FILE_READ_DB_ITERATOR_MICROS;
- case Env::IOActivity::kVerifyDBChecksum:
- return Histograms::FILE_READ_VERIFY_DB_CHECKSUM_MICROS;
- case Env::IOActivity::kVerifyFileChecksums:
- return Histograms::FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS;
- default:
- break;
- }
- }
- return Histograms::HISTOGRAM_ENUM_MAX;
- }
- inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
- bool is_last_level, size_t size) {
- IOSTATS_ADD(bytes_read, size);
- // record for last/non-last level
- if (is_last_level) {
- RecordTick(stats, LAST_LEVEL_READ_BYTES, size);
- RecordTick(stats, LAST_LEVEL_READ_COUNT, 1);
- } else {
- RecordTick(stats, NON_LAST_LEVEL_READ_BYTES, size);
- RecordTick(stats, NON_LAST_LEVEL_READ_COUNT, 1);
- }
- // record for temperature file
- if (file_temperature != Temperature::kUnknown) {
- switch (file_temperature) {
- case Temperature::kHot:
- IOSTATS_ADD(file_io_stats_by_temperature.hot_file_bytes_read, size);
- IOSTATS_ADD(file_io_stats_by_temperature.hot_file_read_count, 1);
- RecordTick(stats, HOT_FILE_READ_BYTES, size);
- RecordTick(stats, HOT_FILE_READ_COUNT, 1);
- break;
- case Temperature::kWarm:
- IOSTATS_ADD(file_io_stats_by_temperature.warm_file_bytes_read, size);
- IOSTATS_ADD(file_io_stats_by_temperature.warm_file_read_count, 1);
- RecordTick(stats, WARM_FILE_READ_BYTES, size);
- RecordTick(stats, WARM_FILE_READ_COUNT, 1);
- break;
- case Temperature::kCool:
- IOSTATS_ADD(file_io_stats_by_temperature.cool_file_bytes_read, size);
- IOSTATS_ADD(file_io_stats_by_temperature.cool_file_read_count, 1);
- RecordTick(stats, COOL_FILE_READ_BYTES, size);
- RecordTick(stats, COOL_FILE_READ_COUNT, 1);
- break;
- case Temperature::kCold:
- IOSTATS_ADD(file_io_stats_by_temperature.cold_file_bytes_read, size);
- IOSTATS_ADD(file_io_stats_by_temperature.cold_file_read_count, 1);
- RecordTick(stats, COLD_FILE_READ_BYTES, size);
- RecordTick(stats, COLD_FILE_READ_COUNT, 1);
- break;
- case Temperature::kIce:
- IOSTATS_ADD(file_io_stats_by_temperature.ice_file_bytes_read, size);
- IOSTATS_ADD(file_io_stats_by_temperature.ice_file_read_count, 1);
- RecordTick(stats, ICE_FILE_READ_BYTES, size);
- RecordTick(stats, ICE_FILE_READ_COUNT, 1);
- break;
- default:
- break;
- }
- }
- }
- IOStatus RandomAccessFileReader::Create(
- const std::shared_ptr<FileSystem>& fs, const std::string& fname,
- const FileOptions& file_opts,
- std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
- std::unique_ptr<FSRandomAccessFile> file;
- IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
- if (io_s.ok()) {
- reader->reset(new RandomAccessFileReader(std::move(file), fname));
- }
- return io_s;
- }
// Read up to `n` bytes at `offset` into *result. Two execution paths:
//  - Direct I/O with an unaligned (offset, n, scratch) triple: read an
//    aligned superset range into an internal bounce buffer, then either copy
//    the requested bytes into `scratch` or hand the whole buffer to the
//    caller through `aligned_buf` (avoiding the copy).
//  - Otherwise: read straight into `scratch`, possibly in several
//    rate-limited chunks.
// Read latency is recorded into the per-activity histogram and, when
// configured, file_read_hist_.
IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
                                      size_t n, Slice* result, char* scratch,
                                      AlignedBuf* aligned_buf,
                                      IODebugContext* dbg) const {
  (void)aligned_buf;

  const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;

  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read:IODebugContext",
                           const_cast<void*>(static_cast<void*>(dbg)));

  // To be paranoid: modify scratch a little bit, so in case the underlying
  // FileSystem doesn't fill the buffer but returns success and `scratch`
  // still contains a previous block, the returned value will not pass the
  // checksum.
  if (n > 0 && scratch != nullptr) {
    // This byte might not change anything for direct I/O case, but it's OK.
    scratch[0]++;
  }

  IOStatus io_s;
  uint64_t elapsed = 0;
  size_t alignment = file_->GetRequiredBufferAlignment();
  bool is_aligned = false;
  if (scratch != nullptr) {
    // Check if offset, length and buffer are all alignment-multiple.
    is_aligned = (offset & (alignment - 1)) == 0 &&
                 (n & (alignment - 1)) == 0 &&
                 (uintptr_t(scratch) & (alignment - 1)) == 0;
  }

  {
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io() && is_aligned == false) {
      // Unaligned direct I/O: read the aligned superset range
      // [aligned_offset, aligned_offset + read_size) into a bounce buffer.
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
      while (buf.CurrentSize() < read_size) {
        // `allowed` is how many bytes this iteration may read; when a rate
        // limiter applies it may be less than the remaining size.
        size_t allowed;
        if (rate_limiter_priority != Env::IO_TOTAL &&
            rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
        } else {
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::StartTimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
          orig_offset = aligned_offset + buf.CurrentSize();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subjected to rate_limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // the opts.timeout before calling file_->Read
          assert(!opts.timeout.count() || allowed == read_size);
          io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
                             &tmp, buf.Destination(), dbg);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 io_s);
          if (!io_s.ok()) {
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
                            tmp.size(), orig_offset);
          }
        }
        buf.Size(buf.CurrentSize() + tmp.size());
        if (!io_s.ok() || tmp.size() < allowed) {
          // Error or short read: stop reading; the data accumulated so far
          // may still be (partially) returned below.
          break;
        }
      }
      size_t res_len = 0;
      if (io_s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
        if (aligned_buf == nullptr) {
          // Copy the requested range out of the bounce buffer into the
          // caller's scratch.
          buf.Read(scratch, offset_advance, res_len);
        } else {
          // Transfer ownership of the bounce buffer to the caller; no copy.
          scratch = buf.BufferStart() + offset_advance;
          *aligned_buf = buf.Release();
        }
      }
      *result = Slice(scratch, res_len);
    } else {
      // Buffered (or already-aligned direct) read straight into `scratch`,
      // chunked by the rate limiter when one applies.
      size_t pos = 0;
      const char* res_scratch = nullptr;
      while (pos < n) {
        size_t allowed;
        if (rate_limiter_priority != Env::IO_TOTAL &&
            rate_limiter_ != nullptr) {
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            // Exclude rate-limiter wait time from the measured read latency.
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(
              n - pos, (use_direct_io() ? alignment : 0), rate_limiter_priority,
              stats_, RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subjected to rate_limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // the opts.timeout before calling file_->Read
          assert(!opts.timeout.count() || allowed == n);
          io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
                             scratch + pos, dbg);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, io_s);
          if (!io_s.ok()) {
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
                            tmp_result.size(), offset + pos);
          }
        }
        if (res_scratch == nullptr) {
          // we can't simply use `scratch` because reads of mmap'd files return
          // data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!io_s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      // On error return an empty result even if some bytes were read.
      *result = Slice(res_scratch, io_s.ok() ? pos : 0);
    }
    RecordIOStats(stats_, file_temperature_, is_last_level_, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

#ifndef NDEBUG
  auto pair = std::make_pair(&file_name_, &io_s);
  if (offset == 0) {
    TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::BeforeReturn",
                             &pair);
  }
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::AnyOffset", &pair);
#endif
  return io_s;
}
- size_t End(const FSReadRequest& r) {
- return static_cast<size_t>(r.offset) + r.len;
- }
- FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
- FSReadRequest req;
- req.offset = static_cast<uint64_t>(
- TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
- req.len = Roundup(End(r), alignment) - req.offset;
- req.scratch = nullptr;
- return req;
- }
- bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
- size_t dest_offset = static_cast<size_t>(dest->offset);
- size_t src_offset = static_cast<size_t>(src.offset);
- size_t dest_end = End(*dest);
- size_t src_end = End(src);
- if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
- return false;
- }
- dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
- dest->len = std::max(dest_end, src_end) - dest->offset;
- return true;
- }
// Issue a batch of reads (sorted by offset) in one FSRandomAccessFile
// call. For direct I/O the user requests are first aligned and merged into
// superset requests backed by a single contiguous aligned buffer
// (*aligned_buf); results are then scattered back into the original
// requests. Per-request status/result are stored in read_reqs[i]; the
// returned IOStatus reflects the MultiRead call itself.
IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
                                           FSReadRequest* read_reqs,
                                           size_t num_reqs,
                                           AlignedBuf* aligned_buf,
                                           IODebugContext* dbg) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);

#ifndef NDEBUG
  // Callers must pass requests sorted by offset (required by the merge
  // logic and the scatter loop below).
  for (size_t i = 0; i < num_reqs - 1; ++i) {
    assert(read_reqs[i].offset <= read_reqs[i + 1].offset);
  }
#endif  // !NDEBUG

  const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;

  // To be paranoid: modify scratch a little bit, so in case the underlying
  // FileSystem doesn't fill the buffer but returns success and `scratch`
  // still contains a previous block, the returned value will not pass the
  // checksum.
  // This byte might not change anything for direct I/O case, but it's OK.
  for (size_t i = 0; i < num_reqs; i++) {
    FSReadRequest& r = read_reqs[i];
    if (r.len > 0 && r.scratch != nullptr) {
      r.scratch[0]++;
    }
  }

  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    // The requests actually sent to the file system: the user's requests,
    // or (for direct I/O) the aligned/merged ones built below.
    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
      // num_reqs is the max possible size,
      // this can reduce std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
      for (size_t i = 0; i < num_reqs; i++) {
        FSReadRequest r = Align(read_reqs[i], alignment);
        if (i == 0) {
          // head
          aligned_reqs.push_back(std::move(r));
        } else if (!TryMerge(&aligned_reqs.back(), r)) {
          // head + n: could not merge with the previous aligned request.
          aligned_reqs.push_back(std::move(r));
        } else {
          // unused: merged into the previous request; mark its status read.
          r.status.PermitUncheckedError();
        }
      }
      TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
                               &aligned_reqs);

      // Allocate one aligned buffer and let each aligned request's scratch
      // point at its slice of it.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }

      // The caller keeps the buffer alive; results point into it.
      *aligned_buf = buf.Release();
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }

    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }

    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
        // TODO: ideally we should call `RateLimiter::RequestToken()` for
        // allowed bytes to multi-read and then consume those bytes by
        // satisfying as many requests in `MultiRead()` as possible, instead of
        // what we do here, which can cause burst when the
        // `total_multi_read_size` is big.
        size_t total_multi_read_size = 0;
        assert(fs_reqs != nullptr);
        for (size_t i = 0; i < num_fs_reqs; ++i) {
          FSReadRequest& req = fs_reqs[i];
          total_multi_read_size += req.len;
        }
        // Drain the rate limiter one burst at a time before issuing the
        // whole batch.
        size_t remaining_bytes = total_multi_read_size;
        size_t request_bytes = 0;
        while (remaining_bytes > 0) {
          request_bytes = std::min(
              static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
              remaining_bytes);
          rate_limiter_->Request(request_bytes, rate_limiter_priority,
                                 nullptr /* stats */,
                                 RateLimiter::OpType::kRead);
          remaining_bytes -= request_bytes;
        }
      }
      TEST_SYNC_POINT_CALLBACK(
          "RandomAccessFileReader::MultiRead:IODebugContext",
          const_cast<void*>(static_cast<void*>(dbg)));
      io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, dbg);
      RecordInHistogram(stats_, MULTIGET_IO_BATCH_SIZE, num_fs_reqs);
    }

    if (use_direct_io()) {
      // Populate results in the unaligned read requests. Both lists are
      // offset-sorted, so a single forward index over aligned_reqs suffices.
      size_t aligned_i = 0;
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          // Offset of the user's range within the aligned request's result.
          uint64_t offset = r.offset - fs_r.offset;
          if (fs_r.result.size() <= offset) {
            // No byte in the read range is returned.
            r.result = Slice();
          } else {
            size_t len = std::min(
                r.len, static_cast<size_t>(fs_r.result.size() - offset));
            r.result = Slice(fs_r.scratch + offset, len);
          }
        } else {
          r.result = Slice();
        }
      }
    }

    // Per-request listener notifications and IO statistics.
    for (size_t i = 0; i < num_reqs; ++i) {
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
      if (!read_reqs[i].status.ok()) {
        NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead,
                        file_name(), read_reqs[i].result.size(),
                        read_reqs[i].offset);
      }
      RecordIOStats(stats_, file_temperature_, is_last_level_,
                    read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}
- IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
- IOOptions& opts,
- IODebugContext* dbg) const {
- if (clock_ != nullptr) {
- return PrepareIOFromReadOptions(ro, clock_, opts, dbg);
- } else {
- return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts,
- dbg);
- }
- }
// Notes for when direct_io is enabled:
// Unless req.offset, req.len, and req.scratch are all already aligned,
// RandomAccessFileReader will create an aligned request and an aligned buffer
// for it. The user should provide either req.scratch or aligned_buf. If only
// req.scratch is provided, the result is copied from the internally allocated
// aligned buffer into req.scratch. If aligned_buf is provided, it is set to
// the aligned buffer allocated by RandomAccessFileReader, saving a copy.
// Issue an asynchronous read for `req`. On completion the file system calls
// ReadAsyncCallback, which forwards to the user's `cb(req, cb_arg)`.
// For unaligned direct I/O the request is aligned and redirected into an
// internal buffer; the original request fields are stashed in ReadAsyncInfo
// so the callback can reconstruct the user-visible result.
// `read_async_info` is heap-allocated here and deleted either in the
// callback or, if the submit fails, below.
IOStatus RandomAccessFileReader::ReadAsync(
    FSReadRequest& req, const IOOptions& opts,
    std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
    void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf,
    IODebugContext* dbg) {
  IOStatus s;
  // Create a callback and populate info.
  auto read_async_callback =
      std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
                std::placeholders::_1, std::placeholders::_2);
  ReadAsyncInfo* read_async_info = new ReadAsyncInfo(
      cb, cb_arg, (clock_ != nullptr ? clock_->NowMicros() : 0));

  if (ShouldNotifyListeners()) {
    read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
  }

  size_t alignment = file_->GetRequiredBufferAlignment();
  bool is_aligned = (req.offset & (alignment - 1)) == 0 &&
                    (req.len & (alignment - 1)) == 0 &&
                    (uintptr_t(req.scratch) & (alignment - 1)) == 0;
  read_async_info->is_aligned_ = is_aligned;

  uint64_t elapsed = 0;
  if (use_direct_io() && is_aligned == false) {
    // Submit an aligned superset request backed by an internal buffer.
    FSReadRequest aligned_req = Align(req, alignment);
    aligned_req.status.PermitUncheckedError();

    // Allocate aligned buffer.
    read_async_info->buf_.Alignment(alignment);
    read_async_info->buf_.AllocateNewBuffer(aligned_req.len);

    // Set rem fields in aligned FSReadRequest.
    aligned_req.scratch = read_async_info->buf_.BufferStart();

    // Set user provided fields to populate back in callback.
    read_async_info->user_scratch_ = req.scratch;
    read_async_info->user_aligned_buf_ = aligned_buf;
    read_async_info->user_len_ = req.len;
    read_async_info->user_offset_ = req.offset;
    read_async_info->user_result_ = req.result;

    assert(read_async_info->buf_.CurrentSize() == 0);

    // Only the submission (not the async completion) is timed here.
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    s = file_->ReadAsync(aligned_req, opts, read_async_callback,
                         read_async_info, io_handle, del_fn, dbg);
  } else {
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
                         io_handle, del_fn, dbg);
  }
  RecordTick(stats_, READ_ASYNC_MICROS, elapsed);

// Suppress false positive clang analyzer warnings.
// Memory is not released if file_->ReadAsync returns !s.ok(), because
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
// called then ReadAsync should always return IOStatus::OK().
#ifndef __clang_analyzer__
  if (!s.ok()) {
    delete read_async_info;
  }
#endif  // __clang_analyzer__

  return s;
}
// Completion handler for ReadAsync. For the unaligned direct I/O path it
// rebuilds the user's original request from the fields stashed in
// ReadAsyncInfo, extracts the user's byte range from the aligned internal
// buffer (copy into user scratch, or zero-copy via user_aligned_buf_), and
// invokes the user callback with that reconstructed request; otherwise the
// file system's request is forwarded as-is. Also records stats, notifies
// listeners, and frees the ReadAsyncInfo allocated by ReadAsync.
void RandomAccessFileReader::ReadAsyncCallback(FSReadRequest& req,
                                               void* cb_arg) {
  ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
  assert(read_async_info);
  assert(read_async_info->cb_);

  if (use_direct_io() && read_async_info->is_aligned_ == false) {
    // Create FSReadRequest with user provided fields.
    FSReadRequest user_req;
    user_req.scratch = read_async_info->user_scratch_;
    user_req.offset = read_async_info->user_offset_;
    user_req.len = read_async_info->user_len_;

    // Update results in user_req.
    user_req.result = req.result;
    user_req.status = req.status;

    read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() +
                               req.result.size());

    // Distance from the aligned read start to the user's requested offset.
    size_t offset_advance_len = static_cast<size_t>(
        /*offset_passed_by_user=*/read_async_info->user_offset_ -
        /*aligned_offset=*/req.offset);

    size_t res_len = 0;
    if (req.status.ok() &&
        offset_advance_len < read_async_info->buf_.CurrentSize()) {
      res_len =
          std::min(read_async_info->buf_.CurrentSize() - offset_advance_len,
                   read_async_info->user_len_);
      if (read_async_info->user_aligned_buf_ == nullptr) {
        // Copy the data into user's scratch.
// Clang analyzer assumes that it will take use_direct_io() == false in
// ReadAsync and use_direct_io() == true in Callback which cannot be true.
#ifndef __clang_analyzer__
        read_async_info->buf_.Read(user_req.scratch, offset_advance_len,
                                   res_len);
#endif  // __clang_analyzer__
      } else {
        // Set aligned_buf provided by user without additional copy.
        user_req.scratch =
            read_async_info->buf_.BufferStart() + offset_advance_len;
        *read_async_info->user_aligned_buf_ = read_async_info->buf_.Release();
      }
      user_req.result = Slice(user_req.scratch, res_len);
    } else {
      // Either req.status is not ok or data was not read.
      user_req.result = Slice();
    }
    read_async_info->cb_(user_req, read_async_info->cb_arg_);
  } else {
    read_async_info->cb_(req, read_async_info->cb_arg_);
  }

  // Update stats and notify listeners.
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    // elapsed doesn't take into account delay and overwrite as StopWatch does
    // in Read.
    uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
    file_read_hist_->Add(elapsed);
  }
  if (req.status.ok()) {
    RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
  } else if (!req.status.IsAborted()) {
    RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
  }
  if (ShouldNotifyListeners()) {
    auto finish_ts = FileOperationInfo::FinishNow();
    NotifyOnFileReadFinish(req.offset, req.result.size(),
                           read_async_info->fs_start_ts_, finish_ts,
                           req.status);
  }
  if (!req.status.ok()) {
    NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
                    req.result.size(), req.offset);
  }
  RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
  delete read_async_info;
}
- } // namespace ROCKSDB_NAMESPACE
|