import_column_family_job.cc 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. #ifndef ROCKSDB_LITE
  2. #include "db/import_column_family_job.h"
  3. #include <algorithm>
  4. #include <cinttypes>
  5. #include <string>
  6. #include <vector>
  7. #include "db/version_edit.h"
  8. #include "file/file_util.h"
  9. #include "file/random_access_file_reader.h"
  10. #include "table/merging_iterator.h"
  11. #include "table/scoped_arena_iterator.h"
  12. #include "table/sst_file_writer_collectors.h"
  13. #include "table/table_builder.h"
  14. #include "util/stop_watch.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
  17. SuperVersion* sv) {
  18. Status status;
  19. // Read the information of files we are importing
  20. for (const auto& file_metadata : metadata_) {
  21. const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
  22. IngestedFileInfo file_to_import;
  23. status = GetIngestedFileInfo(file_path, &file_to_import, sv);
  24. if (!status.ok()) {
  25. return status;
  26. }
  27. files_to_import_.push_back(file_to_import);
  28. }
  29. const auto ucmp = cfd_->internal_comparator().user_comparator();
  30. auto num_files = files_to_import_.size();
  31. if (num_files == 0) {
  32. return Status::InvalidArgument("The list of files is empty");
  33. } else if (num_files > 1) {
  34. // Verify that passed files don't have overlapping ranges in any particular
  35. // level.
  36. int min_level = 1; // Check for overlaps in Level 1 and above.
  37. int max_level = -1;
  38. for (const auto& file_metadata : metadata_) {
  39. if (file_metadata.level > max_level) {
  40. max_level = file_metadata.level;
  41. }
  42. }
  43. for (int level = min_level; level <= max_level; ++level) {
  44. autovector<const IngestedFileInfo*> sorted_files;
  45. for (size_t i = 0; i < num_files; i++) {
  46. if (metadata_[i].level == level) {
  47. sorted_files.push_back(&files_to_import_[i]);
  48. }
  49. }
  50. std::sort(sorted_files.begin(), sorted_files.end(),
  51. [&ucmp](const IngestedFileInfo* info1,
  52. const IngestedFileInfo* info2) {
  53. return sstableKeyCompare(ucmp, info1->smallest_internal_key,
  54. info2->smallest_internal_key) < 0;
  55. });
  56. for (size_t i = 0; i < sorted_files.size() - 1; i++) {
  57. if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
  58. sorted_files[i + 1]->smallest_internal_key) >=
  59. 0) {
  60. return Status::InvalidArgument("Files have overlapping ranges");
  61. }
  62. }
  63. }
  64. }
  65. for (const auto& f : files_to_import_) {
  66. if (f.num_entries == 0) {
  67. return Status::InvalidArgument("File contain no entries");
  68. }
  69. if (!f.smallest_internal_key.Valid() || !f.largest_internal_key.Valid()) {
  70. return Status::Corruption("File has corrupted keys");
  71. }
  72. }
  73. // Copy/Move external files into DB
  74. auto hardlink_files = import_options_.move_files;
  75. for (auto& f : files_to_import_) {
  76. f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
  77. const auto path_outside_db = f.external_file_path;
  78. const auto path_inside_db = TableFileName(
  79. cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
  80. if (hardlink_files) {
  81. status =
  82. fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
  83. if (status.IsNotSupported()) {
  84. // Original file is on a different FS, use copy instead of hard linking
  85. hardlink_files = false;
  86. }
  87. }
  88. if (!hardlink_files) {
  89. status = CopyFile(fs_, path_outside_db, path_inside_db, 0,
  90. db_options_.use_fsync);
  91. }
  92. if (!status.ok()) {
  93. break;
  94. }
  95. f.copy_file = !hardlink_files;
  96. f.internal_file_path = path_inside_db;
  97. }
  98. if (!status.ok()) {
  99. // We failed, remove all files that we copied into the db
  100. for (const auto& f : files_to_import_) {
  101. if (f.internal_file_path.empty()) {
  102. break;
  103. }
  104. const auto s =
  105. fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
  106. if (!s.ok()) {
  107. ROCKS_LOG_WARN(db_options_.info_log,
  108. "AddFile() clean up for file %s failed : %s",
  109. f.internal_file_path.c_str(), s.ToString().c_str());
  110. }
  111. }
  112. }
  113. return status;
  114. }
  115. // REQUIRES: we have become the only writer by entering both write_thread_ and
  116. // nonmem_write_thread_
  117. Status ImportColumnFamilyJob::Run() {
  118. Status status;
  119. edit_.SetColumnFamily(cfd_->GetID());
  120. // We use the import time as the ancester time. This is the time the data
  121. // is written to the database.
  122. int64_t temp_current_time = 0;
  123. uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
  124. uint64_t current_time = kUnknownOldestAncesterTime;
  125. if (env_->GetCurrentTime(&temp_current_time).ok()) {
  126. current_time = oldest_ancester_time =
  127. static_cast<uint64_t>(temp_current_time);
  128. }
  129. for (size_t i = 0; i < files_to_import_.size(); ++i) {
  130. const auto& f = files_to_import_[i];
  131. const auto& file_metadata = metadata_[i];
  132. edit_.AddFile(file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
  133. f.fd.GetFileSize(), f.smallest_internal_key,
  134. f.largest_internal_key, file_metadata.smallest_seqno,
  135. file_metadata.largest_seqno, false, kInvalidBlobFileNumber,
  136. oldest_ancester_time, current_time, kUnknownFileChecksum,
  137. kUnknownFileChecksumFuncName);
  138. // If incoming sequence number is higher, update local sequence number.
  139. if (file_metadata.largest_seqno > versions_->LastSequence()) {
  140. versions_->SetLastAllocatedSequence(file_metadata.largest_seqno);
  141. versions_->SetLastPublishedSequence(file_metadata.largest_seqno);
  142. versions_->SetLastSequence(file_metadata.largest_seqno);
  143. }
  144. }
  145. return status;
  146. }
  147. void ImportColumnFamilyJob::Cleanup(const Status& status) {
  148. if (!status.ok()) {
  149. // We failed to add files to the database remove all the files we copied.
  150. for (const auto& f : files_to_import_) {
  151. const auto s =
  152. fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
  153. if (!s.ok()) {
  154. ROCKS_LOG_WARN(db_options_.info_log,
  155. "AddFile() clean up for file %s failed : %s",
  156. f.internal_file_path.c_str(), s.ToString().c_str());
  157. }
  158. }
  159. } else if (status.ok() && import_options_.move_files) {
  160. // The files were moved and added successfully, remove original file links
  161. for (IngestedFileInfo& f : files_to_import_) {
  162. const auto s =
  163. fs_->DeleteFile(f.external_file_path, IOOptions(), nullptr);
  164. if (!s.ok()) {
  165. ROCKS_LOG_WARN(
  166. db_options_.info_log,
  167. "%s was added to DB successfully but failed to remove original "
  168. "file link : %s",
  169. f.external_file_path.c_str(), s.ToString().c_str());
  170. }
  171. }
  172. }
  173. }
  174. Status ImportColumnFamilyJob::GetIngestedFileInfo(
  175. const std::string& external_file, IngestedFileInfo* file_to_import,
  176. SuperVersion* sv) {
  177. file_to_import->external_file_path = external_file;
  178. // Get external file size
  179. Status status = fs_->GetFileSize(external_file, IOOptions(),
  180. &file_to_import->file_size, nullptr);
  181. if (!status.ok()) {
  182. return status;
  183. }
  184. // Create TableReader for external file
  185. std::unique_ptr<TableReader> table_reader;
  186. std::unique_ptr<FSRandomAccessFile> sst_file;
  187. std::unique_ptr<RandomAccessFileReader> sst_file_reader;
  188. status = fs_->NewRandomAccessFile(external_file, env_options_,
  189. &sst_file, nullptr);
  190. if (!status.ok()) {
  191. return status;
  192. }
  193. sst_file_reader.reset(
  194. new RandomAccessFileReader(std::move(sst_file), external_file));
  195. status = cfd_->ioptions()->table_factory->NewTableReader(
  196. TableReaderOptions(*cfd_->ioptions(),
  197. sv->mutable_cf_options.prefix_extractor.get(),
  198. env_options_, cfd_->internal_comparator()),
  199. std::move(sst_file_reader), file_to_import->file_size, &table_reader);
  200. if (!status.ok()) {
  201. return status;
  202. }
  203. // Get the external file properties
  204. auto props = table_reader->GetTableProperties();
  205. // Set original_seqno to 0.
  206. file_to_import->original_seqno = 0;
  207. // Get number of entries in table
  208. file_to_import->num_entries = props->num_entries;
  209. ParsedInternalKey key;
  210. ReadOptions ro;
  211. // During reading the external file we can cache blocks that we read into
  212. // the block cache, if we later change the global seqno of this file, we will
  213. // have block in cache that will include keys with wrong seqno.
  214. // We need to disable fill_cache so that we read from the file without
  215. // updating the block cache.
  216. ro.fill_cache = false;
  217. std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
  218. ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
  219. /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
  220. // Get first (smallest) key from file
  221. iter->SeekToFirst();
  222. if (!ParseInternalKey(iter->key(), &key)) {
  223. return Status::Corruption("external file have corrupted keys");
  224. }
  225. file_to_import->smallest_internal_key.SetFrom(key);
  226. // Get last (largest) key from file
  227. iter->SeekToLast();
  228. if (!ParseInternalKey(iter->key(), &key)) {
  229. return Status::Corruption("external file have corrupted keys");
  230. }
  231. file_to_import->largest_internal_key.SetFrom(key);
  232. file_to_import->cf_id = static_cast<uint32_t>(props->column_family_id);
  233. file_to_import->table_properties = *props;
  234. return status;
  235. }
  236. } // namespace ROCKSDB_NAMESPACE
  237. #endif // !ROCKSDB_LITE