// readahead_raf.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "file/readahead_raf.h"
  10. #include <algorithm>
  11. #include <mutex>
  12. #include "file/read_write_util.h"
  13. #include "rocksdb/file_system.h"
  14. #include "util/aligned_buffer.h"
  15. #include "util/rate_limiter_impl.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. namespace {
  18. class ReadaheadRandomAccessFile : public FSRandomAccessFile {
  19. public:
  20. ReadaheadRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
  21. size_t readahead_size)
  22. : file_(std::move(file)),
  23. alignment_(file_->GetRequiredBufferAlignment()),
  24. readahead_size_(Roundup(readahead_size, alignment_)),
  25. buffer_(),
  26. buffer_offset_(0) {
  27. buffer_.Alignment(alignment_);
  28. buffer_.AllocateNewBuffer(readahead_size_);
  29. }
  30. ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
  31. ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
  32. delete;
  33. IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
  34. Slice* result, char* scratch,
  35. IODebugContext* dbg) const override {
  36. // Read-ahead only make sense if we have some slack left after reading
  37. if (n + alignment_ >= readahead_size_) {
  38. return file_->Read(offset, n, options, result, scratch, dbg);
  39. }
  40. std::unique_lock<std::mutex> lk(lock_);
  41. size_t cached_len = 0;
  42. // Check if there is a cache hit, meaning that [offset, offset + n) is
  43. // either completely or partially in the buffer. If it's completely cached,
  44. // including end of file case when offset + n is greater than EOF, then
  45. // return.
  46. if (TryReadFromCache(offset, n, &cached_len, scratch) &&
  47. (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
  48. // We read exactly what we needed, or we hit end of file - return.
  49. *result = Slice(scratch, cached_len);
  50. return IOStatus::OK();
  51. }
  52. size_t advanced_offset = static_cast<size_t>(offset + cached_len);
  53. // In the case of cache hit advanced_offset is already aligned, means that
  54. // chunk_offset equals to advanced_offset
  55. size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
  56. IOStatus s = ReadIntoBuffer(chunk_offset, readahead_size_, options, dbg);
  57. if (s.ok()) {
  58. // The data we need is now in cache, so we can safely read it
  59. size_t remaining_len;
  60. TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
  61. scratch + cached_len);
  62. *result = Slice(scratch, cached_len + remaining_len);
  63. }
  64. return s;
  65. }
  66. IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
  67. IODebugContext* dbg) override {
  68. if (n < readahead_size_) {
  69. // Don't allow smaller prefetches than the configured `readahead_size_`.
  70. // `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
  71. return IOStatus::OK();
  72. }
  73. std::unique_lock<std::mutex> lk(lock_);
  74. size_t offset_ = static_cast<size_t>(offset);
  75. size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
  76. if (prefetch_offset == buffer_offset_) {
  77. return IOStatus::OK();
  78. }
  79. return ReadIntoBuffer(prefetch_offset,
  80. Roundup(offset_ + n, alignment_) - prefetch_offset,
  81. options, dbg);
  82. }
  83. size_t GetUniqueId(char* id, size_t max_size) const override {
  84. return file_->GetUniqueId(id, max_size);
  85. }
  86. void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
  87. IOStatus InvalidateCache(size_t offset, size_t length) override {
  88. std::unique_lock<std::mutex> lk(lock_);
  89. buffer_.Clear();
  90. return file_->InvalidateCache(offset, length);
  91. }
  92. bool use_direct_io() const override { return file_->use_direct_io(); }
  93. IOStatus GetFileSize(uint64_t* result) override {
  94. return file_->GetFileSize(result);
  95. }
  96. private:
  97. // Tries to read from buffer_ n bytes starting at offset. If anything was read
  98. // from the cache, it sets cached_len to the number of bytes actually read,
  99. // copies these number of bytes to scratch and returns true.
  100. // If nothing was read sets cached_len to 0 and returns false.
  101. bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
  102. char* scratch) const {
  103. if (offset < buffer_offset_ ||
  104. offset >= buffer_offset_ + buffer_.CurrentSize()) {
  105. *cached_len = 0;
  106. return false;
  107. }
  108. uint64_t offset_in_buffer = offset - buffer_offset_;
  109. *cached_len = std::min(
  110. buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
  111. memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
  112. return true;
  113. }
  114. // Reads into buffer_ the next n bytes from file_ starting at offset.
  115. // Can actually read less if EOF was reached.
  116. // Returns the status of the read operastion on the file.
  117. IOStatus ReadIntoBuffer(uint64_t offset, size_t n, const IOOptions& options,
  118. IODebugContext* dbg) const {
  119. if (n > buffer_.Capacity()) {
  120. n = buffer_.Capacity();
  121. }
  122. assert(IsFileSectorAligned(offset, alignment_));
  123. assert(IsFileSectorAligned(n, alignment_));
  124. Slice result;
  125. IOStatus s =
  126. file_->Read(offset, n, options, &result, buffer_.BufferStart(), dbg);
  127. if (s.ok()) {
  128. buffer_offset_ = offset;
  129. buffer_.Size(result.size());
  130. assert(result.size() == 0 || buffer_.BufferStart() == result.data());
  131. }
  132. return s;
  133. }
  134. const std::unique_ptr<FSRandomAccessFile> file_;
  135. const size_t alignment_;
  136. const size_t readahead_size_;
  137. mutable std::mutex lock_;
  138. // The buffer storing the prefetched data
  139. mutable AlignedBuffer buffer_;
  140. // The offset in file_, corresponding to data stored in buffer_
  141. mutable uint64_t buffer_offset_;
  142. };
  143. } // namespace
  144. std::unique_ptr<FSRandomAccessFile> NewReadaheadRandomAccessFile(
  145. std::unique_ptr<FSRandomAccessFile>&& file, size_t readahead_size) {
  146. std::unique_ptr<FSRandomAccessFile> result(
  147. new ReadaheadRandomAccessFile(std::move(file), readahead_size));
  148. return result;
  149. }
  150. } // namespace ROCKSDB_NAMESPACE