job_context.h 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "db/column_family.h"
#include "db/log_writer.h"
#include "db/version_set.h"
#include "util/autovector.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. class MemTable;
  18. struct SuperVersion;
  19. // The purpose of this struct is to simplify pushing work such as
  20. // allocation/construction, de-allocation/destruction, and notifications to
  21. // outside of holding the DB mutex.
  22. struct SuperVersionContext {
  23. struct WriteStallNotification {
  24. WriteStallInfo write_stall_info;
  25. const ImmutableOptions* immutable_options;
  26. };
  27. autovector<SuperVersion*> superversions_to_free;
  28. #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
  29. autovector<WriteStallNotification> write_stall_notifications;
  30. #endif
  31. std::unique_ptr<SuperVersion>
  32. new_superversion; // if nullptr no new superversion
  33. explicit SuperVersionContext(bool create_superversion = false)
  34. : new_superversion(create_superversion ? new SuperVersion() : nullptr) {}
  35. explicit SuperVersionContext(SuperVersionContext&& other) noexcept
  36. : superversions_to_free(std::move(other.superversions_to_free)),
  37. #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
  38. write_stall_notifications(std::move(other.write_stall_notifications)),
  39. #endif
  40. new_superversion(std::move(other.new_superversion)) {
  41. }
  42. // No copies
  43. SuperVersionContext(const SuperVersionContext& other) = delete;
  44. void operator=(const SuperVersionContext& other) = delete;
  45. void NewSuperVersion() {
  46. new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion());
  47. }
  48. inline bool HaveSomethingToDelete() const {
  49. #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
  50. return !superversions_to_free.empty() || !write_stall_notifications.empty();
  51. #else
  52. return !superversions_to_free.empty();
  53. #endif
  54. }
  55. void PushWriteStallNotification(WriteStallCondition old_cond,
  56. WriteStallCondition new_cond,
  57. const std::string& name,
  58. const ImmutableOptions* ioptions) {
  59. #if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
  60. WriteStallNotification notif;
  61. notif.write_stall_info.cf_name = name;
  62. notif.write_stall_info.condition.prev = old_cond;
  63. notif.write_stall_info.condition.cur = new_cond;
  64. notif.immutable_options = ioptions;
  65. write_stall_notifications.push_back(notif);
  66. #else
  67. (void)old_cond;
  68. (void)new_cond;
  69. (void)name;
  70. (void)ioptions;
  71. #endif // !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
  72. }
  73. void Clean() {
  74. #if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
  75. // notify listeners on changed write stall conditions
  76. for (auto& notif : write_stall_notifications) {
  77. for (auto& listener : notif.immutable_options->listeners) {
  78. listener->OnStallConditionsChanged(notif.write_stall_info);
  79. }
  80. }
  81. write_stall_notifications.clear();
  82. #endif
  83. // free superversions
  84. for (auto s : superversions_to_free) {
  85. delete s;
  86. }
  87. superversions_to_free.clear();
  88. }
  89. ~SuperVersionContext() {
  90. #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
  91. assert(write_stall_notifications.empty());
  92. #endif
  93. assert(superversions_to_free.empty());
  94. }
  95. };
  96. struct JobContext {
  97. inline bool HaveSomethingToDelete() const {
  98. return !(full_scan_candidate_files.empty() && sst_delete_files.empty() &&
  99. blob_delete_files.empty() && log_delete_files.empty() &&
  100. manifest_delete_files.empty());
  101. }
  102. inline bool HaveSomethingToClean() const {
  103. bool sv_have_sth = false;
  104. for (const auto& sv_ctx : superversion_contexts) {
  105. if (sv_ctx.HaveSomethingToDelete()) {
  106. sv_have_sth = true;
  107. break;
  108. }
  109. }
  110. return memtables_to_free.size() > 0 || wals_to_free.size() > 0 ||
  111. job_snapshot != nullptr || sv_have_sth;
  112. }
  113. SequenceNumber GetJobSnapshotSequence() const {
  114. if (job_snapshot) {
  115. assert(job_snapshot->snapshot());
  116. return job_snapshot->snapshot()->GetSequenceNumber();
  117. }
  118. return kMaxSequenceNumber;
  119. }
  120. SequenceNumber GetLatestSnapshotSequence() const {
  121. assert(snapshot_context_initialized);
  122. if (snapshot_seqs.empty()) {
  123. return 0;
  124. }
  125. return snapshot_seqs.back();
  126. }
  127. SequenceNumber GetEarliestSnapshotSequence() const {
  128. assert(snapshot_context_initialized);
  129. if (snapshot_seqs.empty()) {
  130. return kMaxSequenceNumber;
  131. }
  132. return snapshot_seqs.front();
  133. }
  134. void InitSnapshotContext(SnapshotChecker* checker,
  135. std::unique_ptr<ManagedSnapshot> managed_snapshot,
  136. SequenceNumber earliest_write_conflict,
  137. std::vector<SequenceNumber>&& snapshots) {
  138. if (snapshot_context_initialized) {
  139. return;
  140. }
  141. snapshot_context_initialized = true;
  142. snapshot_checker = checker;
  143. assert(!job_snapshot);
  144. job_snapshot = std::move(managed_snapshot);
  145. earliest_write_conflict_snapshot = earliest_write_conflict;
  146. snapshot_seqs = std::move(snapshots);
  147. }
  148. // Structure to store information for candidate files to delete.
  149. struct CandidateFileInfo {
  150. std::string file_name;
  151. std::string file_path;
  152. CandidateFileInfo(std::string name, std::string path)
  153. : file_name(std::move(name)), file_path(std::move(path)) {}
  154. bool operator==(const CandidateFileInfo& other) const {
  155. return file_name == other.file_name && file_path == other.file_path;
  156. }
  157. };
  158. // a list of all files that we'll consider deleting
  159. // (every once in a while this is filled up with all files
  160. // in the DB directory)
  161. // (filled only if we're doing full scan)
  162. std::vector<CandidateFileInfo> full_scan_candidate_files;
  163. // the list of all live sst files that cannot be deleted
  164. std::vector<uint64_t> sst_live;
  165. // the list of sst files that we need to delete
  166. std::vector<ObsoleteFileInfo> sst_delete_files;
  167. // the list of all live blob files that cannot be deleted
  168. std::vector<uint64_t> blob_live;
  169. // the list of blob files that we need to delete
  170. std::vector<ObsoleteBlobFileInfo> blob_delete_files;
  171. // a list of log files that we need to delete
  172. std::vector<uint64_t> log_delete_files;
  173. // a list of log files that we need to preserve during full purge since they
  174. // will be reused later
  175. std::vector<uint64_t> log_recycle_files;
  176. // Files quarantined from deletion. This list contains file numbers for files
  177. // that are in an ambiguous states. This includes newly generated SST files
  178. // and blob files from flush and compaction job whose VersionEdits' persist
  179. // state in Manifest are unclear. An old manifest file whose immediately
  180. // following new manifest file's CURRENT file creation is in an unclear state.
  181. // WAL logs don't have this premature deletion risk since
  182. // min_log_number_to_keep is only updated after successful manifest commits.
  183. // So this data structure doesn't track log files.
  184. autovector<uint64_t> files_to_quarantine;
  185. // a list of manifest files that we need to delete
  186. std::vector<std::string> manifest_delete_files;
  187. // a list of memtables to be free
  188. autovector<ReadOnlyMemTable*> memtables_to_free;
  189. // contexts for installing superversions for multiple column families
  190. std::vector<SuperVersionContext> superversion_contexts;
  191. autovector<log::Writer*> wals_to_free;
  192. // the current manifest_file_number, log_number and prev_log_number
  193. // that corresponds to the set of files in 'live'.
  194. uint64_t manifest_file_number = 0;
  195. uint64_t pending_manifest_file_number = 0;
  196. // Used for remote compaction. To prevent OPTIONS files from getting
  197. // purged by PurgeObsoleteFiles() of the primary host
  198. uint64_t min_options_file_number;
  199. uint64_t log_number = 0;
  200. uint64_t prev_log_number = 0;
  201. uint64_t min_pending_output = 0;
  202. uint64_t prev_wals_total_size = 0;
  203. size_t num_alive_wal_files = 0;
  204. uint64_t size_log_to_delete = 0;
  205. // Snapshot taken before flush/compaction job.
  206. std::unique_ptr<ManagedSnapshot> job_snapshot;
  207. SnapshotChecker* snapshot_checker = nullptr;
  208. std::vector<SequenceNumber> snapshot_seqs;
  209. // This is the earliest snapshot that could be used for write-conflict
  210. // checking by a transaction. For any user-key newer than this snapshot, we
  211. // should make sure not to remove evidence that a write occurred.
  212. SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber;
  213. // Unique job id
  214. int job_id;
  215. bool snapshot_context_initialized = false;
  216. explicit JobContext(int _job_id, bool create_superversion = false) {
  217. job_id = _job_id;
  218. superversion_contexts.emplace_back(
  219. SuperVersionContext(create_superversion));
  220. }
  221. // Delete the default constructor
  222. JobContext() = delete;
  223. // For non-empty JobContext Clean() has to be called at least once before
  224. // before destruction (see asserts in ~JobContext()). Should be called with
  225. // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally
  226. // doing potentially slow Clean() with locked DB mutex.
  227. void Clean() {
  228. // free superversions
  229. for (auto& sv_context : superversion_contexts) {
  230. sv_context.Clean();
  231. }
  232. // free pending memtables
  233. for (auto m : memtables_to_free) {
  234. delete m;
  235. }
  236. for (auto l : wals_to_free) {
  237. delete l;
  238. }
  239. memtables_to_free.clear();
  240. wals_to_free.clear();
  241. job_snapshot.reset();
  242. }
  243. ~JobContext() {
  244. assert(memtables_to_free.size() == 0);
  245. assert(wals_to_free.size() == 0);
  246. }
  247. };
  248. } // namespace ROCKSDB_NAMESPACE