import_column_family_job.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. #include "db/import_column_family_job.h"
  7. #include <algorithm>
  8. #include <cinttypes>
  9. #include <string>
  10. #include <vector>
  11. #include "db/version_builder.h"
  12. #include "db/version_edit.h"
  13. #include "file/file_util.h"
  14. #include "file/random_access_file_reader.h"
  15. #include "logging/logging.h"
  16. #include "table/merging_iterator.h"
  17. #include "table/sst_file_writer_collectors.h"
  18. #include "table/table_builder.h"
  19. #include "table/unique_id_impl.h"
  20. #include "util/stop_watch.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
  23. SuperVersion* sv) {
  24. Status status;
  25. std::vector<ColumnFamilyIngestFileInfo> cf_ingest_infos;
  26. for (const auto& metadata_per_cf : metadatas_) {
  27. // Read the information of files we are importing
  28. ColumnFamilyIngestFileInfo cf_file_info;
  29. InternalKey smallest, largest;
  30. int num_files = 0;
  31. std::vector<IngestedFileInfo> files_to_import_per_cf;
  32. for (size_t i = 0; i < metadata_per_cf.size(); i++) {
  33. auto file_metadata = *metadata_per_cf[i];
  34. const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
  35. IngestedFileInfo file_to_import;
  36. status = GetIngestedFileInfo(file_path, next_file_number++, sv,
  37. file_metadata, &file_to_import);
  38. if (!status.ok()) {
  39. return status;
  40. }
  41. if (file_to_import.num_entries == 0) {
  42. status = Status::InvalidArgument("File contain no entries");
  43. return status;
  44. }
  45. if (!file_to_import.smallest_internal_key.Valid() ||
  46. !file_to_import.largest_internal_key.Valid()) {
  47. status = Status::Corruption("File has corrupted keys");
  48. return status;
  49. }
  50. files_to_import_per_cf.push_back(file_to_import);
  51. num_files++;
  52. // Calculate the smallest and largest keys of all files in this CF
  53. if (i == 0) {
  54. smallest = file_to_import.smallest_internal_key;
  55. largest = file_to_import.largest_internal_key;
  56. } else {
  57. if (cfd_->internal_comparator().Compare(
  58. smallest, file_to_import.smallest_internal_key) > 0) {
  59. smallest = file_to_import.smallest_internal_key;
  60. }
  61. if (cfd_->internal_comparator().Compare(
  62. largest, file_to_import.largest_internal_key) < 0) {
  63. largest = file_to_import.largest_internal_key;
  64. }
  65. }
  66. }
  67. if (num_files == 0) {
  68. status = Status::InvalidArgument("The list of files is empty");
  69. return status;
  70. }
  71. files_to_import_.push_back(files_to_import_per_cf);
  72. cf_file_info.smallest_internal_key = smallest;
  73. cf_file_info.largest_internal_key = largest;
  74. cf_ingest_infos.push_back(cf_file_info);
  75. }
  76. std::sort(cf_ingest_infos.begin(), cf_ingest_infos.end(),
  77. [this](const ColumnFamilyIngestFileInfo& info1,
  78. const ColumnFamilyIngestFileInfo& info2) {
  79. return cfd_->user_comparator()->Compare(
  80. info1.smallest_internal_key.user_key(),
  81. info2.smallest_internal_key.user_key()) < 0;
  82. });
  83. for (size_t i = 0; i + 1 < cf_ingest_infos.size(); i++) {
  84. if (cfd_->user_comparator()->Compare(
  85. cf_ingest_infos[i].largest_internal_key.user_key(),
  86. cf_ingest_infos[i + 1].smallest_internal_key.user_key()) >= 0) {
  87. status = Status::InvalidArgument("CFs have overlapping ranges");
  88. return status;
  89. }
  90. }
  91. // Copy/Move external files into DB
  92. auto hardlink_files = import_options_.move_files;
  93. for (auto& files_to_import_per_cf : files_to_import_) {
  94. for (auto& f : files_to_import_per_cf) {
  95. const auto path_outside_db = f.external_file_path;
  96. const auto path_inside_db = TableFileName(
  97. cfd_->ioptions().cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
  98. if (hardlink_files) {
  99. status = fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(),
  100. nullptr);
  101. if (status.IsNotSupported()) {
  102. // Original file is on a different FS, use copy instead of hard
  103. // linking
  104. hardlink_files = false;
  105. ROCKS_LOG_INFO(db_options_.info_log,
  106. "Try to link file %s but it's not supported : %s",
  107. f.internal_file_path.c_str(),
  108. status.ToString().c_str());
  109. }
  110. }
  111. if (!hardlink_files) {
  112. // FIXME: temperature handling (like ExternalSstFileIngestionJob)
  113. status = CopyFile(fs_.get(), path_outside_db, Temperature::kUnknown,
  114. path_inside_db, Temperature::kUnknown, 0,
  115. db_options_.use_fsync, io_tracer_);
  116. }
  117. if (!status.ok()) {
  118. break;
  119. }
  120. f.copy_file = !hardlink_files;
  121. f.internal_file_path = path_inside_db;
  122. }
  123. if (!status.ok()) {
  124. break;
  125. }
  126. }
  127. if (!status.ok()) {
  128. // We failed, remove all files that we copied into the db
  129. for (auto& files_to_import_per_cf : files_to_import_) {
  130. for (auto& f : files_to_import_per_cf) {
  131. if (f.internal_file_path.empty()) {
  132. break;
  133. }
  134. const auto s =
  135. fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
  136. if (!s.ok()) {
  137. ROCKS_LOG_WARN(db_options_.info_log,
  138. "AddFile() clean up for file %s failed : %s",
  139. f.internal_file_path.c_str(), s.ToString().c_str());
  140. }
  141. }
  142. }
  143. }
  144. return status;
  145. }
  146. // REQUIRES: we have become the only writer by entering both write_thread_ and
  147. // nonmem_write_thread_
  148. Status ImportColumnFamilyJob::Run() {
  149. // We use the import time as the ancester time. This is the time the data
  150. // is written to the database.
  151. int64_t temp_current_time = 0;
  152. uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
  153. uint64_t current_time = kUnknownOldestAncesterTime;
  154. if (clock_->GetCurrentTime(&temp_current_time).ok()) {
  155. current_time = oldest_ancester_time =
  156. static_cast<uint64_t>(temp_current_time);
  157. }
  158. Status s;
  159. // When importing multiple CFs, we should not reuse epoch number from ingested
  160. // files. Since these epoch numbers were assigned by different CFs, there may
  161. // be different files from different CFs with the same epoch number. With a
  162. // subsequent intra-L0 compaction we may end up with files with overlapping
  163. // key range but the same epoch number. Here we will create a dummy
  164. // VersionStorageInfo per CF being imported. Each CF's files will be assigned
  165. // increasing epoch numbers to avoid duplicated epoch number. This is done by
  166. // only resetting epoch number of the new CF in the first call to
  167. // RecoverEpochNumbers() below.
  168. for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) {
  169. VersionBuilder dummy_version_builder(
  170. cfd_->current()->version_set()->file_options(), &cfd_->ioptions(),
  171. cfd_->table_cache(), cfd_->current()->storage_info(),
  172. cfd_->current()->version_set(),
  173. cfd_->GetFileMetadataCacheReservationManager());
  174. VersionStorageInfo dummy_vstorage(
  175. &cfd_->internal_comparator(), cfd_->user_comparator(),
  176. cfd_->NumberLevels(), cfd_->ioptions().compaction_style,
  177. nullptr /* src_vstorage */, cfd_->ioptions().force_consistency_checks,
  178. EpochNumberRequirement::kMightMissing, cfd_->ioptions().clock,
  179. cfd_->GetLatestMutableCFOptions().bottommost_file_compaction_delay,
  180. cfd_->current()->version_set()->offpeak_time_option());
  181. for (size_t j = 0; s.ok() && j < files_to_import_[i].size(); ++j) {
  182. const auto& f = files_to_import_[i][j];
  183. const auto& file_metadata = *metadatas_[i][j];
  184. uint64_t tail_size = FileMetaData::CalculateTailSize(f.fd.GetFileSize(),
  185. f.table_properties);
  186. VersionEdit dummy_version_edit;
  187. dummy_version_edit.AddFile(
  188. file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
  189. f.fd.GetFileSize(), f.smallest_internal_key, f.largest_internal_key,
  190. file_metadata.smallest_seqno, file_metadata.largest_seqno, false,
  191. file_metadata.temperature, kInvalidBlobFileNumber,
  192. oldest_ancester_time, current_time, file_metadata.epoch_number,
  193. kUnknownFileChecksum, kUnknownFileChecksumFuncName, f.unique_id, 0,
  194. tail_size,
  195. static_cast<bool>(
  196. f.table_properties.user_defined_timestamps_persisted));
  197. s = dummy_version_builder.Apply(&dummy_version_edit);
  198. }
  199. if (s.ok()) {
  200. s = dummy_version_builder.SaveTo(&dummy_vstorage);
  201. }
  202. if (s.ok()) {
  203. // force resetting epoch number for each file
  204. dummy_vstorage.RecoverEpochNumbers(cfd_, /*restart_epoch=*/i == 0,
  205. /*force=*/true);
  206. edit_.SetColumnFamily(cfd_->GetID());
  207. for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
  208. for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
  209. edit_.AddFile(level, *file_meta);
  210. // If incoming sequence number is higher, update local sequence
  211. // number.
  212. if (file_meta->fd.largest_seqno > versions_->LastSequence()) {
  213. versions_->SetLastAllocatedSequence(file_meta->fd.largest_seqno);
  214. versions_->SetLastPublishedSequence(file_meta->fd.largest_seqno);
  215. versions_->SetLastSequence(file_meta->fd.largest_seqno);
  216. }
  217. }
  218. }
  219. }
  220. // Release resources occupied by the dummy VersionStorageInfo
  221. for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
  222. for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
  223. file_meta->refs--;
  224. if (file_meta->refs <= 0) {
  225. delete file_meta;
  226. }
  227. }
  228. }
  229. }
  230. return s;
  231. }
  232. void ImportColumnFamilyJob::Cleanup(const Status& status) {
  233. if (!status.ok()) {
  234. // We failed to add files to the database remove all the files we copied.
  235. for (auto& files_to_import_per_cf : files_to_import_) {
  236. for (auto& f : files_to_import_per_cf) {
  237. const auto s =
  238. fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
  239. if (!s.ok()) {
  240. ROCKS_LOG_WARN(db_options_.info_log,
  241. "AddFile() clean up for file %s failed : %s",
  242. f.internal_file_path.c_str(), s.ToString().c_str());
  243. }
  244. }
  245. }
  246. } else if (status.ok() && import_options_.move_files) {
  247. // The files were moved and added successfully, remove original file links
  248. for (auto& files_to_import_per_cf : files_to_import_) {
  249. for (auto& f : files_to_import_per_cf) {
  250. const auto s =
  251. fs_->DeleteFile(f.external_file_path, IOOptions(), nullptr);
  252. if (!s.ok()) {
  253. ROCKS_LOG_WARN(
  254. db_options_.info_log,
  255. "%s was added to DB successfully but failed to remove original "
  256. "file link : %s",
  257. f.external_file_path.c_str(), s.ToString().c_str());
  258. }
  259. }
  260. }
  261. }
  262. }
  263. Status ImportColumnFamilyJob::GetIngestedFileInfo(
  264. const std::string& external_file, uint64_t new_file_number,
  265. SuperVersion* sv, const LiveFileMetaData& file_meta,
  266. IngestedFileInfo* file_to_import) {
  267. file_to_import->external_file_path = external_file;
  268. Status status;
  269. if (file_meta.size > 0) {
  270. file_to_import->file_size = file_meta.size;
  271. } else {
  272. // Get external file size
  273. status = fs_->GetFileSize(external_file, IOOptions(),
  274. &file_to_import->file_size, nullptr);
  275. if (!status.ok()) {
  276. return status;
  277. }
  278. }
  279. // Assign FD with number
  280. file_to_import->fd =
  281. FileDescriptor(new_file_number, 0, file_to_import->file_size);
  282. // Create TableReader for external file
  283. std::unique_ptr<TableReader> table_reader;
  284. std::unique_ptr<FSRandomAccessFile> sst_file;
  285. std::unique_ptr<RandomAccessFileReader> sst_file_reader;
  286. status =
  287. fs_->NewRandomAccessFile(external_file, env_options_, &sst_file, nullptr);
  288. if (!status.ok()) {
  289. return status;
  290. }
  291. sst_file_reader.reset(new RandomAccessFileReader(
  292. std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));
  293. // TODO(yuzhangyu): User-defined timestamps doesn't support importing column
  294. // family. Pass in the correct `user_defined_timestamps_persisted` flag for
  295. // creating `TableReaderOptions` when the support is there.
  296. status = sv->mutable_cf_options.table_factory->NewTableReader(
  297. TableReaderOptions(
  298. cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
  299. sv->mutable_cf_options.compression_manager.get(), env_options_,
  300. cfd_->internal_comparator(),
  301. sv->mutable_cf_options.block_protection_bytes_per_key,
  302. /*skip_filters*/ false, /*immortal*/ false,
  303. /*force_direct_prefetch*/ false, /*level*/ -1,
  304. /*block_cache_tracer*/ nullptr,
  305. /*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
  306. /*cur_file_num*/ new_file_number),
  307. std::move(sst_file_reader), file_to_import->file_size, &table_reader);
  308. if (!status.ok()) {
  309. return status;
  310. }
  311. // Get the external file properties
  312. auto props = table_reader->GetTableProperties();
  313. // Set original_seqno to 0.
  314. file_to_import->original_seqno = 0;
  315. // Get number of entries in table
  316. file_to_import->num_entries = props->num_entries;
  317. // If the importing files were exported with Checkpoint::ExportColumnFamily(),
  318. // we cannot simply recompute smallest and largest used to truncate range
  319. // tombstones from file content, and we expect smallest and largest populated
  320. // in file_meta.
  321. if (file_meta.smallest.empty()) {
  322. assert(file_meta.largest.empty());
  323. // TODO: plumb Env::IOActivity, Env::IOPriority
  324. ReadOptions ro;
  325. std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
  326. ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
  327. /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
  328. // Get first (smallest) key from file
  329. iter->SeekToFirst();
  330. bool bound_set = false;
  331. if (iter->Valid()) {
  332. file_to_import->smallest_internal_key.DecodeFrom(iter->key());
  333. Slice largest;
  334. if (strcmp(sv->mutable_cf_options.table_factory->Name(), "PlainTable") ==
  335. 0) {
  336. // PlainTable iterator does not support SeekToLast().
  337. largest = iter->key();
  338. for (; iter->Valid(); iter->Next()) {
  339. if (cfd_->internal_comparator().Compare(iter->key(), largest) > 0) {
  340. largest = iter->key();
  341. }
  342. }
  343. if (!iter->status().ok()) {
  344. return iter->status();
  345. }
  346. } else {
  347. iter->SeekToLast();
  348. if (!iter->Valid()) {
  349. if (iter->status().ok()) {
  350. // The file contains at least 1 key since iter is valid after
  351. // SeekToFirst().
  352. return Status::Corruption("Can not find largest key in sst file");
  353. } else {
  354. return iter->status();
  355. }
  356. }
  357. largest = iter->key();
  358. }
  359. file_to_import->largest_internal_key.DecodeFrom(largest);
  360. bound_set = true;
  361. } else if (!iter->status().ok()) {
  362. return iter->status();
  363. }
  364. std::unique_ptr<InternalIterator> range_del_iter{
  365. table_reader->NewRangeTombstoneIterator(ro)};
  366. if (range_del_iter != nullptr) {
  367. range_del_iter->SeekToFirst();
  368. if (range_del_iter->Valid()) {
  369. ParsedInternalKey key;
  370. Status pik_status = ParseInternalKey(range_del_iter->key(), &key,
  371. db_options_.allow_data_in_errors);
  372. if (!pik_status.ok()) {
  373. return Status::Corruption("Corrupted key in external file. ",
  374. pik_status.getState());
  375. }
  376. RangeTombstone first_tombstone(key, range_del_iter->value());
  377. InternalKey start_key = first_tombstone.SerializeKey();
  378. const InternalKeyComparator* icmp = &cfd_->internal_comparator();
  379. if (!bound_set ||
  380. icmp->Compare(start_key, file_to_import->smallest_internal_key) <
  381. 0) {
  382. file_to_import->smallest_internal_key = start_key;
  383. }
  384. range_del_iter->SeekToLast();
  385. pik_status = ParseInternalKey(range_del_iter->key(), &key,
  386. db_options_.allow_data_in_errors);
  387. if (!pik_status.ok()) {
  388. return Status::Corruption("Corrupted key in external file. ",
  389. pik_status.getState());
  390. }
  391. RangeTombstone last_tombstone(key, range_del_iter->value());
  392. InternalKey end_key = last_tombstone.SerializeEndKey();
  393. if (!bound_set ||
  394. icmp->Compare(end_key, file_to_import->largest_internal_key) > 0) {
  395. file_to_import->largest_internal_key = end_key;
  396. }
  397. bound_set = true;
  398. }
  399. }
  400. assert(bound_set);
  401. } else {
  402. assert(!file_meta.largest.empty());
  403. file_to_import->smallest_internal_key.DecodeFrom(file_meta.smallest);
  404. file_to_import->largest_internal_key.DecodeFrom(file_meta.largest);
  405. }
  406. file_to_import->cf_id = static_cast<uint32_t>(props->column_family_id);
  407. file_to_import->table_properties = *props;
  408. auto s = GetSstInternalUniqueId(props->db_id, props->db_session_id,
  409. props->orig_file_number,
  410. &(file_to_import->unique_id));
  411. if (!s.ok()) {
  412. ROCKS_LOG_WARN(db_options_.info_log,
  413. "Failed to get SST unique id for file %s",
  414. file_to_import->internal_file_path.c_str());
  415. }
  416. return status;
  417. }
  418. } // namespace ROCKSDB_NAMESPACE