external_sst_file_ingestion_job.h 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <string>
  7. #include <unordered_set>
  8. #include <vector>
  9. #include "db/column_family.h"
  10. #include "db/dbformat.h"
  11. #include "db/internal_stats.h"
  12. #include "db/snapshot_impl.h"
  13. #include "logging/event_logger.h"
  14. #include "options/db_options.h"
  15. #include "rocksdb/db.h"
  16. #include "rocksdb/env.h"
  17. #include "rocksdb/sst_file_writer.h"
  18. #include "util/autovector.h"
  19. namespace ROCKSDB_NAMESPACE {
  20. class Directories;
  21. struct IngestedFileInfo {
  22. // External file path
  23. std::string external_file_path;
  24. // Smallest internal key in external file
  25. InternalKey smallest_internal_key;
  26. // Largest internal key in external file
  27. InternalKey largest_internal_key;
  28. // Sequence number for keys in external file
  29. SequenceNumber original_seqno;
  30. // Offset of the global sequence number field in the file, will
  31. // be zero if version is 1 (global seqno is not supported)
  32. size_t global_seqno_offset;
  33. // External file size
  34. uint64_t file_size;
  35. // total number of keys in external file
  36. uint64_t num_entries;
  37. // total number of range deletions in external file
  38. uint64_t num_range_deletions;
  39. // Id of column family this file shoule be ingested into
  40. uint32_t cf_id;
  41. // TableProperties read from external file
  42. TableProperties table_properties;
  43. // Version of external file
  44. int version;
  45. // FileDescriptor for the file inside the DB
  46. FileDescriptor fd;
  47. // file path that we picked for file inside the DB
  48. std::string internal_file_path;
  49. // Global sequence number that we picked for the file inside the DB
  50. SequenceNumber assigned_seqno = 0;
  51. // Level inside the DB we picked for the external file.
  52. int picked_level = 0;
  53. // Whether to copy or link the external sst file. copy_file will be set to
  54. // false if ingestion_options.move_files is true and underlying FS
  55. // supports link operation. Need to provide a default value to make the
  56. // undefined-behavior sanity check of llvm happy. Since
  57. // ingestion_options.move_files is false by default, thus copy_file is true
  58. // by default.
  59. bool copy_file = true;
  60. };
  61. class ExternalSstFileIngestionJob {
  62. public:
  63. ExternalSstFileIngestionJob(
  64. Env* env, VersionSet* versions, ColumnFamilyData* cfd,
  65. const ImmutableDBOptions& db_options, const EnvOptions& env_options,
  66. SnapshotList* db_snapshots,
  67. const IngestExternalFileOptions& ingestion_options,
  68. Directories* directories, EventLogger* event_logger)
  69. : env_(env),
  70. fs_(db_options.fs.get()),
  71. versions_(versions),
  72. cfd_(cfd),
  73. db_options_(db_options),
  74. env_options_(env_options),
  75. db_snapshots_(db_snapshots),
  76. ingestion_options_(ingestion_options),
  77. directories_(directories),
  78. event_logger_(event_logger),
  79. job_start_time_(env_->NowMicros()),
  80. consumed_seqno_count_(0) {
  81. assert(directories != nullptr);
  82. }
  83. // Prepare the job by copying external files into the DB.
  84. Status Prepare(const std::vector<std::string>& external_files_paths,
  85. uint64_t next_file_number, SuperVersion* sv);
  86. // Check if we need to flush the memtable before running the ingestion job
  87. // This will be true if the files we are ingesting are overlapping with any
  88. // key range in the memtable.
  89. //
  90. // @param super_version A referenced SuperVersion that will be held for the
  91. // duration of this function.
  92. //
  93. // Thread-safe
  94. Status NeedsFlush(bool* flush_needed, SuperVersion* super_version);
  95. // Will execute the ingestion job and prepare edit() to be applied.
  96. // REQUIRES: Mutex held
  97. Status Run();
  98. // Update column family stats.
  99. // REQUIRES: Mutex held
  100. void UpdateStats();
  101. // Cleanup after successful/failed job
  102. void Cleanup(const Status& status);
  103. VersionEdit* edit() { return &edit_; }
  104. const autovector<IngestedFileInfo>& files_to_ingest() const {
  105. return files_to_ingest_;
  106. }
  107. // How many sequence numbers did we consume as part of the ingest job?
  108. int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; }
  109. private:
  110. // Open the external file and populate `file_to_ingest` with all the
  111. // external information we need to ingest this file.
  112. Status GetIngestedFileInfo(const std::string& external_file,
  113. IngestedFileInfo* file_to_ingest,
  114. SuperVersion* sv);
  115. // Assign `file_to_ingest` the appropriate sequence number and the lowest
  116. // possible level that it can be ingested to according to compaction_style.
  117. // REQUIRES: Mutex held
  118. Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv,
  119. bool force_global_seqno,
  120. CompactionStyle compaction_style,
  121. SequenceNumber last_seqno,
  122. IngestedFileInfo* file_to_ingest,
  123. SequenceNumber* assigned_seqno);
  124. // File that we want to ingest behind always goes to the lowest level;
  125. // we just check that it fits in the level, that DB allows ingest_behind,
  126. // and that we don't have 0 seqnums at the upper levels.
  127. // REQUIRES: Mutex held
  128. Status CheckLevelForIngestedBehindFile(IngestedFileInfo* file_to_ingest);
  129. // Set the file global sequence number to `seqno`
  130. Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest,
  131. SequenceNumber seqno);
  132. // Check if `file_to_ingest` can fit in level `level`
  133. // REQUIRES: Mutex held
  134. bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest,
  135. int level);
  136. // Helper method to sync given file.
  137. template <typename TWritableFile>
  138. Status SyncIngestedFile(TWritableFile* file);
  139. Env* env_;
  140. FileSystem* fs_;
  141. VersionSet* versions_;
  142. ColumnFamilyData* cfd_;
  143. const ImmutableDBOptions& db_options_;
  144. const EnvOptions& env_options_;
  145. SnapshotList* db_snapshots_;
  146. autovector<IngestedFileInfo> files_to_ingest_;
  147. const IngestExternalFileOptions& ingestion_options_;
  148. Directories* directories_;
  149. EventLogger* event_logger_;
  150. VersionEdit edit_;
  151. uint64_t job_start_time_;
  152. int consumed_seqno_count_;
  153. // Set in ExternalSstFileIngestionJob::Prepare(), if true all files are
  154. // ingested in L0
  155. bool files_overlap_{false};
  156. };
  157. } // namespace ROCKSDB_NAMESPACE