sequence_file_reader.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "file/sequence_file_reader.h"
  10. #include <algorithm>
  11. #include <mutex>
  12. #include "file/read_write_util.h"
  13. #include "monitoring/histogram.h"
  14. #include "monitoring/iostats_context_imp.h"
  15. #include "port/port.h"
  16. #include "rocksdb/file_system.h"
  17. #include "test_util/sync_point.h"
  18. #include "util/aligned_buffer.h"
  19. #include "util/random.h"
  20. #include "util/rate_limiter_impl.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. IOStatus SequentialFileReader::Create(
  23. const std::shared_ptr<FileSystem>& fs, const std::string& fname,
  24. const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
  25. IODebugContext* dbg, RateLimiter* rate_limiter) {
  26. std::unique_ptr<FSSequentialFile> file;
  27. IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
  28. if (io_s.ok()) {
  29. reader->reset(new SequentialFileReader(std::move(file), fname, nullptr, {},
  30. rate_limiter));
  31. }
  32. return io_s;
  33. }
  34. IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
  35. Env::IOPriority rate_limiter_priority) {
  36. IOStatus io_s;
  37. IOOptions io_opts;
  38. io_opts.rate_limiter_priority = rate_limiter_priority;
  39. io_opts.verify_and_reconstruct_read = verify_and_reconstruct_read_;
  40. if (use_direct_io()) {
  41. //
  42. // |-offset_advance-|---bytes returned--|
  43. // |----------------------buf size-------------------------|
  44. // | | | |
  45. // aligned offset offset + n Roundup(offset + n,
  46. // offset alignment)
  47. //
  48. size_t offset = offset_.fetch_add(n);
  49. size_t alignment = file_->GetRequiredBufferAlignment();
  50. size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
  51. size_t offset_advance = offset - aligned_offset;
  52. size_t size = Roundup(offset + n, alignment) - aligned_offset;
  53. size_t r = 0;
  54. AlignedBuffer buf;
  55. buf.Alignment(alignment);
  56. buf.AllocateNewBuffer(size);
  57. while (buf.CurrentSize() < size) {
  58. size_t allowed;
  59. if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
  60. allowed = rate_limiter_->RequestToken(
  61. buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
  62. rate_limiter_priority, nullptr /* stats */,
  63. RateLimiter::OpType::kRead);
  64. } else {
  65. assert(buf.CurrentSize() == 0);
  66. allowed = size;
  67. }
  68. Slice tmp;
  69. uint64_t orig_offset = 0;
  70. FileOperationInfo::StartTimePoint start_ts;
  71. if (ShouldNotifyListeners()) {
  72. orig_offset = aligned_offset + buf.CurrentSize();
  73. start_ts = FileOperationInfo::StartNow();
  74. }
  75. io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed,
  76. io_opts, &tmp, buf.Destination(),
  77. nullptr /* dbg */);
  78. if (ShouldNotifyListeners()) {
  79. auto finish_ts = FileOperationInfo::FinishNow();
  80. NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
  81. io_s);
  82. }
  83. buf.Size(buf.CurrentSize() + tmp.size());
  84. if (!io_s.ok() || tmp.size() < allowed) {
  85. break;
  86. }
  87. }
  88. if (io_s.ok() && offset_advance < buf.CurrentSize()) {
  89. r = buf.Read(scratch, offset_advance,
  90. std::min(buf.CurrentSize() - offset_advance, n));
  91. }
  92. *result = Slice(scratch, r);
  93. } else {
  94. // To be paranoid, modify scratch a little bit, so in case underlying
  95. // FileSystem doesn't fill the buffer but return success and `scratch`
  96. // returns contains a previous block, returned value will not pass
  97. // checksum.
  98. // It's hard to find useful byte for direct I/O case, so we skip it.
  99. if (n > 0 && scratch != nullptr) {
  100. scratch[0]++;
  101. }
  102. size_t read = 0;
  103. while (read < n) {
  104. size_t allowed;
  105. if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
  106. allowed = rate_limiter_->RequestToken(
  107. n - read, 0 /* alignment */, rate_limiter_priority,
  108. nullptr /* stats */, RateLimiter::OpType::kRead);
  109. } else {
  110. allowed = n;
  111. }
  112. FileOperationInfo::StartTimePoint start_ts;
  113. if (ShouldNotifyListeners()) {
  114. start_ts = FileOperationInfo::StartNow();
  115. }
  116. Slice tmp;
  117. io_s = file_->Read(allowed, io_opts, &tmp, scratch + read,
  118. nullptr /* dbg */);
  119. if (ShouldNotifyListeners()) {
  120. auto finish_ts = FileOperationInfo::FinishNow();
  121. size_t offset = offset_.fetch_add(tmp.size());
  122. NotifyOnFileReadFinish(offset, tmp.size(), start_ts, finish_ts, io_s);
  123. }
  124. read += tmp.size();
  125. if (!io_s.ok() || tmp.size() < allowed) {
  126. break;
  127. }
  128. }
  129. *result = Slice(scratch, read);
  130. }
  131. IOSTATS_ADD(bytes_read, result->size());
  132. return io_s;
  133. }
  134. IOStatus SequentialFileReader::Skip(uint64_t n) {
  135. if (use_direct_io()) {
  136. offset_ += static_cast<size_t>(n);
  137. return IOStatus::OK();
  138. }
  139. return file_->Skip(n);
  140. }
  141. namespace {
  142. // This class wraps a SequentialFile, exposing same API, with the differenece
  143. // of being able to prefetch up to readahead_size bytes and then serve them
  144. // from memory, avoiding the entire round-trip if, for example, the data for the
  145. // file is actually remote.
  146. class ReadaheadSequentialFile : public FSSequentialFile {
  147. public:
  148. ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
  149. size_t readahead_size)
  150. : file_(std::move(file)),
  151. alignment_(file_->GetRequiredBufferAlignment()),
  152. readahead_size_(Roundup(readahead_size, alignment_)),
  153. buffer_(),
  154. buffer_offset_(0),
  155. read_offset_(0) {
  156. buffer_.Alignment(alignment_);
  157. buffer_.AllocateNewBuffer(readahead_size_);
  158. }
  159. ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;
  160. ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;
  161. IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
  162. IODebugContext* dbg) override {
  163. std::unique_lock<std::mutex> lk(lock_);
  164. size_t cached_len = 0;
  165. // Check if there is a cache hit, meaning that [offset, offset + n) is
  166. // either completely or partially in the buffer. If it's completely cached,
  167. // including end of file case when offset + n is greater than EOF, then
  168. // return.
  169. if (TryReadFromCache(n, &cached_len, scratch) &&
  170. (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
  171. // We read exactly what we needed, or we hit end of file - return.
  172. *result = Slice(scratch, cached_len);
  173. return IOStatus::OK();
  174. }
  175. n -= cached_len;
  176. IOStatus s;
  177. // Read-ahead only make sense if we have some slack left after reading
  178. if (n + alignment_ >= readahead_size_) {
  179. s = file_->Read(n, opts, result, scratch + cached_len, dbg);
  180. if (s.ok()) {
  181. read_offset_ += result->size();
  182. *result = Slice(scratch, cached_len + result->size());
  183. }
  184. buffer_.Clear();
  185. return s;
  186. }
  187. s = ReadIntoBuffer(readahead_size_, opts, dbg);
  188. if (s.ok()) {
  189. // The data we need is now in cache, so we can safely read it
  190. size_t remaining_len;
  191. TryReadFromCache(n, &remaining_len, scratch + cached_len);
  192. *result = Slice(scratch, cached_len + remaining_len);
  193. }
  194. return s;
  195. }
  196. IOStatus Skip(uint64_t n) override {
  197. std::unique_lock<std::mutex> lk(lock_);
  198. IOStatus s = IOStatus::OK();
  199. // First check if we need to skip already cached data
  200. if (buffer_.CurrentSize() > 0) {
  201. // Do we need to skip beyond cached data?
  202. if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
  203. // Yes. Skip whaterver is in memory and adjust offset accordingly
  204. n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
  205. read_offset_ = buffer_offset_ + buffer_.CurrentSize();
  206. } else {
  207. // No. The entire section to be skipped is entirely i cache.
  208. read_offset_ += n;
  209. n = 0;
  210. }
  211. }
  212. if (n > 0) {
  213. // We still need to skip more, so call the file API for skipping
  214. s = file_->Skip(n);
  215. if (s.ok()) {
  216. read_offset_ += n;
  217. }
  218. buffer_.Clear();
  219. }
  220. return s;
  221. }
  222. IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
  223. Slice* result, char* scratch,
  224. IODebugContext* dbg) override {
  225. return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
  226. }
  227. IOStatus InvalidateCache(size_t offset, size_t length) override {
  228. std::unique_lock<std::mutex> lk(lock_);
  229. buffer_.Clear();
  230. return file_->InvalidateCache(offset, length);
  231. }
  232. bool use_direct_io() const override { return file_->use_direct_io(); }
  233. private:
  234. // Tries to read from buffer_ n bytes. If anything was read from the cache, it
  235. // sets cached_len to the number of bytes actually read, copies these number
  236. // of bytes to scratch and returns true.
  237. // If nothing was read sets cached_len to 0 and returns false.
  238. bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
  239. if (read_offset_ < buffer_offset_ ||
  240. read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
  241. *cached_len = 0;
  242. return false;
  243. }
  244. uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
  245. *cached_len = std::min(
  246. buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
  247. memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
  248. read_offset_ += *cached_len;
  249. return true;
  250. }
  251. // Reads into buffer_ the next n bytes from file_.
  252. // Can actually read less if EOF was reached.
  253. // Returns the status of the read operastion on the file.
  254. IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
  255. IODebugContext* dbg) {
  256. if (n > buffer_.Capacity()) {
  257. n = buffer_.Capacity();
  258. }
  259. assert(IsFileSectorAligned(n, alignment_));
  260. Slice result;
  261. IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
  262. if (s.ok()) {
  263. buffer_offset_ = read_offset_;
  264. buffer_.Size(result.size());
  265. assert(result.size() == 0 || buffer_.BufferStart() == result.data());
  266. }
  267. return s;
  268. }
  269. const std::unique_ptr<FSSequentialFile> file_;
  270. const size_t alignment_;
  271. const size_t readahead_size_;
  272. std::mutex lock_;
  273. // The buffer storing the prefetched data
  274. AlignedBuffer buffer_;
  275. // The offset in file_, corresponding to data stored in buffer_
  276. uint64_t buffer_offset_;
  277. // The offset up to which data was read from file_. In fact, it can be larger
  278. // than the actual file size, since the file_->Skip(n) call doesn't return the
  279. // actual number of bytes that were skipped, which can be less than n.
  280. // This is not a problemm since read_offset_ is monotonically increasing and
  281. // its only use is to figure out if next piece of data should be read from
  282. // buffer_ or file_ directly.
  283. uint64_t read_offset_;
  284. };
  285. } // namespace
  286. std::unique_ptr<FSSequentialFile>
  287. SequentialFileReader::NewReadaheadSequentialFile(
  288. std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
  289. if (file->GetRequiredBufferAlignment() >= readahead_size) {
  290. // Short-circuit and return the original file if readahead_size is
  291. // too small and hence doesn't make sense to be used for prefetching.
  292. return std::move(file);
  293. }
  294. std::unique_ptr<FSSequentialFile> result(
  295. new ReadaheadSequentialFile(std::move(file), readahead_size));
  296. return result;
  297. }
  298. } // namespace ROCKSDB_NAMESPACE