external_table.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "rocksdb/external_table.h"
  6. #include "logging/logging.h"
  7. #include "rocksdb/table.h"
  8. #include "table/block_based/block.h"
  9. #include "table/internal_iterator.h"
  10. #include "table/meta_blocks.h"
  11. #include "table/table_builder.h"
  12. #include "table/table_reader.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. namespace {
  15. class ExternalTableIteratorAdapter : public InternalIterator {
  16. public:
  17. explicit ExternalTableIteratorAdapter(ExternalTableIterator* iterator)
  18. : iterator_(iterator), valid_(false) {}
  19. // No copying allowed
  20. ExternalTableIteratorAdapter(const ExternalTableIteratorAdapter&) = delete;
  21. ExternalTableIteratorAdapter& operator=(const ExternalTableIteratorAdapter&) =
  22. delete;
  23. ~ExternalTableIteratorAdapter() override {}
  24. bool Valid() const override { return valid_; }
  25. void SeekToFirst() override {
  26. status_ = Status::OK();
  27. if (iterator_) {
  28. iterator_->SeekToFirst();
  29. UpdateKey(OptSlice());
  30. }
  31. }
  32. void SeekToLast() override {
  33. status_ = Status::OK();
  34. if (iterator_) {
  35. iterator_->SeekToLast();
  36. UpdateKey(OptSlice());
  37. }
  38. }
  39. void Seek(const Slice& target) override {
  40. status_ = Status::OK();
  41. if (iterator_) {
  42. ParsedInternalKey pkey;
  43. status_ = ParseInternalKey(target, &pkey, /*log_err_key=*/false);
  44. if (status_.ok()) {
  45. iterator_->Seek(pkey.user_key);
  46. UpdateKey(OptSlice());
  47. }
  48. }
  49. }
  50. void SeekForPrev(const Slice& target) override {
  51. status_ = Status::OK();
  52. if (iterator_) {
  53. ParsedInternalKey pkey;
  54. status_ = ParseInternalKey(target, &pkey, /*log_err_key=*/false);
  55. if (status_.ok()) {
  56. iterator_->SeekForPrev(pkey.user_key);
  57. UpdateKey(OptSlice());
  58. }
  59. }
  60. }
  61. void Next() override {
  62. if (iterator_) {
  63. iterator_->Next();
  64. UpdateKey(OptSlice());
  65. }
  66. }
  67. bool NextAndGetResult(IterateResult* result) override {
  68. if (iterator_) {
  69. valid_ = iterator_->NextAndGetResult(&result_);
  70. result->value_prepared = result_.value_prepared;
  71. result->bound_check_result = result_.bound_check_result;
  72. if (valid_) {
  73. UpdateKey(result_.key);
  74. result->key = key();
  75. }
  76. } else {
  77. valid_ = false;
  78. }
  79. return valid_;
  80. }
  81. bool PrepareValue() override {
  82. if (iterator_ && !result_.value_prepared) {
  83. valid_ = iterator_->PrepareValue();
  84. result_.value_prepared = true;
  85. }
  86. return valid_;
  87. }
  88. IterBoundCheck UpperBoundCheckResult() override {
  89. if (iterator_) {
  90. result_.bound_check_result = iterator_->UpperBoundCheckResult();
  91. }
  92. return result_.bound_check_result;
  93. }
  94. void Prev() override {
  95. if (iterator_) {
  96. iterator_->Prev();
  97. UpdateKey(OptSlice());
  98. }
  99. }
  100. Slice key() const override {
  101. if (iterator_) {
  102. return Slice(*key_.const_rep());
  103. }
  104. return Slice();
  105. }
  106. Slice value() const override {
  107. if (iterator_) {
  108. return iterator_->value();
  109. }
  110. return Slice();
  111. }
  112. Status status() const override { return status_; }
  113. void Prepare(const MultiScanArgs* scan_opts) override {
  114. if (iterator_ && scan_opts) {
  115. iterator_->Prepare(scan_opts->GetScanRanges().data(), scan_opts->size());
  116. } else if (iterator_) {
  117. iterator_->Prepare(nullptr, 0);
  118. }
  119. }
  120. private:
  121. std::unique_ptr<ExternalTableIterator> iterator_;
  122. InternalKey key_;
  123. bool valid_;
  124. Status status_;
  125. IterateResult result_;
  126. void UpdateKey(OptSlice res) {
  127. if (iterator_) {
  128. valid_ = iterator_->Valid();
  129. status_ = iterator_->status();
  130. if (valid_ && status_.ok()) {
  131. key_.Set(res.has_value() ? res.value() : iterator_->key(), 0,
  132. ValueType::kTypeValue);
  133. }
  134. }
  135. }
  136. };
  137. class ExternalTableReaderAdapter : public TableReader {
  138. public:
  139. explicit ExternalTableReaderAdapter(
  140. const ImmutableOptions& ioptions,
  141. std::unique_ptr<ExternalTableReader>&& reader)
  142. : ioptions_(ioptions), reader_(std::move(reader)) {}
  143. ~ExternalTableReaderAdapter() override {}
  144. // No copying allowed
  145. ExternalTableReaderAdapter(const ExternalTableReaderAdapter&) = delete;
  146. ExternalTableReaderAdapter& operator=(const ExternalTableReaderAdapter&) =
  147. delete;
  148. InternalIterator* NewIterator(
  149. const ReadOptions& read_options, const SliceTransform* prefix_extractor,
  150. Arena* arena, bool /* skip_filters */, TableReaderCaller /* caller */,
  151. size_t /* compaction_readahead_size */ = 0,
  152. bool /* allow_unprepared_value */ = false) override {
  153. auto iterator = reader_->NewIterator(read_options, prefix_extractor);
  154. if (arena == nullptr) {
  155. return new ExternalTableIteratorAdapter(iterator);
  156. } else {
  157. auto* mem = arena->AllocateAligned(sizeof(ExternalTableIteratorAdapter));
  158. return new (mem) ExternalTableIteratorAdapter(iterator);
  159. }
  160. }
  161. uint64_t ApproximateOffsetOf(const ReadOptions&, const Slice&,
  162. TableReaderCaller) override {
  163. return 0;
  164. }
  165. uint64_t ApproximateSize(const ReadOptions&, const Slice&, const Slice&,
  166. TableReaderCaller) override {
  167. return 0;
  168. }
  169. void SetupForCompaction() override {}
  170. std::shared_ptr<const TableProperties> GetTableProperties() const override {
  171. std::shared_ptr<TableProperties> props;
  172. std::unique_ptr<char[]> property_block;
  173. uint64_t property_block_size = 0;
  174. uint64_t property_block_offset = 0;
  175. Status s;
  176. // Get the raw properties block from the external table reader. We don't
  177. // support writing the global sequence number, but we still get and return
  178. // the correct global seqno offset in the file to prevent accidental
  179. // corruption.
  180. s = reader_->GetPropertiesBlock(&property_block, &property_block_size,
  181. &property_block_offset);
  182. if (s.ok()) {
  183. std::unique_ptr<TableProperties> table_properties =
  184. std::make_unique<TableProperties>();
  185. BlockContents block_contents(std::move(property_block),
  186. property_block_size);
  187. Block block(std::move(block_contents));
  188. s = ParsePropertiesBlock(ioptions_, property_block_offset, block,
  189. table_properties);
  190. if (s.ok()) {
  191. props.reset(table_properties.release());
  192. }
  193. } else {
  194. // Fallback to getting a minimal table properties structure from the
  195. // external table reader
  196. props = std::make_shared<TableProperties>(*reader_->GetTableProperties());
  197. props->key_largest_seqno = 0;
  198. props->key_smallest_seqno = 0;
  199. }
  200. return props;
  201. }
  202. size_t ApproximateMemoryUsage() const override { return 0; }
  203. Status Get(const ReadOptions&, const Slice&, GetContext*,
  204. const SliceTransform*, bool = false) override {
  205. return Status::NotSupported(
  206. "Get() not supported on external file iterator");
  207. }
  208. virtual Status VerifyChecksum(const ReadOptions& /*ro*/,
  209. TableReaderCaller /*caller*/) override {
  210. return Status::OK();
  211. }
  212. private:
  213. const ImmutableOptions& ioptions_;
  214. std::unique_ptr<ExternalTableReader> reader_;
  215. };
  216. class ExternalTableBuilderAdapter : public TableBuilder {
  217. public:
  218. explicit ExternalTableBuilderAdapter(
  219. const TableBuilderOptions& topts,
  220. std::unique_ptr<ExternalTableBuilder>&& builder,
  221. std::unique_ptr<FSWritableFile>&& file)
  222. : builder_(std::move(builder)),
  223. file_(std::move(file)),
  224. ioptions_(topts.ioptions) {
  225. properties_.num_data_blocks = 1;
  226. properties_.index_size = 0;
  227. properties_.filter_size = 0;
  228. properties_.format_version = 0;
  229. properties_.key_largest_seqno = 0;
  230. properties_.key_smallest_seqno = 0;
  231. properties_.column_family_id = topts.column_family_id;
  232. properties_.column_family_name = topts.column_family_name;
  233. properties_.db_id = topts.db_id;
  234. properties_.db_session_id = topts.db_session_id;
  235. properties_.db_host_id = topts.ioptions.db_host_id;
  236. if (!ReifyDbHostIdProperty(topts.ioptions.env, &properties_.db_host_id)
  237. .ok()) {
  238. ROCKS_LOG_INFO(topts.ioptions.logger,
  239. "db_host_id property will not be set");
  240. }
  241. properties_.orig_file_number = topts.cur_file_num;
  242. properties_.comparator_name = topts.ioptions.user_comparator != nullptr
  243. ? topts.ioptions.user_comparator->Name()
  244. : "nullptr";
  245. properties_.prefix_extractor_name =
  246. topts.moptions.prefix_extractor != nullptr
  247. ? topts.moptions.prefix_extractor->AsString()
  248. : "nullptr";
  249. for (auto& factory : *topts.internal_tbl_prop_coll_factories) {
  250. assert(factory);
  251. std::unique_ptr<InternalTblPropColl> collector{
  252. factory->CreateInternalTblPropColl(topts.column_family_id,
  253. topts.level_at_creation,
  254. topts.ioptions.num_levels)};
  255. if (collector) {
  256. table_properties_collectors_.emplace_back(std::move(collector));
  257. }
  258. }
  259. }
  260. void Add(const Slice& key, const Slice& value) override {
  261. ParsedInternalKey pkey;
  262. status_ = ParseInternalKey(key, &pkey, /*log_err_key=*/false);
  263. if (status_.ok()) {
  264. if (pkey.type != ValueType::kTypeValue) {
  265. status_ = Status::NotSupported(
  266. "Value type " + std::to_string(pkey.type) + "not supported");
  267. } else {
  268. builder_->Add(pkey.user_key, value);
  269. properties_.num_entries++;
  270. properties_.raw_key_size += key.size();
  271. properties_.raw_value_size += value.size();
  272. NotifyCollectTableCollectorsOnAdd(key, value, /*file_size=*/0,
  273. table_properties_collectors_,
  274. ioptions_.logger);
  275. }
  276. }
  277. }
  278. Status status() const override {
  279. if (status_.ok()) {
  280. return builder_->status();
  281. } else {
  282. return status_;
  283. }
  284. }
  285. IOStatus io_status() const override { return status_to_io_status(status()); }
  286. Status Finish() override {
  287. // Approximate the data size
  288. properties_.data_size =
  289. properties_.raw_key_size + properties_.raw_value_size;
  290. PropertyBlockBuilder property_block_builder;
  291. property_block_builder.AddTableProperty(properties_);
  292. UserCollectedProperties more_user_collected_properties;
  293. NotifyCollectTableCollectorsOnFinish(
  294. table_properties_collectors_, ioptions_.logger, &property_block_builder,
  295. more_user_collected_properties, properties_.readable_properties);
  296. properties_.user_collected_properties.insert(
  297. more_user_collected_properties.begin(),
  298. more_user_collected_properties.end());
  299. Slice prop_block = property_block_builder.Finish();
  300. Status s = builder_->PutPropertiesBlock(prop_block);
  301. if (s.ok() || s.IsNotSupported()) {
  302. // If the builder doesn't support writing the properties block,
  303. // we still call Finish() and let the external builder handle it.
  304. s = builder_->Finish();
  305. }
  306. return s;
  307. }
  308. void Abandon() override { builder_->Abandon(); }
  309. uint64_t FileSize() const override { return builder_->FileSize(); }
  310. uint64_t NumEntries() const override { return properties_.num_entries; }
  311. TableProperties GetTableProperties() const override {
  312. return builder_->GetTableProperties();
  313. }
  314. std::string GetFileChecksum() const override {
  315. return builder_->GetFileChecksum();
  316. }
  317. const char* GetFileChecksumFuncName() const override {
  318. return builder_->GetFileChecksumFuncName();
  319. }
  320. private:
  321. Status status_;
  322. std::unique_ptr<ExternalTableBuilder> builder_;
  323. std::unique_ptr<FSWritableFile> file_;
  324. const ImmutableOptions& ioptions_;
  325. TableProperties properties_;
  326. std::vector<std::unique_ptr<InternalTblPropColl>>
  327. table_properties_collectors_;
  328. };
  329. class ExternalTableFactoryAdapter : public TableFactory {
  330. public:
  331. explicit ExternalTableFactoryAdapter(
  332. std::shared_ptr<ExternalTableFactory> inner)
  333. : inner_(std::move(inner)) {}
  334. const char* Name() const override { return inner_->Name(); }
  335. using TableFactory::NewTableReader;
  336. Status NewTableReader(
  337. const ReadOptions& ro, const TableReaderOptions& topts,
  338. std::unique_ptr<RandomAccessFileReader>&& file, uint64_t /* file_size */,
  339. std::unique_ptr<TableReader>* table_reader,
  340. bool /* prefetch_index_and_filter_in_cache */) const override {
  341. // SstFileReader specifies largest_seqno as kMaxSequenceNumber to denote
  342. // that its unknown
  343. if (topts.largest_seqno > 0 && topts.largest_seqno != kMaxSequenceNumber) {
  344. return Status::NotSupported(
  345. "Ingesting file with sequence number larger than 0");
  346. }
  347. std::unique_ptr<ExternalTableReader> reader;
  348. FileOptions fopts(topts.env_options);
  349. ExternalTableOptions ext_topts(topts.prefix_extractor,
  350. topts.ioptions.user_comparator,
  351. topts.ioptions.fs, fopts);
  352. auto status =
  353. inner_->NewTableReader(ro, file->file_name(), ext_topts, &reader);
  354. if (!status.ok()) {
  355. return status;
  356. }
  357. table_reader->reset(
  358. new ExternalTableReaderAdapter(topts.ioptions, std::move(reader)));
  359. file.reset();
  360. return Status::OK();
  361. }
  362. using TableFactory::NewTableBuilder;
  363. TableBuilder* NewTableBuilder(const TableBuilderOptions& topts,
  364. WritableFileWriter* file) const override {
  365. std::unique_ptr<ExternalTableBuilder> builder;
  366. ExternalTableBuilderOptions ext_topts(
  367. topts.read_options, topts.write_options,
  368. topts.moptions.prefix_extractor, topts.ioptions.user_comparator,
  369. topts.column_family_name, topts.reason);
  370. auto file_wrapper =
  371. std::make_unique<ExternalTableWritableFileWrapper>(file);
  372. builder.reset(inner_->NewTableBuilder(ext_topts, file->file_name(),
  373. file_wrapper.get()));
  374. if (builder) {
  375. return new ExternalTableBuilderAdapter(topts, std::move(builder),
  376. std::move(file_wrapper));
  377. }
  378. return nullptr;
  379. }
  380. std::unique_ptr<TableFactory> Clone() const override { return nullptr; }
  381. private:
  382. // An FSWritableFile subclass for wrapping a WritableFileWriter. The
  383. // latter is private to RocksDB, so we wrap it here in order to pass it
  384. // to the ExternalTableBuilder. This is necessary for WritableFileWriter
  385. // to intercept Append so that it can calculate the file checksum.
  386. class ExternalTableWritableFileWrapper : public FSWritableFile {
  387. public:
  388. explicit ExternalTableWritableFileWrapper(WritableFileWriter* writer)
  389. : writer_(writer) {}
  390. using FSWritableFile::Append;
  391. IOStatus Append(const Slice& data, const IOOptions& options,
  392. IODebugContext* /*dbg*/) override {
  393. return writer_->Append(options, data);
  394. }
  395. IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override {
  396. return writer_->Close(options);
  397. }
  398. IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override {
  399. return writer_->Flush(options);
  400. }
  401. IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override {
  402. return writer_->Sync(options, /*use_fsync=*/false);
  403. }
  404. uint64_t GetFileSize(const IOOptions& options,
  405. IODebugContext* dbg) override {
  406. return writer_->writable_file()->GetFileSize(options, dbg);
  407. }
  408. private:
  409. WritableFileWriter* writer_;
  410. };
  411. std::shared_ptr<ExternalTableFactory> inner_;
  412. };
  413. } // namespace
  414. std::unique_ptr<TableFactory> NewExternalTableFactory(
  415. std::shared_ptr<ExternalTableFactory> inner_factory) {
  416. std::unique_ptr<TableFactory> res;
  417. res = std::make_unique<ExternalTableFactoryAdapter>(std::move(inner_factory));
  418. return res;
  419. }
  420. } // namespace ROCKSDB_NAMESPACE