external_sst_file_ingestion_job.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <string>
  7. #include <unordered_set>
  8. #include <vector>
  9. #include "db/column_family.h"
  10. #include "db/internal_stats.h"
  11. #include "db/snapshot_impl.h"
  12. #include "db/version_edit.h"
  13. #include "env/file_system_tracer.h"
  14. #include "logging/event_logger.h"
  15. #include "options/db_options.h"
  16. #include "rocksdb/db.h"
  17. #include "rocksdb/file_system.h"
  18. #include "rocksdb/sst_file_writer.h"
  19. #include "util/autovector.h"
  20. namespace ROCKSDB_NAMESPACE {
  21. class Directories;
  22. class SystemClock;
  23. struct KeyRangeInfo {
  24. // Smallest internal key in an external file or for a batch of external files.
  25. // unset() could be either invalid or "before all keys"
  26. InternalKey smallest_internal_key;
  27. // Largest internal key in an external file or for a batch of external files.
  28. // unset() could be either invalid or "after all keys"
  29. InternalKey largest_internal_key;
  30. bool unset() const {
  31. // Legal internal keys are at least 8 bytes.
  32. return smallest_internal_key.unset() || largest_internal_key.unset();
  33. }
  34. };
  35. // Helper class to apply SST file key range checks to the external files.
  36. // XXX: using sstableKeyCompare with user comparator on internal keys is
  37. // very broken
  38. class ExternalFileRangeChecker {
  39. public:
  40. explicit ExternalFileRangeChecker(const Comparator* ucmp) : ucmp_(ucmp) {}
  41. // Operator used for sorting ranges.
  42. bool operator()(const KeyRangeInfo* range1,
  43. const KeyRangeInfo* range2) const {
  44. assert(range1);
  45. assert(range2);
  46. assert(!range1->unset());
  47. assert(!range2->unset());
  48. return sstableKeyCompare(ucmp_, range1->smallest_internal_key,
  49. range2->smallest_internal_key) < 0;
  50. }
  51. bool Overlaps(const KeyRangeInfo& range1, const KeyRangeInfo& range2,
  52. bool known_sorted = false) const {
  53. return Overlaps(range1, range2.smallest_internal_key,
  54. range2.largest_internal_key, known_sorted);
  55. }
  56. bool Overlaps(const KeyRangeInfo& range1, const InternalKey& range2_smallest,
  57. const InternalKey& range2_largest,
  58. bool known_sorted = false) const {
  59. bool any_unset =
  60. range1.unset() || range2_smallest.unset() || range2_largest.unset();
  61. if (any_unset) {
  62. assert(!any_unset);
  63. return false;
  64. }
  65. if (known_sorted) {
  66. return sstableKeyCompare(ucmp_, range1.largest_internal_key,
  67. range2_smallest) >= 0;
  68. }
  69. return sstableKeyCompare(ucmp_, range1.largest_internal_key,
  70. range2_smallest) >= 0 &&
  71. sstableKeyCompare(ucmp_, range1.smallest_internal_key,
  72. range2_largest) <= 0;
  73. }
  74. bool Contains(const KeyRangeInfo& range1, const KeyRangeInfo& range2) {
  75. return Contains(range1, range2.smallest_internal_key,
  76. range2.largest_internal_key);
  77. }
  78. bool Contains(const KeyRangeInfo& range1, const InternalKey& range2_smallest,
  79. const InternalKey& range2_largest) {
  80. bool any_unset =
  81. range1.unset() || range2_smallest.unset() || range2_largest.unset();
  82. if (any_unset) {
  83. assert(!any_unset);
  84. return false;
  85. }
  86. return sstableKeyCompare(ucmp_, range1.smallest_internal_key,
  87. range2_smallest) <= 0 &&
  88. sstableKeyCompare(ucmp_, range1.largest_internal_key,
  89. range2_largest) >= 0;
  90. }
  91. void MaybeUpdateRange(const InternalKey& start_key,
  92. const InternalKey& end_key, KeyRangeInfo* range) const {
  93. assert(range);
  94. if (range->smallest_internal_key.size() == 0 ||
  95. sstableKeyCompare(ucmp_, start_key, range->smallest_internal_key) < 0) {
  96. range->smallest_internal_key = start_key;
  97. }
  98. if (range->largest_internal_key.size() == 0 ||
  99. sstableKeyCompare(ucmp_, end_key, range->largest_internal_key) > 0) {
  100. range->largest_internal_key = end_key;
  101. }
  102. }
  103. private:
  104. const Comparator* ucmp_;
  105. };
  106. struct IngestedFileInfo : public KeyRangeInfo {
  107. // External file path
  108. std::string external_file_path;
  109. // NOTE: use below two fields for all `*Overlap*` types of checks instead of
  110. // smallest_internal_key.user_key() and largest_internal_key.user_key().
  111. // The smallest / largest user key contained in the file for key range checks.
  112. // These could be different from smallest_internal_key.user_key(), and
  113. // largest_internal_key.user_key() when user-defined timestamps are enabled,
  114. // because the check is about making sure the user key without timestamps part
  115. // does not overlap. To achieve that, the smallest user key will be updated
  116. // with the maximum timestamp while the largest user key will be updated with
  117. // the min timestamp. It's otherwise the same.
  118. std::string start_ukey;
  119. std::string limit_ukey;
  120. // Sequence number for keys in external file
  121. SequenceNumber original_seqno;
  122. // Offset of the global sequence number field in the file, will
  123. // be zero if version is 1 (global seqno is not supported)
  124. size_t global_seqno_offset;
  125. // External file size
  126. uint64_t file_size;
  127. // total number of keys in external file
  128. uint64_t num_entries;
  129. // total number of range deletions in external file
  130. uint64_t num_range_deletions;
  131. // Id of column family this file should be ingested into
  132. uint32_t cf_id;
  133. // TableProperties read from external file
  134. TableProperties table_properties;
  135. // Version of external file
  136. int version;
  137. // FileDescriptor for the file inside the DB
  138. FileDescriptor fd;
  139. // file path that we picked for file inside the DB
  140. std::string internal_file_path;
  141. // Global sequence number that we picked for the file inside the DB
  142. SequenceNumber assigned_seqno = 0;
  143. // Level inside the DB we picked for the external file.
  144. int picked_level = 0;
  145. // Whether to copy or link the external sst file. copy_file will be set to
  146. // false if ingestion_options.move_files is true and underlying FS
  147. // supports link operation. Need to provide a default value to make the
  148. // undefined-behavior sanity check of llvm happy. Since
  149. // ingestion_options.move_files is false by default, thus copy_file is true
  150. // by default.
  151. bool copy_file = true;
  152. // The checksum of ingested file
  153. std::string file_checksum;
  154. // The name of checksum function that generate the checksum
  155. std::string file_checksum_func_name;
  156. // The temperature of the file to be ingested
  157. Temperature file_temperature = Temperature::kUnknown;
  158. // Unique id of the file to be ingested
  159. UniqueId64x2 unique_id{};
  160. // Whether the external file should be treated as if it has user-defined
  161. // timestamps or not. If this flag is false, and the column family enables
  162. // UDT feature, the file will have min-timestamp artificially padded to its
  163. // user keys when it's read. Since it will affect how `TableReader` reads a
  164. // table file, it's defaulted to optimize for the majority of the case where
  165. // the user key's format in the external file matches the column family's
  166. // setting.
  167. bool user_defined_timestamps_persisted = true;
  168. SequenceNumber largest_seqno = kMaxSequenceNumber;
  169. SequenceNumber smallest_seqno = kMaxSequenceNumber;
  170. };
  171. // A batch of files.
  172. struct FileBatchInfo : public KeyRangeInfo {
  173. autovector<IngestedFileInfo*> files;
  174. // When true, `smallest_internal_key` and `largest_internal_key` will be
  175. // tracked and updated as new file get added via `AddFile`. When false, we
  176. // bypass this tracking. This is used when the all input external files
  177. // are already checked and not overlapping, and they just need to be added
  178. // into one default batch.
  179. bool track_batch_range;
  180. void AddFile(IngestedFileInfo* file,
  181. const ExternalFileRangeChecker& key_range_checker) {
  182. assert(file);
  183. files.push_back(file);
  184. if (track_batch_range) {
  185. key_range_checker.MaybeUpdateRange(file->smallest_internal_key,
  186. file->largest_internal_key, this);
  187. }
  188. }
  189. explicit FileBatchInfo(bool _track_batch_range)
  190. : track_batch_range(_track_batch_range) {}
  191. };
  192. class ExternalSstFileIngestionJob {
  193. public:
  194. ExternalSstFileIngestionJob(
  195. VersionSet* versions, ColumnFamilyData* cfd,
  196. const ImmutableDBOptions& db_options,
  197. const MutableDBOptions& mutable_db_options, const EnvOptions& env_options,
  198. SnapshotList* db_snapshots,
  199. const IngestExternalFileOptions& ingestion_options,
  200. Directories* directories, EventLogger* event_logger,
  201. const std::shared_ptr<IOTracer>& io_tracer)
  202. : clock_(db_options.clock),
  203. fs_(db_options.fs, io_tracer),
  204. versions_(versions),
  205. cfd_(cfd),
  206. ucmp_(cfd ? cfd->user_comparator() : nullptr),
  207. file_range_checker_(ucmp_),
  208. db_options_(db_options),
  209. mutable_db_options_(mutable_db_options),
  210. env_options_(env_options),
  211. db_snapshots_(db_snapshots),
  212. ingestion_options_(ingestion_options),
  213. directories_(directories),
  214. event_logger_(event_logger),
  215. job_start_time_(clock_->NowMicros()),
  216. max_assigned_seqno_(0),
  217. io_tracer_(io_tracer) {
  218. assert(directories != nullptr);
  219. assert(cfd_);
  220. assert(ucmp_);
  221. }
  222. ~ExternalSstFileIngestionJob() { UnregisterRange(); }
  223. ColumnFamilyData* GetColumnFamilyData() const { return cfd_; }
  224. // Prepare the job by copying external files into the DB.
  225. Status Prepare(const std::vector<std::string>& external_files_paths,
  226. const std::vector<std::string>& files_checksums,
  227. const std::vector<std::string>& files_checksum_func_names,
  228. const std::optional<RangeOpt>& atomic_replace_range,
  229. const Temperature& file_temperature, uint64_t next_file_number,
  230. SuperVersion* sv);
  231. // Check if we need to flush the memtable before running the ingestion job
  232. // This will be true if the files we are ingesting are overlapping with any
  233. // key range in the memtable.
  234. //
  235. // @param super_version A referenced SuperVersion that will be held for the
  236. // duration of this function.
  237. //
  238. // Thread-safe
  239. Status NeedsFlush(bool* flush_needed, SuperVersion* super_version);
  240. void SetFlushedBeforeRun() { flushed_before_run_ = true; }
  241. // Will execute the ingestion job and prepare edit() to be applied.
  242. // REQUIRES: Mutex held
  243. Status Run();
  244. // Register key range involved in this ingestion job
  245. // to prevent key range conflict with other ongoing compaction/file ingestion
  246. // REQUIRES: Mutex held
  247. void RegisterRange();
  248. // Unregister key range registered for this ingestion job
  249. // REQUIRES: Mutex held
  250. void UnregisterRange();
  251. // Update column family stats.
  252. // REQUIRES: Mutex held
  253. void UpdateStats();
  254. // Cleanup after successful/failed job
  255. void Cleanup(const Status& status);
  256. VersionEdit* edit() { return &edit_; }
  257. const autovector<IngestedFileInfo>& files_to_ingest() const {
  258. return files_to_ingest_;
  259. }
  260. // Return the maximum assigned sequence number for all files in this job.
  261. // When allow_db_generated_files = false, we may assign global sequence
  262. // numbers to ingested files. The global sequence numbers are sequence numbers
  263. // following versions_->LastSequence().
  264. // When allow_db_generated_files = true, we ingest files that already have
  265. // sequence numbers assigned. max_assigned_seqno_ will be the max sequence
  266. // number among ingested files.
  267. SequenceNumber MaxAssignedSequenceNumber() const {
  268. return max_assigned_seqno_;
  269. }
  270. private:
  271. Status ResetTableReader(const std::string& external_file,
  272. uint64_t new_file_number,
  273. bool user_defined_timestamps_persisted,
  274. SuperVersion* sv, IngestedFileInfo* file_to_ingest,
  275. std::unique_ptr<TableReader>* table_reader);
  276. // Read the external file's table properties to do various sanity checks and
  277. // populates certain fields in `IngestedFileInfo` according to some table
  278. // properties.
  279. // In some cases when sanity check passes, `table_reader` could be reset with
  280. // different options. For example: when external file does not contain
  281. // timestamps while column family enables UDT in Memtables only feature.
  282. Status SanityCheckTableProperties(const std::string& external_file,
  283. uint64_t new_file_number, SuperVersion* sv,
  284. IngestedFileInfo* file_to_ingest,
  285. std::unique_ptr<TableReader>* table_reader);
  286. // Open the external file and populate `file_to_ingest` with all the
  287. // external information we need to ingest this file.
  288. Status GetIngestedFileInfo(const std::string& external_file,
  289. uint64_t new_file_number,
  290. IngestedFileInfo* file_to_ingest,
  291. SuperVersion* sv);
  292. // If the input files' key range overlaps themselves, this function divides
  293. // them in the user specified order into multiple batches. Where the files
  294. // within a batch do not overlap with each other, but key range could overlap
  295. // between batches.
  296. // If the input files' key range don't overlap themselves, they always just
  297. // make one batch.
  298. void DivideInputFilesIntoBatches();
  299. // Assign level for the files in one batch. The files within one batch are not
  300. // overlapping, and we assign level to each file one after another.
  301. // If `prev_batch_uppermost_level` is specified, all files in this batch will
  302. // be assigned to levels that are higher than `prev_batch_uppermost_level`.
  303. // The uppermost level used by this batch of files is tracked too, so that it
  304. // can be used by the next batch.
  305. // REQUIRES: Mutex held
  306. Status AssignLevelsForOneBatch(FileBatchInfo& batch,
  307. SuperVersion* super_version,
  308. bool force_global_seqno,
  309. SequenceNumber* last_seqno,
  310. int* batch_uppermost_level,
  311. std::optional<int> prev_batch_uppermost_level);
  312. // Assign `file_to_ingest` the appropriate sequence number and the lowest
  313. // possible level that it can be ingested to according to compaction_style.
  314. // If `prev_batch_uppermost_level` is specified, the file will only be
  315. // assigned to levels tha are higher than `prev_batch_uppermost_level`.
  316. // REQUIRES: Mutex held
  317. Status AssignLevelAndSeqnoForIngestedFile(
  318. SuperVersion* sv, bool force_global_seqno,
  319. CompactionStyle compaction_style, SequenceNumber last_seqno,
  320. IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno,
  321. std::optional<int> prev_batch_uppermost_level);
  322. // File that we want to ingest behind always goes to the lowest level;
  323. // we just check that it fits in the level, that the CF allows ingest_behind,
  324. // and that we don't have 0 seqnums at the upper levels.
  325. // REQUIRES: Mutex held
  326. Status CheckLevelForIngestedBehindFile(IngestedFileInfo* file_to_ingest);
  327. // Set the file global sequence number to `seqno`
  328. Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest,
  329. SequenceNumber seqno);
  330. // Generate the file checksum and store in the IngestedFileInfo
  331. IOStatus GenerateChecksumForIngestedFile(IngestedFileInfo* file_to_ingest);
  332. // Check if `file_to_ingest` can fit in level `level`
  333. // REQUIRES: Mutex held
  334. bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest,
  335. int level);
  336. // Helper method to sync given file.
  337. template <typename TWritableFile>
  338. Status SyncIngestedFile(TWritableFile* file);
  339. // Helper function to obtain the smallest and largest sequence number from a
  340. // file. When OK is returned, file_to_ingest->smallest_seqno and
  341. // file_to_ingest->largest_seqno will be updated.
  342. Status GetSeqnoBoundaryForFile(TableReader* table_reader, SuperVersion* sv,
  343. IngestedFileInfo* file_to_ingest,
  344. bool allow_data_in_errors);
  345. // Create equivalent `Compaction` objects to this file ingestion job
  346. // , which will be used to check range conflict with other ongoing
  347. // compactions.
  348. void CreateEquivalentFileIngestingCompactions();
  349. // Remove all the internal files created, called when ingestion job fails.
  350. void DeleteInternalFiles();
  351. SystemClock* clock_;
  352. FileSystemPtr fs_;
  353. VersionSet* versions_;
  354. ColumnFamilyData* cfd_;
  355. const Comparator* ucmp_;
  356. ExternalFileRangeChecker file_range_checker_;
  357. const ImmutableDBOptions& db_options_;
  358. const MutableDBOptions& mutable_db_options_;
  359. const EnvOptions& env_options_;
  360. SnapshotList* db_snapshots_;
  361. autovector<IngestedFileInfo> files_to_ingest_;
  362. std::vector<FileBatchInfo> file_batches_to_ingest_;
  363. const IngestExternalFileOptions& ingestion_options_;
  364. std::optional<KeyRangeInfo> atomic_replace_range_;
  365. Directories* directories_;
  366. EventLogger* event_logger_;
  367. VersionEdit edit_;
  368. uint64_t job_start_time_;
  369. SequenceNumber max_assigned_seqno_;
  370. // Set in ExternalSstFileIngestionJob::Prepare(), if true all files are
  371. // ingested in L0
  372. bool files_overlap_{false};
  373. // Set in ExternalSstFileIngestionJob::Prepare(), if true and DB
  374. // file_checksum_gen_factory is set, DB will generate checksum each file.
  375. bool need_generate_file_checksum_{true};
  376. std::shared_ptr<IOTracer> io_tracer_;
  377. // Flag indicating whether the column family is flushed after `Prepare` and
  378. // before `Run`.
  379. bool flushed_before_run_{false};
  380. // Below are variables used in (un)registering range for this ingestion job
  381. //
  382. // FileMetaData used in inputs of compactions equivalent to this ingestion
  383. // job
  384. std::vector<FileMetaData*> compaction_input_metdatas_;
  385. // Compactions equivalent to this ingestion job
  386. std::vector<Compaction*> file_ingesting_compactions_;
  387. };
  388. } // namespace ROCKSDB_NAMESPACE