block_fetcher.h 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include "file/file_util.h"
  11. #include "memory/memory_allocator_impl.h"
  12. #include "table/block_based/block.h"
  13. #include "table/block_based/block_type.h"
  14. #include "table/format.h"
  15. #include "table/persistent_cache_options.h"
  16. #include "util/cast_util.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. // Retrieves a single block of a given file. Utilizes the prefetch buffer and/or
  19. // persistent cache provided (if any) to try to avoid reading from the file
  20. // directly. Note that both the prefetch buffer and the persistent cache are
  21. // optional; also, note that the persistent cache may be configured to store
  22. // either compressed or uncompressed blocks.
  23. //
  24. // If the retrieved block is compressed and the do_uncompress flag is set,
  25. // BlockFetcher uncompresses the block (using the uncompression dictionary,
  26. // if provided, to prime the compression algorithm), and returns the resulting
  27. // uncompressed block data. Otherwise, it returns the original block.
  28. //
  29. // Two read options affect the behavior of BlockFetcher: if verify_checksums is
  30. // true, the checksum of the (original) block is checked; if fill_cache is true,
  31. // the block is added to the persistent cache if needed.
  32. //
  33. // Memory for uncompressed and compressed blocks is allocated as needed
  34. // using memory_allocator and memory_allocator_compressed, respectively
  35. // (if provided; otherwise, the default allocator is used).
  36. class BlockFetcher {
  37. public:
  38. BlockFetcher(RandomAccessFileReader* file,
  39. FilePrefetchBuffer* prefetch_buffer,
  40. const Footer& footer /* ref retained */,
  41. const ReadOptions& read_options,
  42. const BlockHandle& handle /* ref retained */,
  43. BlockContents* contents,
  44. const ImmutableOptions& ioptions /* ref retained */,
  45. bool do_uncompress, bool maybe_compressed, BlockType block_type,
  46. UnownedPtr<Decompressor> decompressor,
  47. const PersistentCacheOptions& cache_options /* ref retained */,
  48. MemoryAllocator* memory_allocator = nullptr,
  49. MemoryAllocator* memory_allocator_compressed = nullptr,
  50. bool for_compaction = false)
  51. : file_(file),
  52. prefetch_buffer_(prefetch_buffer),
  53. footer_(footer),
  54. read_options_(read_options),
  55. handle_(handle),
  56. contents_(contents),
  57. ioptions_(ioptions),
  58. do_uncompress_(do_uncompress),
  59. maybe_compressed_(maybe_compressed),
  60. block_type_(block_type),
  61. block_size_(static_cast<size_t>(handle_.size())),
  62. block_size_with_trailer_(block_size_ + footer.GetBlockTrailerSize()),
  63. decompressor_(decompressor),
  64. cache_options_(cache_options),
  65. memory_allocator_(memory_allocator),
  66. memory_allocator_compressed_(memory_allocator_compressed),
  67. for_compaction_(for_compaction) {
  68. io_status_.PermitUncheckedError(); // TODO(AR) can we improve on this?
  69. if (CheckFSFeatureSupport(ioptions_.fs.get(), FSSupportedOps::kFSBuffer)) {
  70. use_fs_scratch_ = true;
  71. }
  72. if (CheckFSFeatureSupport(ioptions_.fs.get(),
  73. FSSupportedOps::kVerifyAndReconstructRead)) {
  74. retry_corrupt_read_ = true;
  75. }
  76. }
  77. IOStatus ReadBlockContents();
  78. IOStatus ReadAsyncBlockContents();
  79. inline CompressionType compression_type() const {
  80. return decomp_args_.compression_type;
  81. }
  82. inline CompressionType& compression_type() {
  83. return decomp_args_.compression_type;
  84. }
  85. inline size_t GetBlockSizeWithTrailer() const {
  86. return block_size_with_trailer_;
  87. }
  88. inline Slice& GetCompressedBlock() {
  89. assert(compression_type() != kNoCompression);
  90. return slice_;
  91. }
  92. #ifndef NDEBUG
  93. int TEST_GetNumStackBufMemcpy() const { return num_stack_buf_memcpy_; }
  94. int TEST_GetNumHeapBufMemcpy() const { return num_heap_buf_memcpy_; }
  95. int TEST_GetNumCompressedBufMemcpy() const {
  96. return num_compressed_buf_memcpy_;
  97. }
  98. #endif
  99. private:
  100. #ifndef NDEBUG
  101. int num_stack_buf_memcpy_ = 0;
  102. int num_heap_buf_memcpy_ = 0;
  103. int num_compressed_buf_memcpy_ = 0;
  104. #endif
  105. static const uint32_t kDefaultStackBufferSize = 5000;
  106. RandomAccessFileReader* file_;
  107. FilePrefetchBuffer* prefetch_buffer_;
  108. const Footer& footer_;
  109. const ReadOptions read_options_;
  110. const BlockHandle& handle_;
  111. BlockContents* contents_;
  112. const ImmutableOptions& ioptions_;
  113. const bool do_uncompress_;
  114. const bool maybe_compressed_;
  115. const BlockType block_type_;
  116. const size_t block_size_;
  117. const size_t block_size_with_trailer_;
  118. UnownedPtr<Decompressor> decompressor_;
  119. const PersistentCacheOptions& cache_options_;
  120. MemoryAllocator* memory_allocator_;
  121. MemoryAllocator* memory_allocator_compressed_;
  122. IOStatus io_status_;
  123. Slice slice_;
  124. char* used_buf_ = nullptr;
  125. AlignedBuf direct_io_buf_;
  126. CacheAllocationPtr heap_buf_;
  127. CacheAllocationPtr compressed_buf_;
  128. char stack_buf_[kDefaultStackBufferSize];
  129. bool got_from_prefetch_buffer_ = false;
  130. bool for_compaction_ = false;
  131. bool use_fs_scratch_ = false;
  132. bool retry_corrupt_read_ = false;
  133. FSAllocationPtr fs_buf_;
  134. Decompressor::Args decomp_args_;
  135. // return true if found
  136. bool TryGetUncompressBlockFromPersistentCache();
  137. // return true if found
  138. bool TryGetFromPrefetchBuffer();
  139. bool TryGetSerializedBlockFromPersistentCache();
  140. void PrepareBufferForBlockFromFile();
  141. // Copy content from used_buf_ to new heap_buf_.
  142. void CopyBufferToHeapBuf();
  143. // Copy content from used_buf_ to new compressed_buf_.
  144. void CopyBufferToCompressedBuf();
  145. void GetBlockContents();
  146. void InsertCompressedBlockToPersistentCacheIfNeeded();
  147. void InsertUncompressedBlockToPersistentCacheIfNeeded();
  148. void ProcessTrailerIfPresent();
  149. void ReadBlock(bool retry);
  150. void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
  151. if (use_fs_scratch_) {
  152. // Free the scratch buffer allocated by FileSystem.
  153. if (read_req->fs_scratch != nullptr) {
  154. read_req->fs_scratch.reset();
  155. read_req->fs_scratch = nullptr;
  156. }
  157. }
  158. }
  159. };
  160. } // namespace ROCKSDB_NAMESPACE