db_impl_experimental.cc 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_impl/db_impl.h"
  10. #include <cinttypes>
  11. #include <vector>
  12. #include "db/column_family.h"
  13. #include "db/job_context.h"
  14. #include "db/version_set.h"
  15. #include "rocksdb/status.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. #ifndef ROCKSDB_LITE
  18. Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
  19. const Slice* begin, const Slice* end) {
  20. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  21. auto cfd = cfh->cfd();
  22. InternalKey start_key, end_key;
  23. if (begin != nullptr) {
  24. start_key.SetMinPossibleForUserKey(*begin);
  25. }
  26. if (end != nullptr) {
  27. end_key.SetMaxPossibleForUserKey(*end);
  28. }
  29. {
  30. InstrumentedMutexLock l(&mutex_);
  31. auto vstorage = cfd->current()->storage_info();
  32. for (int level = 0; level < vstorage->num_non_empty_levels() - 1; ++level) {
  33. std::vector<FileMetaData*> inputs;
  34. vstorage->GetOverlappingInputs(
  35. level, begin == nullptr ? nullptr : &start_key,
  36. end == nullptr ? nullptr : &end_key, &inputs);
  37. for (auto f : inputs) {
  38. f->marked_for_compaction = true;
  39. }
  40. }
  41. // Since we have some more files to compact, we should also recompute
  42. // compaction score
  43. vstorage->ComputeCompactionScore(*cfd->ioptions(),
  44. *cfd->GetLatestMutableCFOptions());
  45. SchedulePendingCompaction(cfd);
  46. MaybeScheduleFlushOrCompaction();
  47. }
  48. return Status::OK();
  49. }
  50. Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
  51. assert(column_family);
  52. if (target_level < 1) {
  53. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  54. "PromoteL0 FAILED. Invalid target level %d\n", target_level);
  55. return Status::InvalidArgument("Invalid target level");
  56. }
  57. Status status;
  58. VersionEdit edit;
  59. JobContext job_context(next_job_id_.fetch_add(1), true);
  60. {
  61. InstrumentedMutexLock l(&mutex_);
  62. auto* cfd = static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  63. const auto* vstorage = cfd->current()->storage_info();
  64. if (target_level >= vstorage->num_levels()) {
  65. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  66. "PromoteL0 FAILED. Target level %d does not exist\n",
  67. target_level);
  68. job_context.Clean();
  69. return Status::InvalidArgument("Target level does not exist");
  70. }
  71. // Sort L0 files by range.
  72. const InternalKeyComparator* icmp = &cfd->internal_comparator();
  73. auto l0_files = vstorage->LevelFiles(0);
  74. std::sort(l0_files.begin(), l0_files.end(),
  75. [icmp](FileMetaData* f1, FileMetaData* f2) {
  76. return icmp->Compare(f1->largest, f2->largest) < 0;
  77. });
  78. // Check that no L0 file is being compacted and that they have
  79. // non-overlapping ranges.
  80. for (size_t i = 0; i < l0_files.size(); ++i) {
  81. auto f = l0_files[i];
  82. if (f->being_compacted) {
  83. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  84. "PromoteL0 FAILED. File %" PRIu64 " being compacted\n",
  85. f->fd.GetNumber());
  86. job_context.Clean();
  87. return Status::InvalidArgument("PromoteL0 called during L0 compaction");
  88. }
  89. if (i == 0) continue;
  90. auto prev_f = l0_files[i - 1];
  91. if (icmp->Compare(prev_f->largest, f->smallest) >= 0) {
  92. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  93. "PromoteL0 FAILED. Files %" PRIu64 " and %" PRIu64
  94. " have overlapping ranges\n",
  95. prev_f->fd.GetNumber(), f->fd.GetNumber());
  96. job_context.Clean();
  97. return Status::InvalidArgument("L0 has overlapping files");
  98. }
  99. }
  100. // Check that all levels up to target_level are empty.
  101. for (int level = 1; level <= target_level; ++level) {
  102. if (vstorage->NumLevelFiles(level) > 0) {
  103. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  104. "PromoteL0 FAILED. Level %d not empty\n", level);
  105. job_context.Clean();
  106. return Status::InvalidArgument(
  107. "All levels up to target_level "
  108. "must be empty");
  109. }
  110. }
  111. edit.SetColumnFamily(cfd->GetID());
  112. for (const auto& f : l0_files) {
  113. edit.DeleteFile(0, f->fd.GetNumber());
  114. edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
  115. f->fd.GetFileSize(), f->smallest, f->largest,
  116. f->fd.smallest_seqno, f->fd.largest_seqno,
  117. f->marked_for_compaction, f->oldest_blob_file_number,
  118. f->oldest_ancester_time, f->file_creation_time,
  119. f->file_checksum, f->file_checksum_func_name);
  120. }
  121. status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
  122. &edit, &mutex_, directories_.GetDbDir());
  123. if (status.ok()) {
  124. InstallSuperVersionAndScheduleWork(cfd,
  125. &job_context.superversion_contexts[0],
  126. *cfd->GetLatestMutableCFOptions());
  127. }
  128. } // lock released here
  129. LogFlush(immutable_db_options_.info_log);
  130. job_context.Clean();
  131. return status;
  132. }
  133. #endif // ROCKSDB_LITE
  134. } // namespace ROCKSDB_NAMESPACE