random_access_file_reader.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/random_access_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/file_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "table/format.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter_impl.h"

namespace ROCKSDB_NAMESPACE {

inline Histograms GetFileReadHistograms(Statistics* stats,
                                        Env::IOActivity io_activity) {
  switch (io_activity) {
    case Env::IOActivity::kFlush:
      return Histograms::FILE_READ_FLUSH_MICROS;
    case Env::IOActivity::kCompaction:
      return Histograms::FILE_READ_COMPACTION_MICROS;
    case Env::IOActivity::kDBOpen:
      return Histograms::FILE_READ_DB_OPEN_MICROS;
    default:
      break;
  }

  if (stats && stats->get_stats_level() > StatsLevel::kExceptDetailedTimers) {
    switch (io_activity) {
      case Env::IOActivity::kGet:
        return Histograms::FILE_READ_GET_MICROS;
      case Env::IOActivity::kMultiGet:
        return Histograms::FILE_READ_MULTIGET_MICROS;
      case Env::IOActivity::kDBIterator:
        return Histograms::FILE_READ_DB_ITERATOR_MICROS;
      case Env::IOActivity::kVerifyDBChecksum:
        return Histograms::FILE_READ_VERIFY_DB_CHECKSUM_MICROS;
      case Env::IOActivity::kVerifyFileChecksums:
        return Histograms::FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS;
      default:
        break;
    }
  }
  return Histograms::HISTOGRAM_ENUM_MAX;
}

inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
                          bool is_last_level, size_t size) {
  IOSTATS_ADD(bytes_read, size);
  // record for last/non-last level
  if (is_last_level) {
    RecordTick(stats, LAST_LEVEL_READ_BYTES, size);
    RecordTick(stats, LAST_LEVEL_READ_COUNT, 1);
  } else {
    RecordTick(stats, NON_LAST_LEVEL_READ_BYTES, size);
    RecordTick(stats, NON_LAST_LEVEL_READ_COUNT, 1);
  }

  // record for temperature file
  if (file_temperature != Temperature::kUnknown) {
    switch (file_temperature) {
      case Temperature::kHot:
        IOSTATS_ADD(file_io_stats_by_temperature.hot_file_bytes_read, size);
        IOSTATS_ADD(file_io_stats_by_temperature.hot_file_read_count, 1);
        RecordTick(stats, HOT_FILE_READ_BYTES, size);
        RecordTick(stats, HOT_FILE_READ_COUNT, 1);
        break;
      case Temperature::kWarm:
        IOSTATS_ADD(file_io_stats_by_temperature.warm_file_bytes_read, size);
        IOSTATS_ADD(file_io_stats_by_temperature.warm_file_read_count, 1);
        RecordTick(stats, WARM_FILE_READ_BYTES, size);
        RecordTick(stats, WARM_FILE_READ_COUNT, 1);
        break;
      case Temperature::kCool:
        IOSTATS_ADD(file_io_stats_by_temperature.cool_file_bytes_read, size);
        IOSTATS_ADD(file_io_stats_by_temperature.cool_file_read_count, 1);
        RecordTick(stats, COOL_FILE_READ_BYTES, size);
        RecordTick(stats, COOL_FILE_READ_COUNT, 1);
        break;
      case Temperature::kCold:
        IOSTATS_ADD(file_io_stats_by_temperature.cold_file_bytes_read, size);
        IOSTATS_ADD(file_io_stats_by_temperature.cold_file_read_count, 1);
        RecordTick(stats, COLD_FILE_READ_BYTES, size);
        RecordTick(stats, COLD_FILE_READ_COUNT, 1);
        break;
      case Temperature::kIce:
        IOSTATS_ADD(file_io_stats_by_temperature.ice_file_bytes_read, size);
        IOSTATS_ADD(file_io_stats_by_temperature.ice_file_read_count, 1);
        RecordTick(stats, ICE_FILE_READ_BYTES, size);
        RecordTick(stats, ICE_FILE_READ_COUNT, 1);
        break;
      default:
        break;
    }
  }
}

IOStatus RandomAccessFileReader::Create(
    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
    const FileOptions& file_opts,
    std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
  std::unique_ptr<FSRandomAccessFile> file;
  IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
  if (io_s.ok()) {
    reader->reset(new RandomAccessFileReader(std::move(file), fname));
  }
  return io_s;
}
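
// Illustrative usage sketch for the factory above (not part of this file's
// API surface; the file name and options are assumptions for exposition):
//
//   std::unique_ptr<RandomAccessFileReader> reader;
//   IOStatus io_s = RandomAccessFileReader::Create(
//       FileSystem::Default(), "/tmp/000001.sst", FileOptions(), &reader,
//       /*dbg=*/nullptr);
//   if (io_s.ok()) {
//     // reader->Read(...) may now be called.
//   }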

IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
                                      size_t n, Slice* result, char* scratch,
                                      AlignedBuf* aligned_buf,
                                      IODebugContext* dbg) const {
  (void)aligned_buf;

  const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;

  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read:IODebugContext",
                           const_cast<void*>(static_cast<void*>(dbg)));

  // To be paranoid: modify scratch a little bit, so in case the underlying
  // FileSystem doesn't fill the buffer but returns success and `scratch`
  // still contains a previous block, the returned value will not pass the
  // checksum.
  if (n > 0 && scratch != nullptr) {
    // This byte might not change anything for the direct I/O case, but
    // that's OK.
    scratch[0]++;
  }

  IOStatus io_s;
  uint64_t elapsed = 0;
  size_t alignment = file_->GetRequiredBufferAlignment();
  bool is_aligned = false;
  if (scratch != nullptr) {
    // Check if offset, length and buffer are aligned.
    is_aligned = (offset & (alignment - 1)) == 0 &&
                 (n & (alignment - 1)) == 0 &&
                 (uintptr_t(scratch) & (alignment - 1)) == 0;
  }

  {
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io() && is_aligned == false) {
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
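      // Illustrative numbers for the computation above (values assumed for
      // exposition): with alignment == 4096, offset == 5000 and n == 3000,
      // aligned_offset == 4096, offset_advance == 904 and read_size ==
      // Roundup(8000, 4096) - 4096 == 4096, so a single aligned read covers
      // the requested [5000, 8000) byte range.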
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (rate_limiter_priority != Env::IO_TOTAL &&
            rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
        } else {
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::StartTimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
          orig_offset = aligned_offset + buf.CurrentSize();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user
          // reads are not subject to the rate limiter and should go through
          // only one iteration of this loop, so we don't need to check and
          // adjust the opts.timeout before calling file_->Read.
          assert(!opts.timeout.count() || allowed == read_size);
          io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
                             &tmp, buf.Destination(), dbg);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 io_s);
          if (!io_s.ok()) {
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
                            tmp.size(), orig_offset);
          }
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        if (!io_s.ok() || tmp.size() < allowed) {
          break;
        }
      }
      size_t res_len = 0;
      if (io_s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
        if (aligned_buf == nullptr) {
          buf.Read(scratch, offset_advance, res_len);
        } else {
          scratch = buf.BufferStart() + offset_advance;
          *aligned_buf = buf.Release();
        }
      }
      *result = Slice(scratch, res_len);
    } else {
      size_t pos = 0;
      const char* res_scratch = nullptr;
      while (pos < n) {
        size_t allowed;
        if (rate_limiter_priority != Env::IO_TOTAL &&
            rate_limiter_ != nullptr) {
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(
              n - pos, (use_direct_io() ? alignment : 0),
              rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user
          // reads are not subject to the rate limiter and should go through
          // only one iteration of this loop, so we don't need to check and
          // adjust the opts.timeout before calling file_->Read.
          assert(!opts.timeout.count() || allowed == n);
          io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
                             scratch + pos, dbg);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, io_s);
          if (!io_s.ok()) {
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
                            tmp_result.size(), offset + pos);
          }
        }
        if (res_scratch == nullptr) {
          // We can't simply use `scratch` here because reads of mmap'd files
          // return data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // Make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!io_s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      *result = Slice(res_scratch, io_s.ok() ? pos : 0);
    }
    RecordIOStats(stats_, file_temperature_, is_last_level_, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

#ifndef NDEBUG
  auto pair = std::make_pair(&file_name_, &io_s);
  if (offset == 0) {
    TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::BeforeReturn",
                             &pair);
  }
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::AnyOffset", &pair);
#endif
  return io_s;
}

size_t End(const FSReadRequest& r) {
  return static_cast<size_t>(r.offset) + r.len;
}

FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
  FSReadRequest req;
  req.offset = static_cast<uint64_t>(
      TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
  req.len = Roundup(End(r), alignment) - req.offset;
  req.scratch = nullptr;
  return req;
}
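
// Illustrative numbers for Align() (assumed for exposition): with
// alignment == 4096, a request at offset == 6000 with len == 5000 (so
// End() == 11000) is widened to offset == 4096 and len ==
// Roundup(11000, 4096) - 4096 == 8192, i.e. the smallest page-aligned
// range covering [6000, 11000).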

bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
  size_t dest_offset = static_cast<size_t>(dest->offset);
  size_t src_offset = static_cast<size_t>(src.offset);
  size_t dest_end = End(*dest);
  size_t src_end = End(src);
  if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
    return false;
  }
  dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
  dest->len = std::max(dest_end, src_end) - dest->offset;
  return true;
}
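
// Illustrative merge (assumed for exposition): requests covering [0, 4096)
// and [4096, 8192) touch at offset 4096, so TryMerge() widens the first to
// [0, 8192) and returns true; [0, 4096) and [8192, 12288) leave a gap, so
// TryMerge() returns false and the requests stay separate.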

IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
                                           FSReadRequest* read_reqs,
                                           size_t num_reqs,
                                           AlignedBuf* aligned_buf,
                                           IODebugContext* dbg) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);

#ifndef NDEBUG
  for (size_t i = 0; i < num_reqs - 1; ++i) {
    assert(read_reqs[i].offset <= read_reqs[i + 1].offset);
  }
#endif  // !NDEBUG

  const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;

  // To be paranoid: modify scratch a little bit, so in case the underlying
  // FileSystem doesn't fill the buffer but returns success and `scratch`
  // still contains a previous block, the returned value will not pass the
  // checksum. This byte might not change anything for the direct I/O case,
  // but that's OK.
  for (size_t i = 0; i < num_reqs; i++) {
    FSReadRequest& r = read_reqs[i];
    if (r.len > 0 && r.scratch != nullptr) {
      r.scratch[0]++;
    }
  }

  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
      // num_reqs is the max possible size; reserving it can reduce
      // std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
      for (size_t i = 0; i < num_reqs; i++) {
        FSReadRequest r = Align(read_reqs[i], alignment);
        if (i == 0) {
          // head
          aligned_reqs.push_back(std::move(r));
        } else if (!TryMerge(&aligned_reqs.back(), r)) {
          // head + n
          aligned_reqs.push_back(std::move(r));
        } else {
          // unused
          r.status.PermitUncheckedError();
        }
      }
      TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
                               &aligned_reqs);

      // Allocate aligned buffer and let scratch buffers point to it.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }

      *aligned_buf = buf.Release();
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }

    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }

    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
        // TODO: ideally we should call `RateLimiter::RequestToken()` for
        // allowed bytes to multi-read and then consume those bytes by
        // satisfying as many requests in `MultiRead()` as possible, instead
        // of what we do here, which can cause a burst when
        // `total_multi_read_size` is big.
        size_t total_multi_read_size = 0;
        assert(fs_reqs != nullptr);
        for (size_t i = 0; i < num_fs_reqs; ++i) {
          FSReadRequest& req = fs_reqs[i];
          total_multi_read_size += req.len;
        }
        size_t remaining_bytes = total_multi_read_size;
        size_t request_bytes = 0;
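        // Illustrative numbers (assumed for exposition): with
        // GetSingleBurstBytes() == 1 MiB and total_multi_read_size ==
        // 2.5 MiB, the loop below issues three Request() calls of 1 MiB,
        // 1 MiB and 0.5 MiB before the actual MultiRead is submitted.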
        while (remaining_bytes > 0) {
          request_bytes = std::min(
              static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
              remaining_bytes);
          rate_limiter_->Request(request_bytes, rate_limiter_priority,
                                 nullptr /* stats */,
                                 RateLimiter::OpType::kRead);
          remaining_bytes -= request_bytes;
        }
      }
      TEST_SYNC_POINT_CALLBACK(
          "RandomAccessFileReader::MultiRead:IODebugContext",
          const_cast<void*>(static_cast<void*>(dbg)));
      io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, dbg);
      RecordInHistogram(stats_, MULTIGET_IO_BATCH_SIZE, num_fs_reqs);
    }

    if (use_direct_io()) {
      // Populate results in the unaligned read requests.
      size_t aligned_i = 0;
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          uint64_t offset = r.offset - fs_r.offset;
          if (fs_r.result.size() <= offset) {
            // No byte in the read range is returned.
            r.result = Slice();
          } else {
            size_t len = std::min(
                r.len, static_cast<size_t>(fs_r.result.size() - offset));
            r.result = Slice(fs_r.scratch + offset, len);
          }
        } else {
          r.result = Slice();
        }
      }
    }

    for (size_t i = 0; i < num_reqs; ++i) {
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
      if (!read_reqs[i].status.ok()) {
        NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead,
                        file_name(), read_reqs[i].result.size(),
                        read_reqs[i].offset);
      }
      RecordIOStats(stats_, file_temperature_, is_last_level_,
                    read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}
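
// Illustrative usage sketch for MultiRead (assumed for exposition; offsets,
// lengths and buffers are hypothetical). Requests must be sorted by offset,
// and each request's status and result is populated individually:
//
//   char buf0[1000], buf1[1000];
//   FSReadRequest reqs[2];
//   reqs[0].offset = 0;     reqs[0].len = 1000;  reqs[0].scratch = buf0;
//   reqs[1].offset = 5000;  reqs[1].len = 1000;  reqs[1].scratch = buf1;
//   AlignedBuf aligned_buf;  // backing memory, used only with direct I/O
//   IOStatus io_s =
//       reader->MultiRead(IOOptions(), reqs, 2, &aligned_buf, nullptr);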

IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
                                                  IOOptions& opts,
                                                  IODebugContext* dbg) const {
  if (clock_ != nullptr) {
    return PrepareIOFromReadOptions(ro, clock_, opts, dbg);
  } else {
    return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts,
                                    dbg);
  }
}

// Notes for when direct_io is enabled:
// Unless req.offset, req.len and req.scratch are all already aligned,
// RandomAccessFileReader will create an aligned request and an aligned
// buffer for the request. The user should provide only one of req.scratch
// and aligned_buf. If only req.scratch is provided, the result will be
// copied from the allocated aligned buffer to req.scratch. If only
// aligned_buf is provided, it will be set to the aligned buffer allocated
// by RandomAccessFileReader, which saves a copy.
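//
// Illustrative call sketch (assumed for exposition; `MyCallback` and the
// request values are hypothetical):
//
//   FSReadRequest req;
//   req.offset = 5000;      // not necessarily aligned
//   req.len = 3000;
//   req.scratch = nullptr;  // rely on aligned_buf, avoiding a copy
//   AlignedBuf aligned_buf;
//   void* io_handle = nullptr;
//   IOHandleDeleter del_fn;
//   IOStatus s = reader->ReadAsync(req, IOOptions(), MyCallback,
//                                  /*cb_arg=*/nullptr, &io_handle, &del_fn,
//                                  &aligned_buf, /*dbg=*/nullptr);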
IOStatus RandomAccessFileReader::ReadAsync(
    FSReadRequest& req, const IOOptions& opts,
    std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
    void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf,
    IODebugContext* dbg) {
  IOStatus s;
  // Create a callback and populate info.
  auto read_async_callback =
      std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
                std::placeholders::_1, std::placeholders::_2);
  ReadAsyncInfo* read_async_info = new ReadAsyncInfo(
      cb, cb_arg, (clock_ != nullptr ? clock_->NowMicros() : 0));

  if (ShouldNotifyListeners()) {
    read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
  }

  size_t alignment = file_->GetRequiredBufferAlignment();
  bool is_aligned = (req.offset & (alignment - 1)) == 0 &&
                    (req.len & (alignment - 1)) == 0 &&
                    (uintptr_t(req.scratch) & (alignment - 1)) == 0;
  read_async_info->is_aligned_ = is_aligned;

  uint64_t elapsed = 0;
  if (use_direct_io() && is_aligned == false) {
    FSReadRequest aligned_req = Align(req, alignment);
    aligned_req.status.PermitUncheckedError();

    // Allocate an aligned buffer.
    read_async_info->buf_.Alignment(alignment);
    read_async_info->buf_.AllocateNewBuffer(aligned_req.len);

    // Set the remaining fields in the aligned FSReadRequest.
    aligned_req.scratch = read_async_info->buf_.BufferStart();

    // Set user-provided fields to populate back in the callback.
    read_async_info->user_scratch_ = req.scratch;
    read_async_info->user_aligned_buf_ = aligned_buf;
    read_async_info->user_len_ = req.len;
    read_async_info->user_offset_ = req.offset;
    read_async_info->user_result_ = req.result;

    assert(read_async_info->buf_.CurrentSize() == 0);

    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    s = file_->ReadAsync(aligned_req, opts, read_async_callback,
                         read_async_info, io_handle, del_fn, dbg);
  } else {
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
                         io_handle, del_fn, dbg);
  }
  RecordTick(stats_, READ_ASYNC_MICROS, elapsed);

  // Suppress false positive clang analyzer warnings.
  // Memory is not released if file_->ReadAsync returns !s.ok(), because
  // ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
  // called then ReadAsync should always return IOStatus::OK().
#ifndef __clang_analyzer__
  if (!s.ok()) {
    delete read_async_info;
  }
#endif  // __clang_analyzer__

  return s;
}

void RandomAccessFileReader::ReadAsyncCallback(FSReadRequest& req,
                                               void* cb_arg) {
  ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
  assert(read_async_info);
  assert(read_async_info->cb_);

  if (use_direct_io() && read_async_info->is_aligned_ == false) {
    // Create an FSReadRequest with the user-provided fields.
    FSReadRequest user_req;
    user_req.scratch = read_async_info->user_scratch_;
    user_req.offset = read_async_info->user_offset_;
    user_req.len = read_async_info->user_len_;

    // Update results in user_req.
    user_req.result = req.result;
    user_req.status = req.status;

    read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() +
                               req.result.size());

    size_t offset_advance_len = static_cast<size_t>(
        /*offset_passed_by_user=*/read_async_info->user_offset_ -
        /*aligned_offset=*/req.offset);

    size_t res_len = 0;
    if (req.status.ok() &&
        offset_advance_len < read_async_info->buf_.CurrentSize()) {
      res_len =
          std::min(read_async_info->buf_.CurrentSize() - offset_advance_len,
                   read_async_info->user_len_);
      if (read_async_info->user_aligned_buf_ == nullptr) {
        // Copy the data into the user's scratch.
        // The clang analyzer assumes use_direct_io() == false in ReadAsync
        // but use_direct_io() == true in this callback, which cannot both
        // hold.
#ifndef __clang_analyzer__
        read_async_info->buf_.Read(user_req.scratch, offset_advance_len,
                                   res_len);
#endif  // __clang_analyzer__
      } else {
        // Set the aligned_buf provided by the user without an extra copy.
        user_req.scratch =
            read_async_info->buf_.BufferStart() + offset_advance_len;
        *read_async_info->user_aligned_buf_ = read_async_info->buf_.Release();
      }
      user_req.result = Slice(user_req.scratch, res_len);
    } else {
      // Either req.status is not ok or no data was read.
      user_req.result = Slice();
    }
    read_async_info->cb_(user_req, read_async_info->cb_arg_);
  } else {
    read_async_info->cb_(req, read_async_info->cb_arg_);
  }

  // Update stats and notify listeners.
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    // `elapsed` doesn't take delay and overwrite into account the way the
    // StopWatch in Read does.
    uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
    file_read_hist_->Add(elapsed);
  }
  if (req.status.ok()) {
    RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
  } else if (!req.status.IsAborted()) {
    RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
  }
  if (ShouldNotifyListeners()) {
    auto finish_ts = FileOperationInfo::FinishNow();
    NotifyOnFileReadFinish(req.offset, req.result.size(),
                           read_async_info->fs_start_ts_, finish_ts,
                           req.status);
  }
  if (!req.status.ok()) {
    NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
                    req.result.size(), req.offset);
  }
  RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
  delete read_async_info;
}
}  // namespace ROCKSDB_NAMESPACE