file_prefetch_buffer.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/file_prefetch_buffer.h"

#include <algorithm>
#include <cassert>

#include "file/random_access_file_reader.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter_impl.h"

namespace ROCKSDB_NAMESPACE {

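// Prepares buf for an upcoming read of roundup_len bytes starting at offset:
// reuses any aligned bytes already present in the buffer (reported back
// through aligned_useful_len) and allocates or refits the underlying buffer,
// unless the file system's own buffer will be reused (use_fs_buffer == true).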
void FilePrefetchBuffer::PrepareBufferForRead(
    BufferInfo* buf, size_t alignment, uint64_t offset, size_t roundup_len,
    bool refit_tail, bool use_fs_buffer, uint64_t& aligned_useful_len) {
  uint64_t aligned_useful_offset_in_buf = 0;
  bool copy_data_to_new_buffer = false;
  // Check if requested bytes are in the existing buffer_.
  // If only a few bytes exist -- reuse them & read only what is really needed.
  // This is typically the case of incremental reading of data.
  // If no bytes exist in buffer -- full pread.
  if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) {
    // Only a few requested bytes are in the buffer. memmove that chunk of
    // bytes to the beginning, and memcpy them back into the new buffer if a
    // new buffer is created.
    aligned_useful_offset_in_buf =
        Rounddown(static_cast<size_t>(offset - buf->offset_), alignment);
    // aligned_useful_len is passed by reference and used to calculate how much
    // data needs to be read, so it is needed regardless of whether
    // use_fs_buffer is true.
    aligned_useful_len = static_cast<uint64_t>(buf->CurrentSize()) -
                         aligned_useful_offset_in_buf;
    assert(aligned_useful_offset_in_buf % alignment == 0);
    assert(aligned_useful_len % alignment == 0);
    assert(aligned_useful_offset_in_buf + aligned_useful_len <=
           buf->offset_ + buf->CurrentSize());
    if (aligned_useful_len > 0) {
      copy_data_to_new_buffer = true;
    } else {
      // This reset is not necessary, but just to be safe.
      aligned_useful_offset_in_buf = 0;
    }
  }

  // The later buffer allocation / tail refitting does not apply when
  // use_fs_buffer is true. If we allocate a new buffer, we end up throwing it
  // away later when we reuse the file system allocated buffer. If we refit the
  // tail in the main buffer, we don't have a place to put the next chunk of
  // data provided by the file system (without performing another copy, which
  // we are trying to avoid in the first place).
  if (use_fs_buffer) {
    return;
  }

  // Create a new buffer only if current capacity is not sufficient, and
  // memcopy bytes from old buffer if needed (i.e., if aligned_useful_len is
  // greater than 0).
  if (buf->buffer_.Capacity() < roundup_len) {
    buf->buffer_.Alignment(alignment);
    buf->buffer_.AllocateNewBuffer(
        static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
        aligned_useful_offset_in_buf, static_cast<size_t>(aligned_useful_len));
  } else if (aligned_useful_len > 0 && refit_tail) {
    // New buffer not needed. But memmove bytes from tail to the beginning
    // since aligned_useful_len is greater than 0.
    buf->buffer_.RefitTail(static_cast<size_t>(aligned_useful_offset_in_buf),
                           static_cast<size_t>(aligned_useful_len));
  } else if (aligned_useful_len > 0) {
    // Async prefetching does not call RefitTail with aligned_useful_len > 0.
    // Allocate a new buffer if needed because the aligned buffer computes the
    // remaining space as capacity - cursize, which may not hold here since we
    // are not refitting.
    // TODO: Use refit_tail for async prefetching too.
    buf->buffer_.Alignment(alignment);
    buf->buffer_.AllocateNewBuffer(
        static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
        aligned_useful_offset_in_buf, static_cast<size_t>(aligned_useful_len));
  }
}

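// Issues a synchronous read of read_len bytes at start_offset +
// aligned_useful_len, either into the file system's own buffer
// (use_fs_buffer) or into buf after any bytes kept from the previous read,
// and updates the buffer size and prefetch statistics on success.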
Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
                                RandomAccessFileReader* reader,
                                uint64_t read_len, uint64_t aligned_useful_len,
                                uint64_t start_offset, bool use_fs_buffer) {
  Slice result;
  Status s;
  char* to_buf = nullptr;
  if (use_fs_buffer) {
    s = FSBufferDirectRead(reader, buf, opts, start_offset + aligned_useful_len,
                           read_len, result);
  } else {
    to_buf = buf->buffer_.BufferStart() + aligned_useful_len;
    s = reader->Read(opts, start_offset + aligned_useful_len, read_len, &result,
                     to_buf, /*aligned_buf=*/nullptr);
  }
#ifndef NDEBUG
  if (result.size() < read_len) {
    // Fake an IO error to force db_stress fault injection to ignore
    // truncated read errors
    IGNORE_STATUS_IF_ERROR(Status::IOError());
  }
#endif
  if (!s.ok()) {
    return s;
  }

  if (!use_fs_buffer && result.data() != to_buf) {
    // If the read is coming from some other buffer already in memory (such as
    // mmap) then it would be inefficient to create another copy in this
    // FilePrefetchBuffer. The caller is expected to exclude this case.
    assert(false);
    return Status::Corruption("File read didn't populate our buffer");
  }

  if (usage_ == FilePrefetchBufferUsage::kUserScanPrefetch) {
    RecordTick(stats_, PREFETCH_BYTES, read_len);
  } else if (usage_ == FilePrefetchBufferUsage::kCompactionPrefetch) {
    RecordInHistogram(stats_, COMPACTION_PREFETCH_BYTES, read_len);
  }

  if (!use_fs_buffer) {
    // Update the buffer size.
    // We already explicitly set the buffer size when we reuse the FS buffer.
    buf->buffer_.Size(static_cast<size_t>(aligned_useful_len) + result.size());
  }
  return s;
}

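// Submits an asynchronous read of read_len bytes at start_offset into buf and
// registers PrefetchAsyncCallback to fill the buffer when the request
// completes. On success the buffer is marked with async_read_in_progress_.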
Status FilePrefetchBuffer::ReadAsync(BufferInfo* buf, const IOOptions& opts,
                                     RandomAccessFileReader* reader,
                                     uint64_t read_len, uint64_t start_offset) {
  TEST_SYNC_POINT("FilePrefetchBuffer::ReadAsync");
  // callback for async read request.
  auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
                      std::placeholders::_1, std::placeholders::_2);
  FSReadRequest req;
  Slice result;
  req.len = read_len;
  req.offset = start_offset;
  req.result = result;
  req.scratch = buf->buffer_.BufferStart();
  buf->async_req_len_ = req.len;

  Status s = reader->ReadAsync(req, opts, fp, buf, &(buf->io_handle_),
                               &(buf->del_fn_), /*aligned_buf=*/nullptr);
  req.status.PermitUncheckedError();
  if (s.ok()) {
    if (usage_ == FilePrefetchBufferUsage::kUserScanPrefetch) {
      RecordTick(stats_, PREFETCH_BYTES, read_len);
    }
    buf->async_read_in_progress_ = true;
  }
  return s;
}

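// Synchronous prefetch for the single-buffer case (num_buffers_ == 1): reads
// up to n bytes starting at offset (plus any readahead tuning) into the first
// buffer, unless the requested range is already buffered.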
Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
                                    RandomAccessFileReader* reader,
                                    uint64_t offset, size_t n) {
  if (!enable_ || reader == nullptr) {
    return Status::OK();
  }
  assert(num_buffers_ == 1);

  AllocateBufferIfEmpty();
  BufferInfo* buf = GetFirstBuffer();

  TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");

  if (offset + n <= buf->offset_ + buf->CurrentSize()) {
    // All requested bytes are already in the buffer. So no need to Read again.
    return Status::OK();
  }

  size_t alignment = GetRequiredBufferAlignment(reader);
  uint64_t rounddown_offset = offset, roundup_end = 0, aligned_useful_len = 0;
  size_t read_len = 0;

  // TODO: Enable file system buffer reuse optimization. Need to incorporate
  // overlap buffer logic here (similar to what is done in PrefetchInternal).
  // Currently, if we attempt to use the optimization, it results in an
  // unsigned integer overflow because the returned buffer's offset ends up
  // higher than the requested offset.
  bool use_fs_buffer = false;
  ReadAheadSizeTuning(buf, /*read_curr_block=*/true,
                      /*refit_tail=*/true, use_fs_buffer, rounddown_offset,
                      alignment, 0, n, rounddown_offset, roundup_end, read_len,
                      aligned_useful_len);
  Status s;
  if (read_len > 0) {
    s = Read(buf, opts, reader, read_len, aligned_useful_len, rounddown_offset,
             use_fs_buffer);
  }

  if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && s.ok()) {
    RecordInHistogram(stats_, TABLE_OPEN_PREFETCH_TAIL_READ_BYTES, read_len);
  }
  assert(buf->offset_ <= offset);
  return s;
}

// Copy data from src to overlap_buf_.
void FilePrefetchBuffer::CopyDataToOverlapBuffer(BufferInfo* src,
                                                 uint64_t& offset,
                                                 size_t& length) {
  if (length == 0) {
    return;
  }
  assert(src->IsOffsetInBuffer(offset));

  uint64_t copy_offset = (offset - src->offset_);
  size_t copy_len = 0;
  if (src->IsDataBlockInBuffer(offset, length)) {
    // All the bytes are in src.
    copy_len = length;
  } else {
    copy_len = src->CurrentSize() - copy_offset;
  }

  BufferInfo* dst = overlap_buf_;
  assert(copy_len <= dst->buffer_.Capacity() - dst->buffer_.CurrentSize());
  dst->buffer_.Append(src->buffer_.BufferStart() + copy_offset, copy_len);

  // Update offset and length.
  offset += copy_len;
  length -= copy_len;

  // length > 0 indicates that all the data in src has been consumed and more
  // data still needs to be read from other buffers, so free the front buffer.
  if (length > 0) {
    FreeFrontBuffer();
  }
  TEST_SYNC_POINT("FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete");
}

// Clear the buffers if they contain outdated data. Data can become outdated
// because previous sequential reads were served from the cache instead of
// these buffers. In that case the outdated IOs should be aborted.
void FilePrefetchBuffer::AbortOutdatedIO(uint64_t offset) {
  std::vector<void*> handles;
  std::vector<BufferInfo*> tmp_buf;

  for (auto& buf : bufs_) {
    if (buf->IsBufferOutdatedWithAsyncProgress(offset)) {
      handles.emplace_back(buf->io_handle_);
      tmp_buf.emplace_back(buf);
    }
  }

  if (!handles.empty()) {
    StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
    Status s = fs_->AbortIO(handles);
    assert(s.ok());
  }

  for (auto& buf : tmp_buf) {
    if (buf->async_read_in_progress_) {
      DestroyAndClearIOHandle(buf);
      buf->async_read_in_progress_ = false;
    }
    buf->ClearBuffer();
  }
}

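// Aborts every in-flight async read and releases the associated IO handles,
// leaving the buffers themselves intact.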
void FilePrefetchBuffer::AbortAllIOs() {
  std::vector<void*> handles;
  for (auto& buf : bufs_) {
    if (buf->async_read_in_progress_ && buf->io_handle_ != nullptr) {
      handles.emplace_back(buf->io_handle_);
    }
  }

  if (!handles.empty()) {
    StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
    Status s = fs_->AbortIO(handles);
    assert(s.ok());
  }

  for (auto& buf : bufs_) {
    if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) {
      DestroyAndClearIOHandle(buf);
    }
    buf->async_read_in_progress_ = false;
  }
}

// Clear the buffers if they contain data that is outdated w.r.t. offset. Data
// can become outdated because previous sequential reads were served from the
// cache instead of these buffers, or because there was an IOError while
// filling the buffers.
//
// offset - the offset requested to be read. This API makes sure that the
// front/first buffer in bufs_ contains this offset; otherwise, all buffers
// will be freed.
void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) {
  while (!IsBufferQueueEmpty()) {
    BufferInfo* buf = GetFirstBuffer();
    // Offset is greater than this buffer's end offset.
    if (buf->IsBufferOutdated(offset)) {
      FreeFrontBuffer();
    } else {
      break;
    }
  }

  if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) {
    return;
  }

  BufferInfo* buf = GetFirstBuffer();

  if (buf->async_read_in_progress_) {
    FreeEmptyBuffers();
    return;
  }

  // Below handles the case for overlapping buffers (NumBuffersAllocated > 1).
  bool abort_io = false;

  if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) {
    BufferInfo* next_buf = bufs_[1];
    if (/* next buffer doesn't align with first buffer and requested data
           overlaps with next buffer */
        ((buf->offset_ + buf->CurrentSize() != next_buf->offset_) &&
         (offset + length > buf->offset_ + buf->CurrentSize()))) {
      abort_io = true;
    }
  } else {
    // The buffer doesn't contain data or the offset doesn't lie in this
    // buffer.
    buf->ClearBuffer();
    abort_io = true;
  }

  if (abort_io) {
    AbortAllIOs();
    // Clear all buffers after first.
    for (size_t i = 1; i < bufs_.size(); ++i) {
      bufs_[i]->ClearBuffer();
    }
  }
  FreeEmptyBuffers();
  assert(IsBufferQueueEmpty() || buf->IsOffsetInBuffer(offset));
}

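// Waits for the first buffer's outstanding async read (if any) to complete,
// releases its IO handle, and then drops any buffers whose data is outdated
// with respect to the requested offset/length.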
void FilePrefetchBuffer::PollIfNeeded(uint64_t offset, size_t length) {
  BufferInfo* buf = GetFirstBuffer();

  if (buf->async_read_in_progress_ && fs_ != nullptr) {
    if (buf->io_handle_ != nullptr) {
      // Wait for prefetch data to complete.
      // No mutex is needed as async_read_in_progress behaves as mutex and is
      // updated by main thread only.
      std::vector<void*> handles;
      handles.emplace_back(buf->io_handle_);
      StopWatch sw(clock_, stats_, POLL_WAIT_MICROS);
      fs_->Poll(handles, 1).PermitUncheckedError();
    }

    // Reset and release io_handle after the Poll API as the request has been
    // completed.
    DestroyAndClearIOHandle(buf);
  }

  // Always clear outdated data after Poll as buffers might be out of sync
  // w.r.t. offset and length.
  ClearOutdatedData(offset, length);
}

// ReadAheadSizeTuning API calls readaheadsize_cb_
// (BlockBasedTableIterator::BlockCacheLookupForReadAheadSize) to lookup in the
// cache and tune the start and end offsets based on cache hits/misses.
//
// Arguments -
// read_curr_block     : True if this call was due to a miss in the cache and
//                       FilePrefetchBuffer wants to read that block
//                       synchronously.
//                       False if the current call is to prefetch additional
//                       data in extra buffers through the ReadAsync API.
// prev_buf_end_offset : End offset of the previous buffer. It's used in case
//                       of ReadAsync to make sure it doesn't read anything
//                       from the previous buffer which is already prefetched.
void FilePrefetchBuffer::ReadAheadSizeTuning(
    BufferInfo* buf, bool read_curr_block, bool refit_tail, bool use_fs_buffer,
    uint64_t prev_buf_end_offset, size_t alignment, size_t length,
    size_t readahead_size, uint64_t& start_offset, uint64_t& end_offset,
    size_t& read_len, uint64_t& aligned_useful_len) {
  uint64_t updated_start_offset = Rounddown(start_offset, alignment);
  uint64_t updated_end_offset =
      Roundup(start_offset + length + readahead_size, alignment);
  uint64_t initial_end_offset = updated_end_offset;
  uint64_t initial_start_offset = updated_start_offset;

  // Callback to tune the start and end offsets.
  if (readaheadsize_cb_ != nullptr && readahead_size > 0) {
    readaheadsize_cb_(read_curr_block, updated_start_offset,
                      updated_end_offset);
  }

  // read_len will be 0 and there is nothing to read/prefetch.
  if (updated_start_offset == updated_end_offset) {
    start_offset = end_offset = updated_start_offset;
    UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset),
                               (updated_end_offset - updated_start_offset));
    return;
  }

  assert(updated_start_offset < updated_end_offset);

  if (!read_curr_block) {
    // Handle the case when the callback added block handles which are already
    // prefetched and nothing new needs to be prefetched. In that case the end
    // offset updated by the callback will be less than prev_buf_end_offset,
    // which means the data has already been prefetched.
    if (updated_end_offset <= prev_buf_end_offset) {
      start_offset = end_offset = prev_buf_end_offset;
      UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset),
                                 (end_offset - start_offset));
      return;
    }
  }

  // Realign if start and end offsets are not aligned after tuning.
  start_offset = Rounddown(updated_start_offset, alignment);
  end_offset = Roundup(updated_end_offset, alignment);

  if (!read_curr_block && start_offset < prev_buf_end_offset) {
    // Previous buffer already contains the data till prev_buf_end_offset
    // because of alignment. Update the start offset after that to avoid
    // prefetching it again.
    start_offset = prev_buf_end_offset;
  }

  uint64_t roundup_len = end_offset - start_offset;

  PrepareBufferForRead(buf, alignment, start_offset, roundup_len, refit_tail,
                       use_fs_buffer, aligned_useful_len);
  assert(roundup_len >= aligned_useful_len);

  // Update the buffer offset.
  buf->offset_ = start_offset;
  // Update the initial end offset of this buffer which will be the starting
  // offset of the next prefetch.
  buf->initial_end_offset_ = initial_end_offset;
  read_len = static_cast<size_t>(roundup_len - aligned_useful_len);

  UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset),
                             (end_offset - start_offset));
}

// This is for when num_buffers_ = 1.
// If we are reusing the file system allocated buffer, and only some of the
// requested data is in the buffer, we copy the relevant data to overlap_buf_.
void FilePrefetchBuffer::HandleOverlappingSyncData(uint64_t offset,
                                                   size_t length,
                                                   uint64_t& tmp_offset,
                                                   size_t& tmp_length,
                                                   bool& use_overlap_buffer) {
  if (IsBufferQueueEmpty()) {
    return;
  }

  BufferInfo* buf = GetFirstBuffer();
  // We should only be calling this when num_buffers_ = 1, so there should
  // not be any async reads.
  assert(!buf->async_read_in_progress_);
  if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() &&
      buf->IsOffsetInBuffer(offset) &&
      buf->offset_ + buf->CurrentSize() < offset + length) {
    // The allocated overlap_buf_ is just enough to hold the result for the
    // user. Alignment does not matter here.
    use_overlap_buffer = true;
    overlap_buf_->ClearBuffer();
    overlap_buf_->buffer_.Alignment(1);
    overlap_buf_->buffer_.AllocateNewBuffer(length);
    overlap_buf_->offset_ = offset;
    CopyDataToOverlapBuffer(buf, tmp_offset, tmp_length);
    UpdateStats(/*found_in_buffer=*/false, overlap_buf_->CurrentSize());
  }
}

// This is for when num_buffers_ > 1.
// If data is overlapping between two buffers then during this call:
// - data from the first buffer is copied into the overlap buffer,
// - the first buffer is removed from bufs_ and freed so that it can be used
//   for async prefetching of further data.
Status FilePrefetchBuffer::HandleOverlappingAsyncData(
    const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
    size_t length, size_t readahead_size, bool& copy_to_overlap_buffer,
    uint64_t& tmp_offset, size_t& tmp_length) {
  // No overlapping of data between 2 buffers.
  if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) {
    return Status::OK();
  }

  Status s;
  size_t alignment = GetRequiredBufferAlignment(reader);
  BufferInfo* buf = GetFirstBuffer();

  // Check if the first buffer has the required offset and the async read is
  // still in progress. This should only happen if a prefetch was initiated
  // by Seek, but the next access is at another offset.
  if (buf->async_read_in_progress_ &&
      buf->IsOffsetInBufferWithAsyncProgress(offset)) {
    PollIfNeeded(offset, length);
  }

  if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) {
    return Status::OK();
  }

  BufferInfo* next_buf = bufs_[1];

  // If data is overlapping over two buffers, copy the data from the front
  // buffer and call ReadAsync on the freed buffer.
  if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() &&
      buf->IsOffsetInBuffer(offset) &&
      (/*Data extends over two buffers and second buffer either has data or is
         in the process of being populated=*/
       (offset + length > next_buf->offset_) &&
       (next_buf->async_read_in_progress_ ||
        next_buf->DoesBufferContainData()))) {
    // Allocate new buffer to overlap_buf_.
    overlap_buf_->ClearBuffer();
    overlap_buf_->buffer_.Alignment(alignment);
    overlap_buf_->buffer_.AllocateNewBuffer(length);
    overlap_buf_->offset_ = offset;
    copy_to_overlap_buffer = true;

    CopyDataToOverlapBuffer(buf, tmp_offset, tmp_length);
    UpdateStats(/*found_in_buffer=*/false, overlap_buf_->CurrentSize());

    // Call async prefetching on the freed buffer (its data has been consumed)
    // only if the requested data lies within the next buffer.
    size_t second_size = next_buf->async_read_in_progress_
                             ? next_buf->async_req_len_
                             : next_buf->CurrentSize();
    uint64_t start_offset = next_buf->initial_end_offset_;

    // If the requested bytes (tmp_offset + tmp_length) are in the next buffer,
    // the freed buffer can go for further prefetching.
    // If the requested bytes are not in the next buffer, the next buffer has
    // to go for a sync call to get the remaining requested bytes. In that case
    // it shouldn't go for async prefetching, because async prefetching
    // calculates its offset based on the previous buffer's end offset and the
    // previous buffer has to go for sync prefetching.
    if (tmp_offset + tmp_length <= next_buf->offset_ + second_size) {
      AllocateBuffer();
      BufferInfo* new_buf = GetLastBuffer();
      size_t read_len = 0;
      uint64_t end_offset = start_offset, aligned_useful_len = 0;

      ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false,
                          /*refit_tail=*/false, /*use_fs_buffer=*/false,
                          next_buf->offset_ + second_size, alignment,
                          /*length=*/0, readahead_size, start_offset,
                          end_offset, read_len, aligned_useful_len);
      if (read_len > 0) {
        s = ReadAsync(new_buf, opts, reader, read_len, start_offset);
        if (!s.ok()) {
          DestroyAndClearIOHandle(new_buf);
          FreeLastBuffer();
          return s;
        }
      }
    }
  }
  return s;
}

// When data is outdated, we clear the first buffer and free it as the
// data has been consumed because of sequential reads.
//
// Scenarios for prefetching asynchronously:
// Case1: If all buffers are in free_bufs_, prefetch n + readahead_size_/2
//        bytes synchronously in the first buffer and prefetch
//        readahead_size_/2 asynchronously in the remaining buffers
//        (num_buffers_ - 1).
// Case2: If the first buffer has partial data, prefetch readahead_size_/2
//        asynchronously in the remaining buffers. In case of partial data,
//        prefetch the remaining bytes of the n requested bytes synchronously
//        to fulfill the request.
// Case5: (Special case) If data is overlapping in two buffers, copy the
//        requested data from the first buffer, free that buffer to send it for
//        an async request, wait for Poll to fill the next buffer (if any), and
//        copy the remaining data from that buffer to the overlap buffer.
Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
                                            RandomAccessFileReader* reader,
                                            uint64_t offset, size_t length,
                                            size_t readahead_size,
                                            bool& copy_to_overlap_buffer) {
  if (!enable_) {
    return Status::OK();
  }
  TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");

  size_t alignment = GetRequiredBufferAlignment(reader);
  Status s;
  uint64_t tmp_offset = offset;
  size_t tmp_length = length;
  size_t original_length = length;

  // Abort outdated IO.
  if (!explicit_prefetch_submitted_) {
    AbortOutdatedIO(offset);
    FreeEmptyBuffers();
  }
  ClearOutdatedData(offset, length);

  // Handle overlapping data over two buffers (async prefetching case).
  s = HandleOverlappingAsyncData(opts, reader, offset, length, readahead_size,
                                 copy_to_overlap_buffer, tmp_offset,
                                 tmp_length);
  if (!s.ok()) {
    return s;
  }

  // Handle partially available data when reusing the file system buffer
  // and num_buffers_ = 1 (sync prefetching case).
  bool use_fs_buffer = UseFSBuffer(reader);
  if (!copy_to_overlap_buffer && use_fs_buffer) {
    HandleOverlappingSyncData(offset, length, tmp_offset, tmp_length,
                              copy_to_overlap_buffer);
  }

  AllocateBufferIfEmpty();
  BufferInfo* buf = GetFirstBuffer();

  // Call Poll only if data is needed for the second buffer.
  // - Return if whole data is in the first buffer and the second buffer is in
  //   progress or already full.
  // - If the second buffer is empty, it will go for ReadAsync for the second
  //   buffer.
  if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() &&
      buf->IsDataBlockInBuffer(offset, length)) {
    // Whole data is in buffer.
    if (!IsEligibleForFurtherPrefetching()) {
      UpdateStats(/*found_in_buffer=*/true, original_length);
      return s;
    }
  } else {
    PollIfNeeded(tmp_offset, tmp_length);
  }

  AllocateBufferIfEmpty();
  buf = GetFirstBuffer();
  offset = tmp_offset;
  length = tmp_length;

  // After polling, if all the requested bytes are in the first buffer, it will
  // only go for async prefetching.
  if (buf->DoesBufferContainData()) {
    if (copy_to_overlap_buffer) {
      // Data is overlapping, i.e. some of the data has been copied to the
      // overlap buffer and the remaining will be updated below.
      // Note: why do we not end up performing a duplicate copy when we already
      // copy to the overlap buffer in HandleOverlappingAsyncData /
      // HandleOverlappingSyncData? The reason is that when we call
      // CopyDataToOverlapBuffer, if the buffer is only a "partial hit", then we
      // clear it out since it does not have any more useful data once we copy
      // to the overlap buffer. Once we reallocate a fresh buffer, that buffer
      // will have no data, and it will be the "first" buffer when num_buffers_
      // = 1. When num_buffers_ > 1, we call ClearOutdatedData() so we know
      // that, if we get to this point in the control flow, the "front" buffer
      // has to have the data we need.
      size_t initial_buf_size = overlap_buf_->CurrentSize();
      CopyDataToOverlapBuffer(buf, offset, length);
      UpdateStats(
          /*found_in_buffer=*/false,
          overlap_buf_->CurrentSize() - initial_buf_size);

      // Length == 0: All the requested data has been copied to the overlap
      // buffer and it has already gone for async prefetching. It can return
      // without doing anything further.
      // Length > 0: More data needs to be consumed so it will continue async
      // and sync prefetching and copy the remaining data to the overlap buffer
      // in the end.
      if (length == 0) {
        UpdateStats(/*found_in_buffer=*/true, length);
        return s;
      }
    } else {
      if (buf->IsDataBlockInBuffer(offset, length)) {
        offset += length;
        length = 0;
        // Since the async request was submitted directly by calling
        // PrefetchAsync in the last call, we don't need to prefetch further as
        // this call is to poll the data submitted in the previous call.
        if (explicit_prefetch_submitted_) {
          return s;
        }
        if (!IsEligibleForFurtherPrefetching()) {
          UpdateStats(/*found_in_buffer=*/true, original_length);
          return s;
        }
      }
    }
  }

  AllocateBufferIfEmpty();
  buf = GetFirstBuffer();
  assert(!buf->async_read_in_progress_);

  // Go for ReadAsync and Read (if needed).
  // Offset and size alignment for the first buffer with synchronous
  // prefetching.
  uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0;
  size_t read_len1 = 0;

  // For length == 0, skip the synchronous prefetching. read_len1 will be 0.
  if (length > 0) {
    if (buf->IsOffsetInBuffer(offset)) {
      UpdateStats(/*found_in_buffer=*/false,
                  (buf->offset_ + buf->CurrentSize() - offset));
    }
    ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/true,
                        /*use_fs_buffer=*/use_fs_buffer, start_offset1,
                        alignment, length, readahead_size, start_offset1,
                        end_offset1, read_len1, aligned_useful_len1);
  } else {
    UpdateStats(/*found_in_buffer=*/true, original_length);
  }

  // Prefetch in the remaining buffers only if readahead_size > 0.
  if (readahead_size > 0) {
    s = PrefetchRemBuffers(opts, reader, end_offset1, alignment,
                           readahead_size);
    if (!s.ok()) {
      return s;
    }
  }

  if (read_len1 > 0) {
    s = Read(buf, opts, reader, read_len1, aligned_useful_len1, start_offset1,
             use_fs_buffer);
    if (!s.ok()) {
      AbortAllIOs();
      FreeAllBuffers();
      return s;
    }
  }

  // Copy the remaining requested bytes to overlap_buf_. No need to update
  // stats as the data was prefetched during this call.
  if (copy_to_overlap_buffer && length > 0) {
    CopyDataToOverlapBuffer(buf, offset, length);
  }
  return s;
}

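// Wrapper around TryReadFromCacheUntracked that also records prefetch-tail
// hit/miss statistics when the buffer is used for table-open prefetching.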
bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
                                          RandomAccessFileReader* reader,
                                          uint64_t offset, size_t n,
                                          Slice* result, Status* status,
                                          bool for_compaction) {
  bool ret = TryReadFromCacheUntracked(opts, reader, offset, n, result, status,
                                       for_compaction);
  if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
    if (ret) {
      RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_HIT);
    } else {
      RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_MISS);
    }
  }
  return ret;
}

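// Serves the read from the prefetch buffers if possible. If only part of the
// requested range is buffered and readahead is enabled, it prefetches the
// missing bytes (plus readahead) before returning the result; otherwise it
// returns false so the caller falls back to a regular file read.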
bool FilePrefetchBuffer::TryReadFromCacheUntracked(
    const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
    size_t n, Slice* result, Status* status, bool for_compaction) {
  // We disallow async IO for compaction reads since they are performed in
  // the background anyways and are less latency sensitive compared to
  // user-initiated reads.
  (void)for_compaction;
  assert(!for_compaction || num_buffers_ == 1);

  if (track_min_offset_ && offset < min_offset_read_) {
    min_offset_read_ = static_cast<size_t>(offset);
  }

  if (!enable_) {
    return false;
  }

  if (explicit_prefetch_submitted_) {
    // explicit_prefetch_submitted_ is a special case where the request
    // submitted in PrefetchAsync is expected to match this request. Otherwise
    // the buffers will be outdated.
    // A random offset was requested, so abort the IOs.
    if (prev_offset_ != offset) {
      AbortAllIOs();
      FreeAllBuffers();
      explicit_prefetch_submitted_ = false;
      return false;
    }
  }

  AllocateBufferIfEmpty();
  BufferInfo* buf = GetFirstBuffer();

  if (!explicit_prefetch_submitted_ && offset < buf->offset_) {
    return false;
  }

  bool prefetched = false;
  bool copy_to_overlap_buffer = false;
  // If the buffer contains only a few of the requested bytes:
  //   If readahead is enabled: prefetch the remaining bytes + readahead bytes
  //     and satisfy the request.
  //   If readahead is not enabled: return false.
  TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
                           &readahead_size_);

  if (explicit_prefetch_submitted_ ||
      (buf->async_read_in_progress_ ||
       offset + n > buf->offset_ + buf->CurrentSize())) {
    // In case readahead_size is trimmed (=0), we still want to poll the data
    // submitted with explicit_prefetch_submitted_=true.
    if (readahead_size_ > 0 || explicit_prefetch_submitted_) {
      Status s;
      assert(reader != nullptr);
      assert(max_readahead_size_ >= readahead_size_);

      if (implicit_auto_readahead_) {
        if (!IsEligibleForPrefetch(offset, n)) {
          // Ignore status as Prefetch is not called.
          s.PermitUncheckedError();
          return false;
        }
      }

      // Prefetch n + readahead_size_/2 synchronously as the remaining
      // readahead_size_/2 will be prefetched asynchronously if num_buffers_
      // > 1.
      s = PrefetchInternal(
          opts, reader, offset, n,
          (num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
          copy_to_overlap_buffer);
      explicit_prefetch_submitted_ = false;
      if (!s.ok()) {
        if (status) {
          *status = s;
        }
#ifndef NDEBUG
        IGNORE_STATUS_IF_ERROR(s);
#endif
        return false;
      }
      prefetched = explicit_prefetch_submitted_ ? false : true;
    } else {
      return false;
    }
  } else if (!for_compaction) {
    // These stats are meant to track prefetch effectiveness for user reads
    // only.
    UpdateStats(/*found_in_buffer=*/true, n);
  }

  UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);

  buf = GetFirstBuffer();
  if (copy_to_overlap_buffer) {
    buf = overlap_buf_;
  }
  assert(buf->IsOffsetInBuffer(offset));
  uint64_t offset_in_buffer = offset - buf->offset_;
  assert(offset_in_buffer < buf->CurrentSize());
  *result = Slice(
      buf->buffer_.BufferStart() + offset_in_buffer,
      std::min(n, buf->CurrentSize() - static_cast<size_t>(offset_in_buffer)));

  if (prefetched) {
    readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
  }
  return true;
}

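// Completion callback for ReadAsync: grows the destination buffer's size by
// the bytes actually read, ignoring reads that fall behind the buffer's
// offset or that did not extend past the data already present.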
void FilePrefetchBuffer::PrefetchAsyncCallback(FSReadRequest& req,
                                               void* cb_arg) {
  BufferInfo* buf = static_cast<BufferInfo*>(cb_arg);

#ifndef NDEBUG
  if (req.result.size() < req.len) {
    // Fake an IO error to force db_stress fault injection to ignore
    // truncated read errors
    IGNORE_STATUS_IF_ERROR(Status::IOError());
  }
  IGNORE_STATUS_IF_ERROR(req.status);
#endif

  if (req.status.ok()) {
    if (req.offset + req.result.size() <= buf->offset_ + buf->CurrentSize()) {
      // All requested bytes are already in the buffer or no data is read
      // because of EOF. So no need to update.
      return;
    }
    if (req.offset < buf->offset_) {
      // Next block to be read has changed (Recent read was not a sequential
      // read). So ignore this read.
      return;
    }
    size_t current_size = buf->CurrentSize();
    buf->buffer_.Size(current_size + req.result.size());
  }
}

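// Explicitly schedules an async prefetch for the range [offset, offset + n).
// Returns OK and fills result if the data is already buffered; otherwise it
// submits async reads and returns TryAgain so the caller retries once the
// data has been polled.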
Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
                                         RandomAccessFileReader* reader,
                                         uint64_t offset, size_t n,
                                         Slice* result) {
  assert(reader != nullptr);
  if (!enable_) {
    return Status::NotSupported();
  }

  TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:Start");

  num_file_reads_ = 0;
  explicit_prefetch_submitted_ = false;
  bool is_eligible_for_prefetching = false;

  if (readahead_size_ > 0 &&
      (!implicit_auto_readahead_ ||
       num_file_reads_ >= num_file_reads_for_auto_readahead_)) {
    is_eligible_for_prefetching = true;
  }

  // Cancel any pending async read to make the code simpler as buffers can be
  // out of sync.
  AbortAllIOs();
  // Free empty buffers after aborting IOs.
  FreeEmptyBuffers();
  ClearOutdatedData(offset, n);

  // - Since PrefetchAsync can be called on non-sequential reads, the offset
  //   can be less than the first buffer's offset. In that case it clears all
  //   buffers.
  // - In case of tuning of readahead_size, on Reseek, we have to clear all
  //   buffers; otherwise, we may end up with inconsistent BlockHandles in the
  //   queue and data in the buffer.
  if (!IsBufferQueueEmpty()) {
    BufferInfo* buf = GetFirstBuffer();
    if (readaheadsize_cb_ != nullptr || !buf->IsOffsetInBuffer(offset)) {
      FreeAllBuffers();
    }
  }

  UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);

  bool data_found = false;

  // If the first buffer has the full data.
  if (!IsBufferQueueEmpty()) {
    BufferInfo* buf = GetFirstBuffer();
    if (buf->DoesBufferContainData() && buf->IsDataBlockInBuffer(offset, n)) {
      uint64_t offset_in_buffer = offset - buf->offset_;
      *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
      data_found = true;
      UpdateStats(/*found_in_buffer=*/true, n);

      // Update num_file_reads_ as TryReadFromCacheAsync won't be called for
      // poll and update num_file_reads_ if data is found.
      num_file_reads_++;

      // If the next buffer contains some data or is not eligible for
      // prefetching, return.
      if (!is_eligible_for_prefetching || NumBuffersAllocated() > 1) {
        return Status::OK();
      }
    } else {
      // Partial data in the first buffer. Clear it to return continuous data
      // in one buffer.
      FreeAllBuffers();
    }
  }

  std::string msg;

  Status s;
  size_t alignment = GetRequiredBufferAlignment(reader);
  size_t readahead_size = is_eligible_for_prefetching ? readahead_size_ / 2 : 0;
  size_t offset_to_read = static_cast<size_t>(offset);
  uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0;
  size_t read_len1 = 0;

  AllocateBufferIfEmpty();
  BufferInfo* buf = GetFirstBuffer();

  // - If the first buffer is empty:
  //   - Call async read for full data + readahead_size on the first buffer.
  //   - Call async read for readahead_size on all remaining buffers if
  //     eligible.
  // - If the first buffer contains data:
  //   - Call async read for readahead_size on all remaining buffers if
  //     eligible.

  // Calculate length and offsets for reading.
  if (!buf->DoesBufferContainData()) {
    uint64_t roundup_len1;
    // Prefetch full data + readahead_size in the first buffer.
    if (is_eligible_for_prefetching || reader->use_direct_io()) {
      ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false,
                          /*use_fs_buffer=*/false,
                          /*prev_buf_end_offset=*/start_offset1, alignment, n,
                          readahead_size, start_offset1, end_offset1, read_len1,
                          aligned_useful_len1);
    } else {
      // No alignment or extra prefetching.
      start_offset1 = offset_to_read;
      end_offset1 = offset_to_read + n;
      roundup_len1 = end_offset1 - start_offset1;
      PrepareBufferForRead(buf, alignment, start_offset1, roundup_len1,
                           /*refit_tail=*/false, /*use_fs_buffer=*/false,
                           aligned_useful_len1);
      assert(aligned_useful_len1 == 0);
      assert(roundup_len1 >= aligned_useful_len1);
      read_len1 = static_cast<size_t>(roundup_len1);
      buf->offset_ = start_offset1;
    }

    if (read_len1 > 0) {
      s = ReadAsync(buf, opts, reader, read_len1, start_offset1);
      if (!s.ok()) {
        DestroyAndClearIOHandle(buf);
        FreeLastBuffer();
        return s;
      }
      explicit_prefetch_submitted_ = true;
      prev_len_ = 0;
    }
  }

  if (is_eligible_for_prefetching) {
    s = PrefetchRemBuffers(opts, reader, end_offset1, alignment,
                           readahead_size);
    if (!s.ok()) {
      return s;
    }
    readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
  }
  return (data_found ? Status::OK() : Status::TryAgain());
}

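// Issues async readahead of readahead_size bytes into each remaining
// unallocated buffer, chaining each new buffer's start offset from the
// previous buffer's initial end offset.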
Status FilePrefetchBuffer::PrefetchRemBuffers(const IOOptions& opts,
                                              RandomAccessFileReader* reader,
                                              uint64_t end_offset1,
                                              size_t alignment,
                                              size_t readahead_size) {
  Status s;
  while (NumBuffersAllocated() < num_buffers_) {
    BufferInfo* prev_buf = GetLastBuffer();
    uint64_t start_offset2 = prev_buf->initial_end_offset_;
    AllocateBuffer();
    BufferInfo* new_buf = GetLastBuffer();

    uint64_t end_offset2 = start_offset2, aligned_useful_len2 = 0;
    size_t read_len2 = 0;
    ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false,
                        /*refit_tail=*/false, /*use_fs_buffer=*/false,
                        /*prev_buf_end_offset=*/end_offset1, alignment,
                        /*length=*/0, readahead_size, start_offset2,
                        end_offset2, read_len2, aligned_useful_len2);

    if (read_len2 > 0) {
      TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching");
      s = ReadAsync(new_buf, opts, reader, read_len2, start_offset2);
      if (!s.ok()) {
        DestroyAndClearIOHandle(new_buf);
        FreeLastBuffer();
        return s;
      }
    }
    end_offset1 = end_offset2;
  }
  return s;
}

}  // namespace ROCKSDB_NAMESPACE