sst_file_reader.cc 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  #include "rocksdb/sst_file_reader.h"

  #include <algorithm>
  #include <memory>
  #include <sstream>
  #include <string>
  #include <vector>

  #include "db/arena_wrapped_db_iter.h"
  #include "db/db_iter.h"
  #include "db/dbformat.h"
  #include "file/random_access_file_reader.h"
  #include "options/cf_options.h"
  #include "rocksdb/env.h"
  #include "rocksdb/file_system.h"
  #include "table/get_context.h"
  #include "table/table_builder.h"
  #include "table/table_iterator.h"
  #include "table/table_reader.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. struct SstFileReader::Rep {
  19. Options options;
  20. EnvOptions soptions;
  21. ImmutableOptions ioptions;
  22. MutableCFOptions moptions;
  23. // Keep a member variable for this, since `NewIterator()` uses a const
  24. // reference of `ReadOptions`.
  25. ReadOptions roptions_for_table_iter;
  26. std::unique_ptr<TableReader> table_reader;
  27. Rep(const Options& opts)
  28. : options(opts),
  29. soptions(options),
  30. ioptions(options),
  31. moptions(ColumnFamilyOptions(options)) {
  32. roptions_for_table_iter =
  33. ReadOptions(/*_verify_checksums=*/true, /*_fill_cache=*/false);
  34. }
  35. };
  36. SstFileReader::SstFileReader(const Options& options) : rep_(new Rep(options)) {}
  37. SstFileReader::~SstFileReader() = default;
  38. Status SstFileReader::Open(const std::string& file_path) {
  39. auto r = rep_.get();
  40. Status s;
  41. uint64_t file_size = 0;
  42. std::unique_ptr<FSRandomAccessFile> file;
  43. std::unique_ptr<RandomAccessFileReader> file_reader;
  44. FileOptions fopts(r->soptions);
  45. const auto& fs = r->options.env->GetFileSystem();
  46. s = fs->GetFileSize(file_path, fopts.io_options, &file_size, nullptr);
  47. if (s.ok()) {
  48. s = fs->NewRandomAccessFile(file_path, fopts, &file, nullptr);
  49. }
  50. if (s.ok()) {
  51. file_reader.reset(new RandomAccessFileReader(std::move(file), file_path));
  52. }
  53. if (s.ok()) {
  54. TableReaderOptions t_opt(
  55. r->ioptions, r->moptions.prefix_extractor,
  56. r->moptions.compression_manager.get(), r->soptions,
  57. r->ioptions.internal_comparator,
  58. r->moptions.block_protection_bytes_per_key,
  59. /*skip_filters*/ false, /*immortal*/ false,
  60. /*force_direct_prefetch*/ false, /*level*/ -1,
  61. /*block_cache_tracer*/ nullptr,
  62. /*max_file_size_for_l0_meta_pin*/ 0, /*cur_db_session_id*/ "",
  63. /*cur_file_num*/ 0,
  64. /* unique_id */ {}, /* largest_seqno */ 0,
  65. /* tail_size */ 0, r->ioptions.persist_user_defined_timestamps);
  66. // Allow open file with global sequence number for backward compatibility.
  67. t_opt.largest_seqno = kMaxSequenceNumber;
  68. s = r->options.table_factory->NewTableReader(t_opt, std::move(file_reader),
  69. file_size, &r->table_reader);
  70. }
  71. return s;
  72. }
  73. std::vector<Status> SstFileReader::MultiGet(const ReadOptions& roptions,
  74. const std::vector<Slice>& keys,
  75. std::vector<std::string>* values) {
  76. const auto num_keys = keys.size();
  77. std::vector<Status> statuses(num_keys, Status::OK());
  78. std::vector<PinnableSlice> pin_values(num_keys);
  79. auto r = rep_.get();
  80. const Comparator* user_comparator =
  81. r->ioptions.internal_comparator.user_comparator();
  82. autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  83. autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  84. autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_ctx;
  85. autovector<MergeContext, MultiGetContext::MAX_BATCH_SIZE> merge_ctx;
  86. sorted_keys.resize(num_keys);
  87. for (size_t i = 0; i < num_keys; ++i) {
  88. PinnableSlice* val = &pin_values[i];
  89. val->Reset();
  90. merge_ctx.emplace_back();
  91. key_context.emplace_back(nullptr, keys[i], val, nullptr,
  92. nullptr /* timestamp */, &statuses[i]);
  93. get_ctx.emplace_back(user_comparator, r->ioptions.merge_operator.get(),
  94. nullptr, nullptr, GetContext::kNotFound,
  95. *key_context[i].key, val, nullptr, nullptr, nullptr,
  96. &merge_ctx[i], true,
  97. &key_context[i].max_covering_tombstone_seq, nullptr);
  98. key_context[i].get_context = &get_ctx[i];
  99. }
  100. for (size_t i = 0; i < num_keys; ++i) {
  101. sorted_keys[i] = &key_context[i];
  102. }
  103. struct CompareKeyContext {
  104. explicit CompareKeyContext(const Comparator* comp) : comparator(comp) {}
  105. inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) const {
  106. return comparator->CompareWithoutTimestamp(*(lhs->key), false,
  107. *(rhs->key), false) < 0;
  108. }
  109. const Comparator* comparator;
  110. };
  111. std::sort(sorted_keys.begin(), sorted_keys.end(),
  112. CompareKeyContext(user_comparator));
  113. const auto sequence = roptions.snapshot != nullptr
  114. ? roptions.snapshot->GetSequenceNumber()
  115. : kMaxSequenceNumber;
  116. MultiGetContext ctx(&sorted_keys, 0, num_keys, sequence, roptions,
  117. r->ioptions.fs.get(), nullptr);
  118. MultiGetRange range = ctx.GetMultiGetRange();
  119. r->table_reader->MultiGet(roptions, &range,
  120. r->moptions.prefix_extractor.get(),
  121. false /* skip filters */);
  122. values->resize(num_keys);
  123. for (size_t i = 0; i < num_keys; ++i) {
  124. if (statuses[i].ok()) {
  125. switch (get_ctx[i].State()) {
  126. case GetContext::kFound:
  127. (*values)[i].assign(pin_values[i].data(), pin_values[i].size());
  128. break;
  129. case GetContext::kNotFound:
  130. case GetContext::kDeleted:
  131. statuses[i] = Status::NotFound();
  132. break;
  133. case GetContext::kMerge:
  134. statuses[i] = Status::MergeInProgress();
  135. break;
  136. case GetContext::kCorrupt:
  137. case GetContext::kUnexpectedBlobIndex:
  138. case GetContext::kMergeOperatorFailed:
  139. statuses[i] = Status::Corruption();
  140. break;
  141. };
  142. }
  143. }
  144. return statuses;
  145. }
  146. Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) {
  147. assert(roptions.io_activity == Env::IOActivity::kUnknown);
  148. auto r = rep_.get();
  149. auto sequence = roptions.snapshot != nullptr
  150. ? roptions.snapshot->GetSequenceNumber()
  151. : kMaxSequenceNumber;
  152. ArenaWrappedDBIter* res = new ArenaWrappedDBIter();
  153. res->Init(r->options.env, roptions, r->ioptions, r->moptions,
  154. nullptr /* version */, sequence, 0 /* version_number */,
  155. nullptr /* read_callback */, nullptr /* cfh */,
  156. true /* expose_blob_index */, false /* allow_refresh */,
  157. /*active_mem=*/nullptr);
  158. auto internal_iter = r->table_reader->NewIterator(
  159. res->GetReadOptions(), r->moptions.prefix_extractor.get(),
  160. res->GetArena(), false /* skip_filters */,
  161. TableReaderCaller::kSSTFileReader);
  162. res->SetIterUnderDBIter(internal_iter);
  163. return res;
  164. }
  165. std::unique_ptr<Iterator> SstFileReader::NewTableIterator() {
  166. auto r = rep_.get();
  167. InternalIterator* internal_iter = r->table_reader->NewIterator(
  168. r->roptions_for_table_iter, r->moptions.prefix_extractor.get(),
  169. /*arena*/ nullptr, false /* skip_filters */,
  170. TableReaderCaller::kSSTFileReader);
  171. assert(internal_iter);
  172. if (internal_iter == nullptr) {
  173. // Do not attempt to create a TableIterator if we cannot get a valid
  174. // InternalIterator.
  175. return nullptr;
  176. }
  177. return std::make_unique<TableIterator>(internal_iter);
  178. }
  179. std::shared_ptr<const TableProperties> SstFileReader::GetTableProperties()
  180. const {
  181. return rep_->table_reader->GetTableProperties();
  182. }
  183. Status SstFileReader::VerifyChecksum(const ReadOptions& read_options) {
  184. assert(read_options.io_activity == Env::IOActivity::kUnknown);
  185. return rep_->table_reader->VerifyChecksum(read_options,
  186. TableReaderCaller::kSSTFileReader);
  187. }
  188. Status SstFileReader::VerifyNumEntries(const ReadOptions& read_options) {
  189. Rep* r = rep_.get();
  190. std::unique_ptr<InternalIterator> internal_iter{r->table_reader->NewIterator(
  191. read_options, r->moptions.prefix_extractor.get(), nullptr,
  192. false /* skip_filters */, TableReaderCaller::kSSTFileReader)};
  193. internal_iter->SeekToFirst();
  194. Status s = internal_iter->status();
  195. if (!s.ok()) {
  196. return s;
  197. }
  198. uint64_t num_read = 0;
  199. for (; internal_iter->Valid(); internal_iter->Next()) {
  200. ++num_read;
  201. }
  202. s = internal_iter->status();
  203. if (!s.ok()) {
  204. return s;
  205. }
  206. std::shared_ptr<const TableProperties> tp = GetTableProperties();
  207. if (!tp) {
  208. s = Status::Corruption("table properties not available");
  209. } else {
  210. // TODO: verify num_range_deletions
  211. uint64_t expected = tp->num_entries - tp->num_range_deletions;
  212. if (num_read != expected) {
  213. std::ostringstream oss;
  214. oss << "Table property expects " << expected
  215. << " entries when excluding range deletions,"
  216. << " but scanning the table returned " << std::to_string(num_read)
  217. << " entries";
  218. s = Status::Corruption(oss.str());
  219. }
  220. }
  221. return s;
  222. }
  223. } // namespace ROCKSDB_NAMESPACE