compact_files_example.cc 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // An example code demonstrating how to use CompactFiles, EventListener,
  7. // and GetColumnFamilyMetaData APIs to implement custom compaction algorithm.
#include <cassert>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
  13. using ROCKSDB_NAMESPACE::ColumnFamilyMetaData;
  14. using ROCKSDB_NAMESPACE::CompactionOptions;
  15. using ROCKSDB_NAMESPACE::DB;
  16. using ROCKSDB_NAMESPACE::EventListener;
  17. using ROCKSDB_NAMESPACE::FlushJobInfo;
  18. using ROCKSDB_NAMESPACE::Options;
  19. using ROCKSDB_NAMESPACE::ReadOptions;
  20. using ROCKSDB_NAMESPACE::Status;
  21. using ROCKSDB_NAMESPACE::WriteOptions;
  22. #if defined(OS_WIN)
  23. std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_compact_files_example";
  24. #else
  25. std::string kDBPath = "/tmp/rocksdb_compact_files_example";
  26. #endif
  27. struct CompactionTask;
  28. // This is an example interface of external-compaction algorithm.
  29. // Compaction algorithm can be implemented outside the core-RocksDB
  30. // code by using the pluggable compaction APIs that RocksDb provides.
  31. class Compactor : public EventListener {
  32. public:
  33. // Picks and returns a compaction task given the specified DB
  34. // and column family. It is the caller's responsibility to
  35. // destroy the returned CompactionTask. Returns "nullptr"
  36. // if it cannot find a proper compaction task.
  37. virtual CompactionTask* PickCompaction(DB* db,
  38. const std::string& cf_name) = 0;
  39. // Schedule and run the specified compaction task in background.
  40. virtual void ScheduleCompaction(CompactionTask* task) = 0;
  41. };
  42. // Example structure that describes a compaction task.
  43. struct CompactionTask {
  44. CompactionTask(DB* _db, Compactor* _compactor,
  45. const std::string& _column_family_name,
  46. const std::vector<std::string>& _input_file_names,
  47. const int _output_level,
  48. const CompactionOptions& _compact_options, bool _retry_on_fail)
  49. : db(_db),
  50. compactor(_compactor),
  51. column_family_name(_column_family_name),
  52. input_file_names(_input_file_names),
  53. output_level(_output_level),
  54. compact_options(_compact_options),
  55. retry_on_fail(_retry_on_fail) {}
  56. DB* db;
  57. Compactor* compactor;
  58. const std::string& column_family_name;
  59. std::vector<std::string> input_file_names;
  60. int output_level;
  61. CompactionOptions compact_options;
  62. bool retry_on_fail;
  63. };
  64. // A simple compaction algorithm that always compacts everything
  65. // to the highest level whenever possible.
  66. class FullCompactor : public Compactor {
  67. public:
  68. explicit FullCompactor(const Options options) : options_(options) {
  69. compact_options_.compression = options_.compression;
  70. compact_options_.output_file_size_limit = options_.target_file_size_base;
  71. }
  72. // When flush happens, it determines whether to trigger compaction. If
  73. // triggered_writes_stop is true, it will also set the retry flag of
  74. // compaction-task to true.
  75. void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
  76. CompactionTask* task = PickCompaction(db, info.cf_name);
  77. if (task != nullptr) {
  78. if (info.triggered_writes_stop) {
  79. task->retry_on_fail = true;
  80. }
  81. // Schedule compaction in a different thread.
  82. ScheduleCompaction(task);
  83. }
  84. }
  85. // Always pick a compaction which includes all files whenever possible.
  86. CompactionTask* PickCompaction(DB* db, const std::string& cf_name) override {
  87. ColumnFamilyMetaData cf_meta;
  88. db->GetColumnFamilyMetaData(&cf_meta);
  89. std::vector<std::string> input_file_names;
  90. for (auto level : cf_meta.levels) {
  91. for (auto file : level.files) {
  92. if (file.being_compacted) {
  93. return nullptr;
  94. }
  95. input_file_names.push_back(file.name);
  96. }
  97. }
  98. return new CompactionTask(db, this, cf_name, input_file_names,
  99. options_.num_levels - 1, compact_options_, false);
  100. }
  101. // Schedule the specified compaction task in background.
  102. void ScheduleCompaction(CompactionTask* task) override {
  103. options_.env->Schedule(&FullCompactor::CompactFiles, task);
  104. }
  105. static void CompactFiles(void* arg) {
  106. std::unique_ptr<CompactionTask> task(static_cast<CompactionTask*>(arg));
  107. assert(task);
  108. assert(task->db);
  109. Status s = task->db->CompactFiles(
  110. task->compact_options, task->input_file_names, task->output_level);
  111. printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
  112. if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
  113. // If a compaction task with its retry_on_fail=true failed,
  114. // try to schedule another compaction in case the reason
  115. // is not an IO error.
  116. CompactionTask* new_task =
  117. task->compactor->PickCompaction(task->db, task->column_family_name);
  118. task->compactor->ScheduleCompaction(new_task);
  119. }
  120. }
  121. private:
  122. Options options_;
  123. CompactionOptions compact_options_;
  124. };
  125. int main() {
  126. Options options;
  127. options.create_if_missing = true;
  128. // Disable RocksDB background compaction.
  129. options.compaction_style = ROCKSDB_NAMESPACE::kCompactionStyleNone;
  130. // Small write buffer size for generating more sst files in level 0.
  131. options.write_buffer_size = 4 << 20;
  132. // Small slowdown and stop trigger for experimental purpose.
  133. options.level0_slowdown_writes_trigger = 3;
  134. options.level0_stop_writes_trigger = 5;
  135. options.IncreaseParallelism(5);
  136. options.listeners.emplace_back(new FullCompactor(options));
  137. DB* db = nullptr;
  138. ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options);
  139. Status s = DB::Open(options, kDBPath, &db);
  140. assert(s.ok());
  141. assert(db);
  142. // if background compaction is not working, write will stall
  143. // because of options.level0_stop_writes_trigger
  144. for (int i = 1000; i < 99999; ++i) {
  145. db->Put(WriteOptions(), std::to_string(i),
  146. std::string(500, 'a' + (i % 26)));
  147. }
  148. // verify the values are still there
  149. std::string value;
  150. for (int i = 1000; i < 99999; ++i) {
  151. db->Get(ReadOptions(), std::to_string(i), &value);
  152. assert(value == std::string(500, 'a' + (i % 26)));
  153. }
  154. // close the db.
  155. delete db;
  156. return 0;
  157. }