block_fetcher.cc 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "table/block_fetcher.h"
  10. #include <cinttypes>
  11. #include <string>
  12. #include "logging/logging.h"
  13. #include "memory/memory_allocator.h"
  14. #include "monitoring/perf_context_imp.h"
  15. #include "rocksdb/env.h"
  16. #include "table/block_based/block.h"
  17. #include "table/block_based/block_based_table_reader.h"
  18. #include "table/format.h"
  19. #include "table/persistent_cache_helper.h"
  20. #include "util/coding.h"
  21. #include "util/compression.h"
  22. #include "util/crc32c.h"
  23. #include "util/stop_watch.h"
  24. #include "util/string_util.h"
  25. #include "util/xxhash.h"
  26. namespace ROCKSDB_NAMESPACE {
  27. inline void BlockFetcher::CheckBlockChecksum() {
  28. // Check the crc of the type and the block contents
  29. if (read_options_.verify_checksums) {
  30. const char* data = slice_.data(); // Pointer to where Read put the data
  31. PERF_TIMER_GUARD(block_checksum_time);
  32. uint32_t value = DecodeFixed32(data + block_size_ + 1);
  33. uint32_t actual = 0;
  34. switch (footer_.checksum()) {
  35. case kNoChecksum:
  36. break;
  37. case kCRC32c:
  38. value = crc32c::Unmask(value);
  39. actual = crc32c::Value(data, block_size_ + 1);
  40. break;
  41. case kxxHash:
  42. actual = XXH32(data, static_cast<int>(block_size_) + 1, 0);
  43. break;
  44. case kxxHash64:
  45. actual = static_cast<uint32_t>(
  46. XXH64(data, static_cast<int>(block_size_) + 1, 0) &
  47. uint64_t{0xffffffff});
  48. break;
  49. default:
  50. status_ = Status::Corruption(
  51. "unknown checksum type " + ToString(footer_.checksum()) + " in " +
  52. file_->file_name() + " offset " + ToString(handle_.offset()) +
  53. " size " + ToString(block_size_));
  54. }
  55. if (status_.ok() && actual != value) {
  56. status_ = Status::Corruption(
  57. "block checksum mismatch: expected " + ToString(actual) + ", got " +
  58. ToString(value) + " in " + file_->file_name() + " offset " +
  59. ToString(handle_.offset()) + " size " + ToString(block_size_));
  60. }
  61. }
  62. }
  63. inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
  64. if (cache_options_.persistent_cache &&
  65. !cache_options_.persistent_cache->IsCompressed()) {
  66. Status status = PersistentCacheHelper::LookupUncompressedPage(
  67. cache_options_, handle_, contents_);
  68. if (status.ok()) {
  69. // uncompressed page is found for the block handle
  70. return true;
  71. } else {
  72. // uncompressed page is not found
  73. if (ioptions_.info_log && !status.IsNotFound()) {
  74. assert(!status.ok());
  75. ROCKS_LOG_INFO(ioptions_.info_log,
  76. "Error reading from persistent cache. %s",
  77. status.ToString().c_str());
  78. }
  79. }
  80. }
  81. return false;
  82. }
  83. inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
  84. if (prefetch_buffer_ != nullptr &&
  85. prefetch_buffer_->TryReadFromCache(
  86. handle_.offset(),
  87. static_cast<size_t>(handle_.size()) + kBlockTrailerSize, &slice_,
  88. for_compaction_)) {
  89. block_size_ = static_cast<size_t>(handle_.size());
  90. CheckBlockChecksum();
  91. if (!status_.ok()) {
  92. return true;
  93. }
  94. got_from_prefetch_buffer_ = true;
  95. used_buf_ = const_cast<char*>(slice_.data());
  96. }
  97. return got_from_prefetch_buffer_;
  98. }
  99. inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
  100. if (cache_options_.persistent_cache &&
  101. cache_options_.persistent_cache->IsCompressed()) {
  102. // lookup uncompressed cache mode p-cache
  103. std::unique_ptr<char[]> raw_data;
  104. status_ = PersistentCacheHelper::LookupRawPage(
  105. cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
  106. if (status_.ok()) {
  107. heap_buf_ = CacheAllocationPtr(raw_data.release());
  108. used_buf_ = heap_buf_.get();
  109. slice_ = Slice(heap_buf_.get(), block_size_);
  110. return true;
  111. } else if (!status_.IsNotFound() && ioptions_.info_log) {
  112. assert(!status_.ok());
  113. ROCKS_LOG_INFO(ioptions_.info_log,
  114. "Error reading from persistent cache. %s",
  115. status_.ToString().c_str());
  116. }
  117. }
  118. return false;
  119. }
  120. inline void BlockFetcher::PrepareBufferForBlockFromFile() {
  121. // cache miss read from device
  122. if (do_uncompress_ &&
  123. block_size_ + kBlockTrailerSize < kDefaultStackBufferSize) {
  124. // If we've got a small enough hunk of data, read it in to the
  125. // trivially allocated stack buffer instead of needing a full malloc()
  126. used_buf_ = &stack_buf_[0];
  127. } else if (maybe_compressed_ && !do_uncompress_) {
  128. compressed_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize,
  129. memory_allocator_compressed_);
  130. used_buf_ = compressed_buf_.get();
  131. } else {
  132. heap_buf_ =
  133. AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
  134. used_buf_ = heap_buf_.get();
  135. }
  136. }
  137. inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
  138. if (status_.ok() && read_options_.fill_cache &&
  139. cache_options_.persistent_cache &&
  140. cache_options_.persistent_cache->IsCompressed()) {
  141. // insert to raw cache
  142. PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_,
  143. block_size_ + kBlockTrailerSize);
  144. }
  145. }
  146. inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
  147. if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache &&
  148. cache_options_.persistent_cache &&
  149. !cache_options_.persistent_cache->IsCompressed()) {
  150. // insert to uncompressed cache
  151. PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_,
  152. *contents_);
  153. }
  154. }
  155. inline void BlockFetcher::CopyBufferToHeap() {
  156. assert(used_buf_ != heap_buf_.get());
  157. heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
  158. memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
  159. }
  160. inline void BlockFetcher::GetBlockContents() {
  161. if (slice_.data() != used_buf_) {
  162. // the slice content is not the buffer provided
  163. *contents_ = BlockContents(Slice(slice_.data(), block_size_));
  164. } else {
  165. // page can be either uncompressed or compressed, the buffer either stack
  166. // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
  167. if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
  168. CopyBufferToHeap();
  169. } else if (used_buf_ == compressed_buf_.get()) {
  170. if (compression_type_ == kNoCompression &&
  171. memory_allocator_ != memory_allocator_compressed_) {
  172. CopyBufferToHeap();
  173. } else {
  174. heap_buf_ = std::move(compressed_buf_);
  175. }
  176. }
  177. *contents_ = BlockContents(std::move(heap_buf_), block_size_);
  178. }
  179. #ifndef NDEBUG
  180. contents_->is_raw_block = true;
  181. #endif
  182. }
  183. Status BlockFetcher::ReadBlockContents() {
  184. block_size_ = static_cast<size_t>(handle_.size());
  185. if (TryGetUncompressBlockFromPersistentCache()) {
  186. compression_type_ = kNoCompression;
  187. #ifndef NDEBUG
  188. contents_->is_raw_block = true;
  189. #endif // NDEBUG
  190. return Status::OK();
  191. }
  192. if (TryGetFromPrefetchBuffer()) {
  193. if (!status_.ok()) {
  194. return status_;
  195. }
  196. } else if (!TryGetCompressedBlockFromPersistentCache()) {
  197. PrepareBufferForBlockFromFile();
  198. Status s;
  199. {
  200. PERF_TIMER_GUARD(block_read_time);
  201. // Actual file read
  202. status_ = file_->Read(handle_.offset(), block_size_ + kBlockTrailerSize,
  203. &slice_, used_buf_, for_compaction_);
  204. }
  205. PERF_COUNTER_ADD(block_read_count, 1);
  206. // TODO: introduce dedicated perf counter for range tombstones
  207. switch (block_type_) {
  208. case BlockType::kFilter:
  209. PERF_COUNTER_ADD(filter_block_read_count, 1);
  210. break;
  211. case BlockType::kCompressionDictionary:
  212. PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
  213. break;
  214. case BlockType::kIndex:
  215. PERF_COUNTER_ADD(index_block_read_count, 1);
  216. break;
  217. // Nothing to do here as we don't have counters for the other types.
  218. default:
  219. break;
  220. }
  221. PERF_COUNTER_ADD(block_read_byte, block_size_ + kBlockTrailerSize);
  222. if (!status_.ok()) {
  223. return status_;
  224. }
  225. if (slice_.size() != block_size_ + kBlockTrailerSize) {
  226. return Status::Corruption("truncated block read from " +
  227. file_->file_name() + " offset " +
  228. ToString(handle_.offset()) + ", expected " +
  229. ToString(block_size_ + kBlockTrailerSize) +
  230. " bytes, got " + ToString(slice_.size()));
  231. }
  232. CheckBlockChecksum();
  233. if (status_.ok()) {
  234. InsertCompressedBlockToPersistentCacheIfNeeded();
  235. } else {
  236. return status_;
  237. }
  238. }
  239. PERF_TIMER_GUARD(block_decompress_time);
  240. compression_type_ = get_block_compression_type(slice_.data(), block_size_);
  241. if (do_uncompress_ && compression_type_ != kNoCompression) {
  242. // compressed page, uncompress, update cache
  243. UncompressionContext context(compression_type_);
  244. UncompressionInfo info(context, uncompression_dict_, compression_type_);
  245. status_ = UncompressBlockContents(info, slice_.data(), block_size_,
  246. contents_, footer_.version(), ioptions_,
  247. memory_allocator_);
  248. compression_type_ = kNoCompression;
  249. } else {
  250. GetBlockContents();
  251. }
  252. InsertUncompressedBlockToPersistentCacheIfNeeded();
  253. return status_;
  254. }
  255. } // namespace ROCKSDB_NAMESPACE