readahead_raf.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/readahead_raf.h"

#include <algorithm>
#include <mutex>

#include "file/read_write_util.h"
#include "util/aligned_buffer.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
namespace {
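// Wraps another RandomAccessFile and serves reads out of an internal,
// alignment-respecting readahead buffer. On a buffer miss it reads a full
// readahead_size_ chunk from the underlying file, so sequential and
// near-sequential reads are served from memory instead of the file.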
class ReadaheadRandomAccessFile : public RandomAccessFile {
 public:
  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
                            size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
  ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
      delete;
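
  // Serves [offset, offset + n) from the readahead buffer when possible. On a
  // miss or partial hit, refills the buffer with a readahead_size_ chunk
  // starting at the aligned offset of the first uncached byte, then copies
  // the remainder into scratch.
  //
  // Illustrative walk-through (the numbers are an example, not from the
  // source): with alignment_ = 4096 and readahead_size_ = 64KB, suppose the
  // buffer holds [8192, 73728) and Read(73000, 2000) is called.
  // TryReadFromCache() copies the cached tail (728 bytes), advanced_offset
  // becomes 73728 (already aligned), ReadIntoBuffer(73728, 65536) refills the
  // buffer, and the remaining 1272 bytes are copied from the fresh buffer.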
  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    // Read-ahead only makes sense if we have some slack left after reading
    if (n + alignment_ >= readahead_size_) {
      return file_->Read(offset, n, result, scratch);
    }

    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely cached,
    // including the end-of-file case when offset + n is greater than EOF, then
    // return.
    if (TryReadFromCache(offset, n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return Status::OK();
    }

    size_t advanced_offset = static_cast<size_t>(offset + cached_len);
    // In the case of a cache hit, advanced_offset is already aligned, which
    // means that chunk_offset equals advanced_offset.
    size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
    Status s = ReadIntoBuffer(chunk_offset, readahead_size_);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
                       scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }
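
  // Fills the readahead buffer with the aligned chunk covering
  // [offset, offset + n). Requests smaller than readahead_size_ are ignored,
  // and a request whose aligned start already matches the buffered chunk is a
  // no-op. Note that ReadIntoBuffer() caps the read at the buffer's capacity,
  // so at most one buffer's worth of data is actually prefetched.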
  Status Prefetch(uint64_t offset, size_t n) override {
    if (n < readahead_size_) {
      // Don't allow smaller prefetches than the configured `readahead_size_`.
      // `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
      return Status::OK();
    }

    std::unique_lock<std::mutex> lk(lock_);

    size_t offset_ = static_cast<size_t>(offset);
    size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
    if (prefetch_offset == buffer_offset_) {
      return Status::OK();
    }
    return ReadIntoBuffer(prefetch_offset,
                          Roundup(offset_ + n, alignment_) - prefetch_offset);
  }

  size_t GetUniqueId(char* id, size_t max_size) const override {
    return file_->GetUniqueId(id, max_size);
  }

  void Hint(AccessPattern pattern) override { file_->Hint(pattern); }

  Status InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read from buffer_ n bytes starting at offset. If anything was
  // read from the cache, it sets cached_len to the number of bytes actually
  // read, copies that many bytes to scratch, and returns true.
  // If nothing was read, sets cached_len to 0 and returns false.
  bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
                        char* scratch) const {
    if (offset < buffer_offset_ ||
        offset >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = offset - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    return true;
  }

  // Reads into buffer_ the next n bytes from file_ starting at offset.
  // Can actually read fewer bytes if EOF was reached.
  // Returns the status of the read operation on the file.
  Status ReadIntoBuffer(uint64_t offset, size_t n) const {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(offset, alignment_));
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
    if (s.ok()) {
      buffer_offset_ = offset;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  const std::unique_ptr<RandomAccessFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

  mutable std::mutex lock_;
  // The buffer storing the prefetched data
  mutable AlignedBuffer buffer_;
  // The offset in file_ corresponding to the data stored in buffer_
  mutable uint64_t buffer_offset_;
};
}  // namespace
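
// Wraps `file` into a ReadaheadRandomAccessFile with the given
// `readahead_size` (rounded up to the file's required buffer alignment).
//
// Usage sketch (illustrative, not part of the original file). The wrapped
// file would typically be opened through the legacy Env API; the path and
// the 2MB readahead size below are assumptions for the example:
//
//   std::unique_ptr<RandomAccessFile> raw;
//   Status s = Env::Default()->NewRandomAccessFile("/tmp/example.sst", &raw,
//                                                  EnvOptions());
//   if (s.ok()) {
//     std::unique_ptr<RandomAccessFile> file =
//         NewReadaheadRandomAccessFile(std::move(raw), 2 * 1024 * 1024);
//     char scratch[4096];
//     Slice data;
//     s = file->Read(/*offset=*/0, sizeof(scratch), &data, scratch);
//   }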
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
    std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
  std::unique_ptr<RandomAccessFile> result(
      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
  return result;
}
}  // namespace ROCKSDB_NAMESPACE