// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "table/block_fetcher.h"

#include <cassert>
#include <cinttypes>
#include <string>

#include "logging/logging.h"
#include "memory/memory_allocator_impl.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/env.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_type.h"
#include "table/block_based/reader_common.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
#include "util/compression.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

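// A block-based table block is stored on disk as the block payload followed
// by an optional trailer of BlockBasedTable::kBlockTrailerSize (5) bytes:
// one byte of compression type and a 4-byte checksum. Roughly:
//
//   +-----------------------+-----------+---------------+
//   | payload (block_size_) | type (1B) | checksum (4B) |
//   +-----------------------+-----------+---------------+
//                           |<-------- trailer -------->|
//
// ProcessTrailerIfPresent() verifies the checksum (when requested) and
// extracts the compression type from that trailer.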
inline void BlockFetcher::ProcessTrailerIfPresent() {
  if (footer_.GetBlockTrailerSize() > 0) {
    assert(footer_.GetBlockTrailerSize() == BlockBasedTable::kBlockTrailerSize);
    if (read_options_.verify_checksums) {
      io_status_ = status_to_io_status(
          VerifyBlockChecksum(footer_, slice_.data(), block_size_,
                              file_->file_name(), handle_.offset()));
      RecordTick(ioptions_.stats, BLOCK_CHECKSUM_COMPUTE_COUNT);
      if (!io_status_.ok()) {
        assert(io_status_.IsCorruption());
        RecordTick(ioptions_.stats, BLOCK_CHECKSUM_MISMATCH_COUNT);
      }
    }
    compression_type() =
        BlockBasedTable::GetBlockCompressionType(slice_.data(), block_size_);
  } else {
    // E.g. plain table or cuckoo table
    compression_type() = kNoCompression;
  }
}

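// The persistent cache can be configured in one of two modes, and the two
// lookup helpers below correspond to them:
//  - uncompressed mode (!IsCompressed()): entries are fully decoded block
//    contents, returned directly via contents_;
//  - compressed/serialized mode (IsCompressed()): entries are raw block
//    bytes including the trailer, which the caller must still checksum and
//    possibly decompress.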
inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
  if (cache_options_.persistent_cache &&
      !cache_options_.persistent_cache->IsCompressed()) {
    Status status = PersistentCacheHelper::LookupUncompressed(
        cache_options_, handle_, contents_);
    if (status.ok()) {
      // uncompressed page is found for the block handle
      return true;
    } else {
      // uncompressed page is not found
      if (ioptions_.logger && !status.IsNotFound()) {
        assert(!status.ok());
        ROCKS_LOG_INFO(ioptions_.logger,
                       "Error reading from persistent cache. %s",
                       status.ToString().c_str());
      }
    }
  }
  return false;
}

inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
  if (prefetch_buffer_ != nullptr) {
    IOOptions opts;
    IODebugContext dbg;
    IOStatus io_s = file_->PrepareIOOptions(read_options_, opts, &dbg);
    if (io_s.ok()) {
      bool read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
          opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
          &io_s, for_compaction_);
      if (read_from_prefetch_buffer) {
        ProcessTrailerIfPresent();
        if (io_status_.ok()) {
          got_from_prefetch_buffer_ = true;
          used_buf_ = const_cast<char*>(slice_.data());
        } else if (io_status_.IsCorruption()) {
          // Returning true signals that we either got data from the prefetch
          // buffer, or we tried and encountered an error; in both cases the
          // caller should not fall through to a plain file read.
          return true;
        }
      }
    }
    if (!io_s.ok()) {
      io_status_ = io_s;
      return true;
    }
  }
  return got_from_prefetch_buffer_;
}

inline bool BlockFetcher::TryGetSerializedBlockFromPersistentCache() {
  if (cache_options_.persistent_cache &&
      cache_options_.persistent_cache->IsCompressed()) {
    std::unique_ptr<char[]> buf;
    io_status_ = status_to_io_status(PersistentCacheHelper::LookupSerialized(
        cache_options_, handle_, &buf, block_size_with_trailer_));
    if (io_status_.ok()) {
      heap_buf_ = CacheAllocationPtr(buf.release());
      used_buf_ = heap_buf_.get();
      slice_ = Slice(heap_buf_.get(), block_size_);
      ProcessTrailerIfPresent();
      return true;
    } else if (!io_status_.IsNotFound() && ioptions_.logger) {
      assert(!io_status_.ok());
      ROCKS_LOG_INFO(ioptions_.logger,
                     "Error reading from persistent cache. %s",
                     io_status_.ToString().c_str());
    }
  }
  return false;
}

inline void BlockFetcher::PrepareBufferForBlockFromFile() {
  // cache miss read from device
  if ((do_uncompress_ || ioptions_.allow_mmap_reads) &&
      block_size_with_trailer_ < kDefaultStackBufferSize) {
    // If we've got a small enough chunk of data, read it in to the
    // trivially allocated stack buffer instead of needing a full malloc()
    //
    // `GetBlockContents()` cannot return this data as its lifetime is tied to
    // this `BlockFetcher`'s lifetime. That is fine because this is only used
    // in cases where we do not expect the `GetBlockContents()` result to be
    // the same buffer we are assigning here. If we guess incorrectly, there
    // will be a heap allocation and memcpy in `GetBlockContents()` to obtain
    // the final result. Considering we are eliding a heap allocation here by
    // using the stack buffer, the cost of guessing incorrectly here is one
    // extra memcpy.
    //
    // When `do_uncompress_` is true, we expect the uncompression step will
    // allocate heap memory for the final result. However this expectation
    // will be wrong if the block turns out to already be uncompressed, which
    // we won't know for sure until after reading it.
    //
    // When `ioptions_.allow_mmap_reads` is true, we do not expect the file
    // reader to use the scratch buffer at all, but instead return a pointer
    // into the mapped memory. This expectation will be wrong when using a
    // file reader that does not implement mmap reads properly.
    used_buf_ = &stack_buf_[0];
  } else if (maybe_compressed_ && !do_uncompress_) {
    compressed_buf_ =
        AllocateBlock(block_size_with_trailer_, memory_allocator_compressed_);
    used_buf_ = compressed_buf_.get();
  } else {
    heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
    used_buf_ = heap_buf_.get();
  }
}

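// Buffer selection in PrepareBufferForBlockFromFile(), summarized:
//
//   condition                                          used_buf_ points to
//   -------------------------------------------------  -------------------
//   (do_uncompress_ || allow_mmap_reads) && small      stack_buf_
//   maybe_compressed_ && !do_uncompress_               compressed_buf_
//   otherwise                                          heap_buf_
//
// where "small" means block_size_with_trailer_ < kDefaultStackBufferSize.
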
inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
  if (io_status_.ok() && read_options_.fill_cache &&
      cache_options_.persistent_cache &&
      cache_options_.persistent_cache->IsCompressed()) {
    PersistentCacheHelper::InsertSerialized(cache_options_, handle_, used_buf_,
                                            block_size_with_trailer_);
  }
}

inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
  if (io_status_.ok() && !got_from_prefetch_buffer_ &&
      read_options_.fill_cache && cache_options_.persistent_cache &&
      !cache_options_.persistent_cache->IsCompressed()) {
    // insert to uncompressed cache
    PersistentCacheHelper::InsertUncompressed(cache_options_, handle_,
                                              *contents_);
  }
}

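// Note the asymmetry between the two insert helpers above: the serialized
// (compressed) form is inserted right after a successful file read in
// ReadBlock(), while the uncompressed form is inserted only once the final
// contents are produced. The `!got_from_prefetch_buffer_` check likely
// avoids re-inserting data that would already have been cached when it was
// originally read.
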
inline void BlockFetcher::CopyBufferToHeapBuf() {
  assert(used_buf_ != heap_buf_.get());
  heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
  memcpy(heap_buf_.get(), used_buf_, block_size_with_trailer_);
#ifndef NDEBUG
  num_heap_buf_memcpy_++;
#endif
}

inline void BlockFetcher::CopyBufferToCompressedBuf() {
  assert(used_buf_ != compressed_buf_.get());
  compressed_buf_ =
      AllocateBlock(block_size_with_trailer_, memory_allocator_compressed_);
  memcpy(compressed_buf_.get(), used_buf_, block_size_with_trailer_);
#ifndef NDEBUG
  num_compressed_buf_memcpy_++;
#endif
}

// Before - Entering this method means the block is uncompressed or does not
// need to be decompressed.
//
// The block can be in one of the following buffers:
// 1. prefetch buffer if prefetch is enabled and the block was prefetched
//    before
// 2. stack_buf_ if the block size is smaller than the stack_buf_ size and
//    the block is not compressed
// 3. heap_buf_ if the block is not compressed
// 4. compressed_buf_ if the block is compressed
// 5. direct_io_buf_ if direct IO is enabled, or
// 6. underlying file system scratch (FSReadRequest.fs_scratch)
//
// After - After this method, the block is owned by heap_buf_: a compressed
// block is moved from compressed_buf_ into heap_buf_, an uncompressed one
// is copied or already resides there. Ownership is then transferred to
// *contents_.
inline void BlockFetcher::GetBlockContents() {
  if (slice_.data() != used_buf_) {
    // the slice content is not the buffer provided
    *contents_ = BlockContents(Slice(slice_.data(), block_size_));
  } else {
    // The page can be either uncompressed or compressed, and the buffer is
    // either stack- or heap-provided. Refer to
    // https://github.com/facebook/rocksdb/pull/4096
    if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
      CopyBufferToHeapBuf();
    } else if (used_buf_ == compressed_buf_.get()) {
      if (compression_type() == kNoCompression &&
          memory_allocator_ != memory_allocator_compressed_) {
        CopyBufferToHeapBuf();
      } else {
        heap_buf_ = std::move(compressed_buf_);
      }
    } else if (direct_io_buf_.get() != nullptr || use_fs_scratch_) {
      if (compression_type() == kNoCompression) {
        CopyBufferToHeapBuf();
      } else {
        CopyBufferToCompressedBuf();
        heap_buf_ = std::move(compressed_buf_);
      }
    }
    *contents_ = BlockContents(std::move(heap_buf_), block_size_);
  }
#ifndef NDEBUG
  contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif
}

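// ReadBlock() below takes one of three read paths, visible in the branches
// of its body:
//   1. direct IO: file_->Read() fills direct_io_buf_ and slice_ points into
//      it;
//   2. file-system-provided scratch (use_fs_scratch_): a single-request
//      MultiRead() lets the file system hand back its own buffer via
//      FSReadRequest::fs_scratch;
//   3. buffered read: PrepareBufferForBlockFromFile() picks a scratch buffer
//      and file_->Read() fills it.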
// Read a block from the file and verify its checksum. Upon return, io_status_
// will be updated with the status of the read, and slice_ will be updated
// with a pointer to the data.
void BlockFetcher::ReadBlock(bool retry) {
  FSReadRequest read_req;
  IOOptions opts;
  IODebugContext dbg;
  io_status_ = file_->PrepareIOOptions(read_options_, opts, &dbg);
  opts.verify_and_reconstruct_read = retry;
  read_req.status.PermitUncheckedError();
  // Actual file read
  if (io_status_.ok()) {
    if (file_->use_direct_io()) {
      PERF_TIMER_GUARD(block_read_time);
      PERF_CPU_TIMER_GUARD(
          block_read_cpu_time,
          ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
      io_status_ =
          file_->Read(opts, handle_.offset(), block_size_with_trailer_,
                      &slice_, /*scratch=*/nullptr, &direct_io_buf_, &dbg);
      PERF_COUNTER_ADD(block_read_count, 1);
      used_buf_ = const_cast<char*>(slice_.data());
    } else if (use_fs_scratch_) {
      PERF_TIMER_GUARD(block_read_time);
      PERF_CPU_TIMER_GUARD(
          block_read_cpu_time,
          ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
      read_req.offset = handle_.offset();
      read_req.len = block_size_with_trailer_;
      read_req.scratch = nullptr;
      io_status_ = file_->MultiRead(opts, &read_req, /*num_reqs=*/1,
                                    /*AlignedBuf* =*/nullptr, &dbg);
      PERF_COUNTER_ADD(block_read_count, 1);
      slice_ = Slice(read_req.result.data(), read_req.result.size());
      used_buf_ = const_cast<char*>(slice_.data());
    } else {
      // Allocates/assigns used_buf_
      PrepareBufferForBlockFromFile();
      PERF_TIMER_GUARD(block_read_time);
      PERF_CPU_TIMER_GUARD(
          block_read_cpu_time,
          ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
      io_status_ = file_->Read(opts, handle_.offset(),
                               /*size*/ block_size_with_trailer_,
                               /*result*/ &slice_, /*scratch*/ used_buf_,
                               /*aligned_buf=*/nullptr, &dbg);
      PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
      if (slice_.data() == &stack_buf_[0]) {
        num_stack_buf_memcpy_++;
      } else if (slice_.data() == heap_buf_.get()) {
        num_heap_buf_memcpy_++;
      } else if (slice_.data() == compressed_buf_.get()) {
        num_compressed_buf_memcpy_++;
      }
#endif
    }
  }

  // TODO: introduce dedicated perf counter for range tombstones
  switch (block_type_) {
    case BlockType::kFilter:
    case BlockType::kFilterPartitionIndex:
      PERF_COUNTER_ADD(filter_block_read_count, 1);
      break;

    case BlockType::kCompressionDictionary:
      PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
      break;

    case BlockType::kIndex:
      PERF_COUNTER_ADD(index_block_read_count, 1);
      break;

    // Nothing to do here as we don't have counters for the other types.
    default:
      break;
  }

  PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
  IGNORE_STATUS_IF_ERROR(io_status_);
  if (io_status_.ok()) {
    if (use_fs_scratch_ && !read_req.status.ok()) {
      io_status_ = read_req.status;
    } else if (slice_.size() != block_size_with_trailer_) {
      io_status_ = IOStatus::Corruption(
          "truncated block read from " + file_->file_name() + " offset " +
          std::to_string(handle_.offset()) + ", expected " +
          std::to_string(block_size_with_trailer_) + " bytes, got " +
          std::to_string(slice_.size()));
    }
  }

  if (io_status_.ok()) {
    ProcessTrailerIfPresent();
  }

  if (retry) {
    RecordTick(ioptions_.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
  }
  if (io_status_.ok()) {
    InsertCompressedBlockToPersistentCacheIfNeeded();
    fs_buf_ = std::move(read_req.fs_scratch);
    if (retry) {
      RecordTick(ioptions_.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
    }
  } else {
    ReleaseFileSystemProvidedBuffer(&read_req);
    direct_io_buf_.reset();
    compressed_buf_.reset();
    heap_buf_.reset();
    used_buf_ = nullptr;
  }
}

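// On failure, ReadBlock() releases every buffer it may have populated
// (direct_io_buf_, compressed_buf_, heap_buf_, and any file-system-provided
// scratch) so that a subsequent retry with verify_and_reconstruct_read set
// can start from a clean state.
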
IOStatus BlockFetcher::ReadBlockContents() {
  if (TryGetUncompressBlockFromPersistentCache()) {
    compression_type() = kNoCompression;
#ifndef NDEBUG
    contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif  // NDEBUG
    return IOStatus::OK();
  }
  if (TryGetFromPrefetchBuffer()) {
    if (io_status_.IsCorruption() && retry_corrupt_read_) {
      ReadBlock(/*retry=*/true);
    }
    if (!io_status_.ok()) {
      assert(!fs_buf_);
      return io_status_;
    }
  } else if (!TryGetSerializedBlockFromPersistentCache()) {
    ReadBlock(/*retry=*/false);
    // If the file system supports retry after corruption, then try to
    // re-read the block and see if it succeeds.
    if (io_status_.IsCorruption() && retry_corrupt_read_) {
      assert(!fs_buf_);
      ReadBlock(/*retry=*/true);
    }
    if (!io_status_.ok()) {
      assert(!fs_buf_);
      return io_status_;
    }
  }

  if (do_uncompress_ && compression_type() != kNoCompression) {
    PERF_TIMER_GUARD(block_decompress_time);
    // Process the compressed block without trailer
    slice_.size_ = block_size_;
    decomp_args_.compressed_data = slice_;
    io_status_ = status_to_io_status(DecompressSerializedBlock(
        decomp_args_, *decompressor_, contents_, ioptions_,
        memory_allocator_));
#ifndef NDEBUG
    num_heap_buf_memcpy_++;
#endif
  } else {
    GetBlockContents();
    slice_ = Slice();
  }

  InsertUncompressedBlockToPersistentCacheIfNeeded();
  return io_status_;
}

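// Typical synchronous usage (an illustrative sketch only; the constructor
// takes additional parameters omitted here, and exact names may differ by
// version):
//
//   BlockContents contents;
//   BlockFetcher fetcher(file, prefetch_buffer, footer, read_options, handle,
//                        &contents, ioptions, /*do_uncompress=*/true, ...);
//   IOStatus ios = fetcher.ReadBlockContents();
//   if (ios.ok()) {
//     // `contents` now owns (or points to) the block payload.
//   }
//
// ReadAsyncBlockContents() below is the asynchronous variant used with a
// FilePrefetchBuffer: it may return a TryAgain status while the prefetch is
// still in flight, in which case the caller is expected to retry later.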
IOStatus BlockFetcher::ReadAsyncBlockContents() {
  if (TryGetUncompressBlockFromPersistentCache()) {
    compression_type() = kNoCompression;
#ifndef NDEBUG
    contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif  // NDEBUG
    return IOStatus::OK();
  } else if (!TryGetSerializedBlockFromPersistentCache()) {
    assert(prefetch_buffer_ != nullptr);
    if (!for_compaction_) {
      IOOptions opts;
      IODebugContext dbg;
      IOStatus io_s = file_->PrepareIOOptions(read_options_, opts, &dbg);
      if (!io_s.ok()) {
        return io_s;
      }
      io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
          opts, file_, handle_.offset(), block_size_with_trailer_, &slice_));
      if (io_s.IsTryAgain()) {
        return io_s;
      }
      if (io_s.ok()) {
        // Data block is already in the prefetch buffer.
        got_from_prefetch_buffer_ = true;
        ProcessTrailerIfPresent();
        if (io_status_.IsCorruption() && retry_corrupt_read_) {
          got_from_prefetch_buffer_ = false;
          ReadBlock(/*retry=*/true);
        }
        if (!io_status_.ok()) {
          assert(!fs_buf_);
          return io_status_;
        }
        used_buf_ = const_cast<char*>(slice_.data());

        if (do_uncompress_ && compression_type() != kNoCompression) {
          PERF_TIMER_GUARD(block_decompress_time);
          // Process the compressed block without trailer
          slice_.size_ = block_size_;
          decomp_args_.compressed_data = slice_;
          io_status_ = status_to_io_status(
              DecompressSerializedBlock(decomp_args_, *decompressor_,
                                        contents_, ioptions_,
                                        memory_allocator_));
#ifndef NDEBUG
          num_heap_buf_memcpy_++;
#endif
        } else {
          GetBlockContents();
        }
        InsertUncompressedBlockToPersistentCacheIfNeeded();
        return io_status_;
      }
    }
    // Fall back to sequential reading of the data block if io_s returned an
    // error or for_compaction_ is true.
    return ReadBlockContents();
  }
  return io_status_;
}

}  // namespace ROCKSDB_NAMESPACE