| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "file/random_access_file_reader.h"
- #include <algorithm>
- #include <mutex>
- #include "monitoring/histogram.h"
- #include "monitoring/iostats_context_imp.h"
- #include "port/port.h"
- #include "test_util/sync_point.h"
- #include "util/random.h"
- #include "util/rate_limiter.h"
- namespace ROCKSDB_NAMESPACE {
- Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
- char* scratch, bool for_compaction) const {
- Status s;
- uint64_t elapsed = 0;
- {
- StopWatch sw(env_, stats_, hist_type_,
- (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
- true /*delay_enabled*/);
- auto prev_perf_level = GetPerfLevel();
- IOSTATS_TIMER_GUARD(read_nanos);
- if (use_direct_io()) {
- #ifndef ROCKSDB_LITE
- size_t alignment = file_->GetRequiredBufferAlignment();
- size_t aligned_offset =
- TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
- size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
- size_t read_size =
- Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
- AlignedBuffer buf;
- buf.Alignment(alignment);
- buf.AllocateNewBuffer(read_size);
- while (buf.CurrentSize() < read_size) {
- size_t allowed;
- if (for_compaction && rate_limiter_ != nullptr) {
- allowed = rate_limiter_->RequestToken(
- buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
- Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
- } else {
- assert(buf.CurrentSize() == 0);
- allowed = read_size;
- }
- Slice tmp;
- FileOperationInfo::TimePoint start_ts;
- uint64_t orig_offset = 0;
- if (ShouldNotifyListeners()) {
- start_ts = std::chrono::system_clock::now();
- orig_offset = aligned_offset + buf.CurrentSize();
- }
- {
- IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
- s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
- IOOptions(), &tmp, buf.Destination(), nullptr);
- }
- if (ShouldNotifyListeners()) {
- auto finish_ts = std::chrono::system_clock::now();
- NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
- s);
- }
- buf.Size(buf.CurrentSize() + tmp.size());
- if (!s.ok() || tmp.size() < allowed) {
- break;
- }
- }
- size_t res_len = 0;
- if (s.ok() && offset_advance < buf.CurrentSize()) {
- res_len = buf.Read(scratch, offset_advance,
- std::min(buf.CurrentSize() - offset_advance, n));
- }
- *result = Slice(scratch, res_len);
- #endif // !ROCKSDB_LITE
- } else {
- size_t pos = 0;
- const char* res_scratch = nullptr;
- while (pos < n) {
- size_t allowed;
- if (for_compaction && rate_limiter_ != nullptr) {
- if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
- sw.DelayStart();
- }
- allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
- Env::IOPriority::IO_LOW, stats_,
- RateLimiter::OpType::kRead);
- if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
- sw.DelayStop();
- }
- } else {
- allowed = n;
- }
- Slice tmp_result;
- #ifndef ROCKSDB_LITE
- FileOperationInfo::TimePoint start_ts;
- if (ShouldNotifyListeners()) {
- start_ts = std::chrono::system_clock::now();
- }
- #endif
- {
- IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
- s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
- scratch + pos, nullptr);
- }
- #ifndef ROCKSDB_LITE
- if (ShouldNotifyListeners()) {
- auto finish_ts = std::chrono::system_clock::now();
- NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
- finish_ts, s);
- }
- #endif
- if (res_scratch == nullptr) {
- // we can't simply use `scratch` because reads of mmap'd files return
- // data in a different buffer.
- res_scratch = tmp_result.data();
- } else {
- // make sure chunks are inserted contiguously into `res_scratch`.
- assert(tmp_result.data() == res_scratch + pos);
- }
- pos += tmp_result.size();
- if (!s.ok() || tmp_result.size() < allowed) {
- break;
- }
- }
- *result = Slice(res_scratch, s.ok() ? pos : 0);
- }
- IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
- SetPerfLevel(prev_perf_level);
- }
- if (stats_ != nullptr && file_read_hist_ != nullptr) {
- file_read_hist_->Add(elapsed);
- }
- return s;
- }
- Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
- size_t num_reqs) const {
- Status s;
- uint64_t elapsed = 0;
- assert(!use_direct_io());
- {
- StopWatch sw(env_, stats_, hist_type_,
- (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
- true /*delay_enabled*/);
- auto prev_perf_level = GetPerfLevel();
- IOSTATS_TIMER_GUARD(read_nanos);
- #ifndef ROCKSDB_LITE
- FileOperationInfo::TimePoint start_ts;
- if (ShouldNotifyListeners()) {
- start_ts = std::chrono::system_clock::now();
- }
- #endif // ROCKSDB_LITE
- {
- IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
- s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr);
- }
- for (size_t i = 0; i < num_reqs; ++i) {
- #ifndef ROCKSDB_LITE
- if (ShouldNotifyListeners()) {
- auto finish_ts = std::chrono::system_clock::now();
- NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
- start_ts, finish_ts, read_reqs[i].status);
- }
- #endif // ROCKSDB_LITE
- IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
- }
- SetPerfLevel(prev_perf_level);
- }
- if (stats_ != nullptr && file_read_hist_ != nullptr) {
- file_read_hist_->Add(elapsed);
- }
- return s;
- }
- } // namespace ROCKSDB_NAMESPACE
|