// sequence_file_reader.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/sequence_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/read_write_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
  21. Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
  22. Status s;
  23. if (use_direct_io()) {
  24. #ifndef ROCKSDB_LITE
  25. size_t offset = offset_.fetch_add(n);
  26. size_t alignment = file_->GetRequiredBufferAlignment();
  27. size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
  28. size_t offset_advance = offset - aligned_offset;
  29. size_t size = Roundup(offset + n, alignment) - aligned_offset;
  30. size_t r = 0;
  31. AlignedBuffer buf;
  32. buf.Alignment(alignment);
  33. buf.AllocateNewBuffer(size);
  34. Slice tmp;
  35. s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
  36. buf.BufferStart(), nullptr);
  37. if (s.ok() && offset_advance < tmp.size()) {
  38. buf.Size(tmp.size());
  39. r = buf.Read(scratch, offset_advance,
  40. std::min(tmp.size() - offset_advance, n));
  41. }
  42. *result = Slice(scratch, r);
  43. #endif // !ROCKSDB_LITE
  44. } else {
  45. s = file_->Read(n, IOOptions(), result, scratch, nullptr);
  46. }
  47. IOSTATS_ADD(bytes_read, result->size());
  48. return s;
  49. }
  50. Status SequentialFileReader::Skip(uint64_t n) {
  51. #ifndef ROCKSDB_LITE
  52. if (use_direct_io()) {
  53. offset_ += static_cast<size_t>(n);
  54. return Status::OK();
  55. }
  56. #endif // !ROCKSDB_LITE
  57. return file_->Skip(n);
  58. }
namespace {
// This class wraps a SequentialFile, exposing the same API, with the
// difference of being able to prefetch up to readahead_size bytes and then
// serve them from memory, avoiding the entire round-trip if, for example,
// the data for the file is actually remote.
// Sequential-file wrapper that prefetches up to readahead_size bytes into an
// aligned in-memory buffer and serves subsequent reads from it when possible.
// All public operations are serialized by lock_, so the wrapper is safe to
// share between threads even though the underlying file position is shared.
class ReadaheadSequentialFile : public FSSequentialFile {
 public:
  // Takes ownership of `file`. readahead_size is rounded up to the file's
  // required buffer alignment, and the prefetch buffer is allocated eagerly.
  ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
                          size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0),
        read_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  // Not copyable: owns the wrapped file and a mutex.
  ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;

  ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;

  // Reads n bytes, serving as much as possible from the prefetch buffer and
  // refilling it (or bypassing it for large reads) for the remainder.
  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
                IODebugContext* dbg) override {
    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely cached,
    // including the end-of-file case when offset + n is greater than EOF,
    // then return. (A buffer shorter than readahead_size_ means the last fill
    // hit EOF, so a partial hit there is all the data there is.)
    if (TryReadFromCache(n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return IOStatus::OK();
    }
    // Partial (or no) cache hit: only the remainder still needs reading.
    n -= cached_len;

    IOStatus s;
    // Read-ahead only makes sense if we have some slack left after reading
    // the remainder; otherwise go straight to the file and drop the buffer.
    if (n + alignment_ >= readahead_size_) {
      s = file_->Read(n, opts, result, scratch + cached_len, dbg);
      if (s.ok()) {
        read_offset_ += result->size();
        *result = Slice(scratch, cached_len + result->size());
      }
      buffer_.Clear();
      return s;
    }

    // Refill the prefetch buffer, then satisfy the remainder from it.
    s = ReadIntoBuffer(readahead_size_, opts, dbg);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(n, &remaining_len, scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  // Skips n bytes, consuming cached data first and falling back to the
  // underlying file's Skip only for whatever lies beyond the buffer.
  IOStatus Skip(uint64_t n) override {
    std::unique_lock<std::mutex> lk(lock_);
    IOStatus s = IOStatus::OK();
    // First check if we need to skip already cached data
    if (buffer_.CurrentSize() > 0) {
      // Do we need to skip beyond cached data?
      if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
        // Yes. Skip whatever is in memory and adjust offset accordingly
        n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
        read_offset_ = buffer_offset_ + buffer_.CurrentSize();
      } else {
        // No. The section to be skipped is entirely in cache.
        read_offset_ += n;
        n = 0;
      }
    }
    if (n > 0) {
      // We still need to skip more, so call the file API for skipping
      s = file_->Skip(n);
      if (s.ok()) {
        read_offset_ += n;
      }
      // The buffer no longer reflects the current position; discard it.
      buffer_.Clear();
    }
    return s;
  }

  // Positioned reads bypass the readahead buffer entirely; they do not touch
  // read_offset_ or buffer state, so no locking is needed here.
  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
                          Slice* result, char* scratch,
                          IODebugContext* dbg) override {
    return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
  }

  // Drops the prefetch buffer and forwards the invalidation to the file.
  IOStatus InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read from buffer_ n bytes. If anything was read from the cache,
  // it sets cached_len to the number of bytes actually read, copies these
  // number of bytes to scratch and returns true.
  // If nothing was read sets cached_len to 0 and returns false.
  bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
    if (read_offset_ < buffer_offset_ ||
        read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    read_offset_ += *cached_len;
    return true;
  }

  // Reads into buffer_ the next n bytes from file_.
  // Can actually read less if EOF was reached.
  // Returns the status of the read operation on the file.
  IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
                          IODebugContext* dbg) {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
    if (s.ok()) {
      // The buffer now mirrors the file starting at the pre-read position.
      buffer_offset_ = read_offset_;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  // The wrapped file; sole owner.
  const std::unique_ptr<FSSequentialFile> file_;
  // Required buffer alignment of file_, cached at construction.
  const size_t alignment_;
  // Prefetch window, rounded up to alignment_.
  const size_t readahead_size_;
  // Serializes Read/Skip/InvalidateCache and protects the fields below.
  std::mutex lock_;
  // The buffer storing the prefetched data
  AlignedBuffer buffer_;
  // The offset in file_, corresponding to data stored in buffer_
  uint64_t buffer_offset_;
  // The offset up to which data was read from file_. In fact, it can be larger
  // than the actual file size, since the file_->Skip(n) call doesn't return the
  // actual number of bytes that were skipped, which can be less than n.
  // This is not a problem since read_offset_ is monotonically increasing and
  // its only use is to figure out if next piece of data should be read from
  // buffer_ or file_ directly.
  uint64_t read_offset_;
};
}  // namespace
  204. std::unique_ptr<FSSequentialFile>
  205. SequentialFileReader::NewReadaheadSequentialFile(
  206. std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
  207. if (file->GetRequiredBufferAlignment() >= readahead_size) {
  208. // Short-circuit and return the original file if readahead_size is
  209. // too small and hence doesn't make sense to be used for prefetching.
  210. return std::move(file);
  211. }
  212. std::unique_ptr<FSSequentialFile> result(
  213. new ReadaheadSequentialFile(std::move(file), readahead_size));
  214. return result;
  215. }
}  // namespace ROCKSDB_NAMESPACE