partitioned_index_reader.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "table/block_based/partitioned_index_reader.h"

#include "block_cache.h"
#include "file/random_access_file_reader.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/partitioned_index_iterator.h"

namespace ROCKSDB_NAMESPACE {

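// Creates a PartitionIndexReader for the table's top-level (partitioned)
// index. When `prefetch` is set or the block cache is not used, the
// top-level index block is read eagerly; it stays held by the reader unless
// it can be re-fetched from the block cache later (`use_cache` without
// `pin`), in which case it is released after warming the cache.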
Status PartitionIndexReader::Create(
    const BlockBasedTable* table, const ReadOptions& ro,
    FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
    bool pin, BlockCacheLookupContext* lookup_context,
    std::unique_ptr<IndexReader>* index_reader) {
  assert(table != nullptr);
  assert(table->get_rep());
  assert(!pin || prefetch);
  assert(index_reader != nullptr);

  CachableEntry<Block> index_block;
  if (prefetch || !use_cache) {
    const Status s =
        ReadIndexBlock(table, prefetch_buffer, ro, use_cache,
                       /*get_context=*/nullptr, lookup_context, &index_block);
    if (!s.ok()) {
      return s;
    }

    if (use_cache && !pin) {
      index_block.Reset();
    }
  }

  index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));

  return Status::OK();
}

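// Returns a two-level iterator over the partitioned index. If the index
// partitions were pinned by CacheDependencies() (`partition_map_` is
// non-empty), the second level reads directly from the pinned blocks;
// otherwise a PartitionedIndexIterator is returned, which loads partitions
// on demand through the block cache.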
InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
    const ReadOptions& read_options, bool /* disable_prefix_seek */,
    IndexBlockIter* iter, GetContext* get_context,
    BlockCacheLookupContext* lookup_context) {
  CachableEntry<Block> index_block;
  const Status s = GetOrReadIndexBlock(get_context, lookup_context,
                                       &index_block, read_options);
  if (!s.ok()) {
    if (iter != nullptr) {
      iter->Invalidate(s);
      return iter;
    }

    return NewErrorInternalIterator<IndexValue>(s);
  }

  const BlockBasedTable::Rep* rep = table()->rep_;
  InternalIteratorBase<IndexValue>* it = nullptr;

  Statistics* kNullStats = nullptr;
  // Filters are already checked before seeking the index
  if (!partition_map_.empty()) {
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    it = NewTwoLevelIterator(
        new BlockBasedTable::PartitionedIndexIteratorState(table(),
                                                           &partition_map_),
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full(), false /* block_contents_pinned */,
            user_defined_timestamps_persisted()));
  } else {
    ReadOptions ro{read_options};
    // FIXME? Possible regression seen in prefetch_test if this field is
    // propagated
    ro.readahead_size = ReadOptions{}.readahead_size;

    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full(), false /* block_contents_pinned */,
            user_defined_timestamps_persisted()));

    it = new PartitionedIndexIterator(
        table(), ro, *internal_comparator(), std::move(index_iter),
        lookup_context ? lookup_context->caller
                       : TableReaderCaller::kUncategorized);
  }

  assert(it != nullptr);
  index_block.TransferTo(it);

  return it;

  // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
  // on-stack BlockIter while the state is on heap. Currently it assumes
  // the first level iter is always on heap and will attempt to delete it
  // in its destructor.
}

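// Prefetches the index partitions (assumed to be laid out contiguously in
// the file) and loads them into the block cache. When `pin` is true, the
// partitions are also saved into `partition_map_` in an all-or-nothing
// manner so that iterators can use them without further cache lookups.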
Status PartitionIndexReader::CacheDependencies(
    const ReadOptions& ro, bool pin, FilePrefetchBuffer* tail_prefetch_buffer) {
  if (!partition_map_.empty()) {
    // The dependencies are already cached since `partition_map_` is filled in
    // an all-or-nothing manner.
    return Status::OK();
  }
  // Before reading the partitions, prefetch them to avoid lots of IOs
  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
  const BlockBasedTable::Rep* rep = table()->rep_;
  IndexBlockIter biter;
  BlockHandle handle;
  Statistics* kNullStats = nullptr;

  CachableEntry<Block> index_block;
  {
    Status s = GetOrReadIndexBlock(nullptr /* get_context */, &lookup_context,
                                   &index_block, ro);
    if (!s.ok()) {
      return s;
    }
  }

  // We don't return pinned data from index blocks, so no need
  // to set `block_contents_pinned`.
  index_block.GetValue()->NewIndexIterator(
      internal_comparator()->user_comparator(),
      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
      index_has_first_key(), index_key_includes_seq(), index_value_is_full(),
      false /* block_contents_pinned */, user_defined_timestamps_persisted());
  // Index partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block offset
  biter.SeekToFirst();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t last_off =
      handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle);
  uint64_t prefetch_len = last_off - prefetch_off;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
  if (tail_prefetch_buffer == nullptr || !tail_prefetch_buffer->Enabled() ||
      tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off) {
    rep->CreateFilePrefetchBuffer(ReadaheadParams(), &prefetch_buffer,
                                  /*readaheadsize_cb*/ nullptr,
                                  /*usage=*/FilePrefetchBufferUsage::kUnknown);
    IOOptions opts;
    {
      Status s = rep->file->PrepareIOOptions(ro, opts);
      if (s.ok()) {
        s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
                                      static_cast<size_t>(prefetch_len));
      }
      if (!s.ok()) {
        return s;
      }
    }
  }
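  // If the tail prefetch buffer already covers the partition range, no new
  // prefetch buffer was created above and the per-partition reads below fall
  // back to `tail_prefetch_buffer`.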
  // For saving "all or nothing" to partition_map_
  UnorderedMap<uint64_t, CachableEntry<Block>> map_in_progress;

  // After prefetch, read the partitions one by one
  biter.SeekToFirst();
  size_t partition_count = 0;
  for (; biter.Valid(); biter.Next()) {
    handle = biter.value().handle;
    CachableEntry<Block> block;
    ++partition_count;
    // TODO: Support counter batch update for partitioned index and
    // filter blocks
    Status s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer ? prefetch_buffer.get() : tail_prefetch_buffer, ro,
        handle, rep->decompressor.get(),
        /*for_compaction=*/false, &block.As<Block_kIndex>(),
        /*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr,
        /*async_read=*/false, /*use_block_cache_for_lookup=*/true);

    if (!s.ok()) {
      return s;
    }
    if (block.GetValue() != nullptr) {
      // Might need to "pin" some mmap-read blocks (GetOwnValue) if some
      // partitions are successfully compressed (cached) and some are not
      // compressed (mmap eligible)
      if (block.IsCached() || block.GetOwnValue()) {
        if (pin) {
          map_in_progress[handle.offset()] = std::move(block);
        }
      }
    }
  }
  Status s = biter.status();
  // Save (pin) them only if everything checks out
  if (map_in_progress.size() == partition_count && s.ok()) {
    std::swap(partition_map_, map_in_progress);
  }
  return s;
}

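// Best-effort eviction of this index's blocks from the block cache before
// the reader is destroyed. Pinned partitions are released directly; when
// nothing is pinned, the top-level block (if still cached) is iterated and
// each partition handle is erased, throttled by UncacheAggressivenessAdvisor.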
void PartitionIndexReader::EraseFromCacheBeforeDestruction(
    uint32_t uncache_aggressiveness) {
  // NOTE: essentially a copy of
  // PartitionedFilterBlockReader::EraseFromCacheBeforeDestruction
  if (uncache_aggressiveness > 0) {
    CachableEntry<Block> top_level_block;

    ReadOptions ro_no_io;
    ro_no_io.read_tier = ReadTier::kBlockCacheTier;
    GetOrReadIndexBlock(/*get_context=*/nullptr,
                        /*lookup_context=*/nullptr, &top_level_block, ro_no_io)
        .PermitUncheckedError();

    if (!partition_map_.empty()) {
      // All partitions present if any
      for (auto& e : partition_map_) {
        e.second.ResetEraseIfLastRef();
      }
    } else if (!top_level_block.IsEmpty()) {
      IndexBlockIter biter;
      const InternalKeyComparator* const comparator = internal_comparator();
      Statistics* kNullStats = nullptr;
      top_level_block.GetValue()->NewIndexIterator(
          comparator->user_comparator(),
          table()->get_rep()->get_global_seqno(BlockType::kIndex), &biter,
          kNullStats, true /* total_order_seek */, index_has_first_key(),
          index_key_includes_seq(), index_value_is_full(),
          false /* block_contents_pinned */,
          user_defined_timestamps_persisted());

      UncacheAggressivenessAdvisor advisor(uncache_aggressiveness);
      for (biter.SeekToFirst(); biter.Valid() && advisor.ShouldContinue();
           biter.Next()) {
        bool erased = table()->EraseFromCache(biter.value().handle);
        advisor.Report(erased);
      }
      biter.status().PermitUncheckedError();
    }
    top_level_block.ResetEraseIfLastRef();
  }
  // Might be needed to un-cache a pinned top-level block
  BlockBasedTable::IndexReaderCommon::EraseFromCacheBeforeDestruction(
      uncache_aggressiveness);
}

}  // namespace ROCKSDB_NAMESPACE