// random_access_file_reader.cc (6.6 KB)
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "file/random_access_file_reader.h"
  10. #include <algorithm>
  11. #include <mutex>
  12. #include "monitoring/histogram.h"
  13. #include "monitoring/iostats_context_imp.h"
  14. #include "port/port.h"
  15. #include "test_util/sync_point.h"
  16. #include "util/random.h"
  17. #include "util/rate_limiter.h"
  18. namespace ROCKSDB_NAMESPACE {
  19. Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
  20. char* scratch, bool for_compaction) const {
  21. Status s;
  22. uint64_t elapsed = 0;
  23. {
  24. StopWatch sw(env_, stats_, hist_type_,
  25. (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
  26. true /*delay_enabled*/);
  27. auto prev_perf_level = GetPerfLevel();
  28. IOSTATS_TIMER_GUARD(read_nanos);
  29. if (use_direct_io()) {
  30. #ifndef ROCKSDB_LITE
  31. size_t alignment = file_->GetRequiredBufferAlignment();
  32. size_t aligned_offset =
  33. TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
  34. size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
  35. size_t read_size =
  36. Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
  37. AlignedBuffer buf;
  38. buf.Alignment(alignment);
  39. buf.AllocateNewBuffer(read_size);
  40. while (buf.CurrentSize() < read_size) {
  41. size_t allowed;
  42. if (for_compaction && rate_limiter_ != nullptr) {
  43. allowed = rate_limiter_->RequestToken(
  44. buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
  45. Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
  46. } else {
  47. assert(buf.CurrentSize() == 0);
  48. allowed = read_size;
  49. }
  50. Slice tmp;
  51. FileOperationInfo::TimePoint start_ts;
  52. uint64_t orig_offset = 0;
  53. if (ShouldNotifyListeners()) {
  54. start_ts = std::chrono::system_clock::now();
  55. orig_offset = aligned_offset + buf.CurrentSize();
  56. }
  57. {
  58. IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
  59. s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
  60. IOOptions(), &tmp, buf.Destination(), nullptr);
  61. }
  62. if (ShouldNotifyListeners()) {
  63. auto finish_ts = std::chrono::system_clock::now();
  64. NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
  65. s);
  66. }
  67. buf.Size(buf.CurrentSize() + tmp.size());
  68. if (!s.ok() || tmp.size() < allowed) {
  69. break;
  70. }
  71. }
  72. size_t res_len = 0;
  73. if (s.ok() && offset_advance < buf.CurrentSize()) {
  74. res_len = buf.Read(scratch, offset_advance,
  75. std::min(buf.CurrentSize() - offset_advance, n));
  76. }
  77. *result = Slice(scratch, res_len);
  78. #endif // !ROCKSDB_LITE
  79. } else {
  80. size_t pos = 0;
  81. const char* res_scratch = nullptr;
  82. while (pos < n) {
  83. size_t allowed;
  84. if (for_compaction && rate_limiter_ != nullptr) {
  85. if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
  86. sw.DelayStart();
  87. }
  88. allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
  89. Env::IOPriority::IO_LOW, stats_,
  90. RateLimiter::OpType::kRead);
  91. if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
  92. sw.DelayStop();
  93. }
  94. } else {
  95. allowed = n;
  96. }
  97. Slice tmp_result;
  98. #ifndef ROCKSDB_LITE
  99. FileOperationInfo::TimePoint start_ts;
  100. if (ShouldNotifyListeners()) {
  101. start_ts = std::chrono::system_clock::now();
  102. }
  103. #endif
  104. {
  105. IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
  106. s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
  107. scratch + pos, nullptr);
  108. }
  109. #ifndef ROCKSDB_LITE
  110. if (ShouldNotifyListeners()) {
  111. auto finish_ts = std::chrono::system_clock::now();
  112. NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
  113. finish_ts, s);
  114. }
  115. #endif
  116. if (res_scratch == nullptr) {
  117. // we can't simply use `scratch` because reads of mmap'd files return
  118. // data in a different buffer.
  119. res_scratch = tmp_result.data();
  120. } else {
  121. // make sure chunks are inserted contiguously into `res_scratch`.
  122. assert(tmp_result.data() == res_scratch + pos);
  123. }
  124. pos += tmp_result.size();
  125. if (!s.ok() || tmp_result.size() < allowed) {
  126. break;
  127. }
  128. }
  129. *result = Slice(res_scratch, s.ok() ? pos : 0);
  130. }
  131. IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
  132. SetPerfLevel(prev_perf_level);
  133. }
  134. if (stats_ != nullptr && file_read_hist_ != nullptr) {
  135. file_read_hist_->Add(elapsed);
  136. }
  137. return s;
  138. }
  139. Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
  140. size_t num_reqs) const {
  141. Status s;
  142. uint64_t elapsed = 0;
  143. assert(!use_direct_io());
  144. {
  145. StopWatch sw(env_, stats_, hist_type_,
  146. (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
  147. true /*delay_enabled*/);
  148. auto prev_perf_level = GetPerfLevel();
  149. IOSTATS_TIMER_GUARD(read_nanos);
  150. #ifndef ROCKSDB_LITE
  151. FileOperationInfo::TimePoint start_ts;
  152. if (ShouldNotifyListeners()) {
  153. start_ts = std::chrono::system_clock::now();
  154. }
  155. #endif // ROCKSDB_LITE
  156. {
  157. IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
  158. s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr);
  159. }
  160. for (size_t i = 0; i < num_reqs; ++i) {
  161. #ifndef ROCKSDB_LITE
  162. if (ShouldNotifyListeners()) {
  163. auto finish_ts = std::chrono::system_clock::now();
  164. NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
  165. start_ts, finish_ts, read_reqs[i].status);
  166. }
  167. #endif // ROCKSDB_LITE
  168. IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
  169. }
  170. SetPerfLevel(prev_perf_level);
  171. }
  172. if (stats_ != nullptr && file_read_hist_ != nullptr) {
  173. file_read_hist_->Add(elapsed);
  174. }
  175. return s;
  176. }
  177. } // namespace ROCKSDB_NAMESPACE