compact_files_example.cc 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
// Example code demonstrating how to use the CompactFiles, EventListener,
// and GetColumnFamilyMetaData APIs to implement a custom compaction algorithm.
#include <cassert>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;
  14. std::string kDBPath = "/tmp/rocksdb_compact_files_example";
  15. struct CompactionTask;
  16. // This is an example interface of external-compaction algorithm.
  17. // Compaction algorithm can be implemented outside the core-RocksDB
  18. // code by using the pluggable compaction APIs that RocksDb provides.
  19. class Compactor : public EventListener {
  20. public:
  21. // Picks and returns a compaction task given the specified DB
  22. // and column family. It is the caller's responsibility to
  23. // destroy the returned CompactionTask. Returns "nullptr"
  24. // if it cannot find a proper compaction task.
  25. virtual CompactionTask* PickCompaction(
  26. DB* db, const std::string& cf_name) = 0;
  27. // Schedule and run the specified compaction task in background.
  28. virtual void ScheduleCompaction(CompactionTask *task) = 0;
  29. };
  30. // Example structure that describes a compaction task.
  31. struct CompactionTask {
  32. CompactionTask(
  33. DB* _db, Compactor* _compactor,
  34. const std::string& _column_family_name,
  35. const std::vector<std::string>& _input_file_names,
  36. const int _output_level,
  37. const CompactionOptions& _compact_options,
  38. bool _retry_on_fail)
  39. : db(_db),
  40. compactor(_compactor),
  41. column_family_name(_column_family_name),
  42. input_file_names(_input_file_names),
  43. output_level(_output_level),
  44. compact_options(_compact_options),
  45. retry_on_fail(_retry_on_fail) {}
  46. DB* db;
  47. Compactor* compactor;
  48. const std::string& column_family_name;
  49. std::vector<std::string> input_file_names;
  50. int output_level;
  51. CompactionOptions compact_options;
  52. bool retry_on_fail;
  53. };
  54. // A simple compaction algorithm that always compacts everything
  55. // to the highest level whenever possible.
  56. class FullCompactor : public Compactor {
  57. public:
  58. explicit FullCompactor(const Options options) : options_(options) {
  59. compact_options_.compression = options_.compression;
  60. compact_options_.output_file_size_limit =
  61. options_.target_file_size_base;
  62. }
  63. // When flush happens, it determines whether to trigger compaction. If
  64. // triggered_writes_stop is true, it will also set the retry flag of
  65. // compaction-task to true.
  66. void OnFlushCompleted(
  67. DB* db, const FlushJobInfo& info) override {
  68. CompactionTask* task = PickCompaction(db, info.cf_name);
  69. if (task != nullptr) {
  70. if (info.triggered_writes_stop) {
  71. task->retry_on_fail = true;
  72. }
  73. // Schedule compaction in a different thread.
  74. ScheduleCompaction(task);
  75. }
  76. }
  77. // Always pick a compaction which includes all files whenever possible.
  78. CompactionTask* PickCompaction(
  79. DB* db, const std::string& cf_name) override {
  80. ColumnFamilyMetaData cf_meta;
  81. db->GetColumnFamilyMetaData(&cf_meta);
  82. std::vector<std::string> input_file_names;
  83. for (auto level : cf_meta.levels) {
  84. for (auto file : level.files) {
  85. if (file.being_compacted) {
  86. return nullptr;
  87. }
  88. input_file_names.push_back(file.name);
  89. }
  90. }
  91. return new CompactionTask(
  92. db, this, cf_name, input_file_names,
  93. options_.num_levels - 1, compact_options_, false);
  94. }
  95. // Schedule the specified compaction task in background.
  96. void ScheduleCompaction(CompactionTask* task) override {
  97. options_.env->Schedule(&FullCompactor::CompactFiles, task);
  98. }
  99. static void CompactFiles(void* arg) {
  100. std::unique_ptr<CompactionTask> task(
  101. reinterpret_cast<CompactionTask*>(arg));
  102. assert(task);
  103. assert(task->db);
  104. Status s = task->db->CompactFiles(
  105. task->compact_options,
  106. task->input_file_names,
  107. task->output_level);
  108. printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
  109. if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
  110. // If a compaction task with its retry_on_fail=true failed,
  111. // try to schedule another compaction in case the reason
  112. // is not an IO error.
  113. CompactionTask* new_task = task->compactor->PickCompaction(
  114. task->db, task->column_family_name);
  115. task->compactor->ScheduleCompaction(new_task);
  116. }
  117. }
  118. private:
  119. Options options_;
  120. CompactionOptions compact_options_;
  121. };
  122. int main() {
  123. Options options;
  124. options.create_if_missing = true;
  125. // Disable RocksDB background compaction.
  126. options.compaction_style = kCompactionStyleNone;
  127. // Small slowdown and stop trigger for experimental purpose.
  128. options.level0_slowdown_writes_trigger = 3;
  129. options.level0_stop_writes_trigger = 5;
  130. options.IncreaseParallelism(5);
  131. options.listeners.emplace_back(new FullCompactor(options));
  132. DB* db = nullptr;
  133. DestroyDB(kDBPath, options);
  134. Status s = DB::Open(options, kDBPath, &db);
  135. assert(s.ok());
  136. assert(db);
  137. // if background compaction is not working, write will stall
  138. // because of options.level0_stop_writes_trigger
  139. for (int i = 1000; i < 99999; ++i) {
  140. db->Put(WriteOptions(), std::to_string(i),
  141. std::string(500, 'a' + (i % 26)));
  142. }
  143. // verify the values are still there
  144. std::string value;
  145. for (int i = 1000; i < 99999; ++i) {
  146. db->Get(ReadOptions(), std::to_string(i),
  147. &value);
  148. assert(value == std::string(500, 'a' + (i % 26)));
  149. }
  150. // close the db.
  151. delete db;
  152. return 0;
  153. }