write_stress.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. //
  7. // The goal of this tool is to be a simple stress test with focus on catching:
  8. // * bugs in compaction/flush processes, especially the ones that cause
  9. // assertion errors
  10. // * bugs in the code that deletes obsolete files
  11. //
  12. // There are two parts of the test:
  13. // * write_stress, a binary that writes to the database
  14. // * write_stress_runner.py, a script that invokes and kills write_stress
  15. //
  16. // Here are some interesting parts of write_stress:
  17. // * Runs with very high concurrency of compactions and flushes (32 threads
  18. // total) and tries to create a huge amount of small files
  19. // * The keys written to the database are not uniformly distributed -- there is
  20. // a 3-character prefix that mutates occasionally (in prefix mutator thread), in
  21. // such a way that the first character mutates slower than second, which mutates
  22. // slower than third character. That way, the compaction stress tests some
  23. // interesting compaction features like trivial moves and bottommost level
  24. // calculation
  25. // * There is a thread that creates an iterator, holds it for a couple of
  26. // seconds and then iterates over all keys. This is supposed to test RocksDB's
  27. // ability to keep the files alive when there are references to them.
  28. // * Some writes trigger WAL sync. This is stress testing our WAL sync code.
  29. // * At the end of the run, we make sure that we didn't leak any of the sst
  30. // files
  31. //
  32. // write_stress_runner.py changes the mode in which we run write_stress and also
  33. // kills and restarts it. There are some interesting characteristics:
  34. // * At the beginning we divide the full test runtime into smaller parts --
  35. // shorter runtimes (a couple of seconds) and longer ones (100, 1000 seconds)
  36. // * The first time we run write_stress, we destroy the old DB. Every next time
  37. // during the test, we use the same DB.
  38. // * We can run in kill mode or clean-restart mode. Kill mode kills the
  39. // write_stress violently.
  40. // * We can run in mode where delete_obsolete_files_with_fullscan is true or
  41. // false
  42. // * We can run with low_open_files mode turned on or off. When it's turned on,
  43. // we configure table cache to only hold a couple of files -- that way we need
  44. // to reopen files every time we access them.
  45. //
  46. // Another goal was to create a stress test without a lot of parameters. So
  47. // tools/write_stress_runner.py should only take one parameter -- runtime_sec
  48. // and it should figure out everything else on its own.
  49. #include <cstdio>
  50. #ifndef GFLAGS
  51. int main() {
  52. fprintf(stderr, "Please install gflags to run rocksdb tools\n");
  53. return 1;
  54. }
  55. #else
  56. #include <atomic>
  57. #include <cinttypes>
  58. #include <random>
  59. #include <set>
  60. #include <string>
  61. #include <thread>
  62. #include "file/filename.h"
  63. #include "port/port.h"
  64. #include "rocksdb/db.h"
  65. #include "rocksdb/env.h"
  66. #include "rocksdb/options.h"
  67. #include "rocksdb/slice.h"
  68. #include "util/gflags_compat.h"
  69. using GFLAGS_NAMESPACE::ParseCommandLineFlags;
  70. using GFLAGS_NAMESPACE::RegisterFlagValidator;
  71. using GFLAGS_NAMESPACE::SetUsageMessage;
  72. DEFINE_int32(key_size, 10, "Key size");
  73. DEFINE_int32(value_size, 100, "Value size");
  74. DEFINE_string(db, "", "Use the db with the following name.");
  75. DEFINE_bool(destroy_db, true,
  76. "Destroy the existing DB before running the test");
  77. DEFINE_int32(runtime_sec, 10 * 60, "How long are we running for, in seconds");
  78. DEFINE_int32(seed, 139, "Random seed");
  79. DEFINE_double(prefix_mutate_period_sec, 1.0,
  80. "How often are we going to mutate the prefix");
  81. DEFINE_double(first_char_mutate_probability, 0.1,
  82. "How likely are we to mutate the first char every period");
  83. DEFINE_double(second_char_mutate_probability, 0.2,
  84. "How likely are we to mutate the second char every period");
  85. DEFINE_double(third_char_mutate_probability, 0.5,
  86. "How likely are we to mutate the third char every period");
  87. DEFINE_int32(iterator_hold_sec, 5,
  88. "How long will the iterator hold files before it gets destroyed");
  89. DEFINE_double(sync_probability, 0.01, "How often are we syncing writes");
  90. DEFINE_bool(delete_obsolete_files_with_fullscan, false,
  91. "If true, we delete obsolete files after each compaction/flush "
  92. "using GetChildren() API");
  93. DEFINE_bool(low_open_files_mode, false,
  94. "If true, we set max_open_files to 20, so that every file access "
  95. "needs to reopen it");
  96. namespace ROCKSDB_NAMESPACE {
  97. static const int kPrefixSize = 3;
  98. class WriteStress {
  99. public:
  100. WriteStress() : stop_(false) {
  101. // initialize key_prefix
  102. for (int i = 0; i < kPrefixSize; ++i) {
  103. key_prefix_[i].store('a');
  104. }
  105. // Choose a location for the test database if none given with --db=<path>
  106. if (FLAGS_db.empty()) {
  107. std::string default_db_path;
  108. Env::Default()->GetTestDirectory(&default_db_path);
  109. default_db_path += "/write_stress";
  110. FLAGS_db = default_db_path;
  111. }
  112. Options options;
  113. if (FLAGS_destroy_db) {
  114. DestroyDB(FLAGS_db, options); // ignore
  115. }
  116. // make the LSM tree deep, so that we have many concurrent flushes and
  117. // compactions
  118. options.create_if_missing = true;
  119. options.write_buffer_size = 256 * 1024; // 256k
  120. options.max_bytes_for_level_base = 1 * 1024 * 1024; // 1MB
  121. options.target_file_size_base = 100 * 1024; // 100k
  122. options.max_write_buffer_number = 16;
  123. options.max_background_compactions = 16;
  124. options.max_background_flushes = 16;
  125. options.max_open_files = FLAGS_low_open_files_mode ? 20 : -1;
  126. if (FLAGS_delete_obsolete_files_with_fullscan) {
  127. options.delete_obsolete_files_period_micros = 0;
  128. }
  129. // open DB
  130. DB* db;
  131. Status s = DB::Open(options, FLAGS_db, &db);
  132. if (!s.ok()) {
  133. fprintf(stderr, "Can't open database: %s\n", s.ToString().c_str());
  134. std::abort();
  135. }
  136. db_.reset(db);
  137. }
  138. void WriteThread() {
  139. std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed));
  140. std::uniform_real_distribution<double> dist(0, 1);
  141. auto random_string = [](std::mt19937& r, int len) {
  142. std::uniform_int_distribution<int> char_dist('a', 'z');
  143. std::string ret;
  144. for (int i = 0; i < len; ++i) {
  145. ret += static_cast<char>(char_dist(r));
  146. }
  147. return ret;
  148. };
  149. while (!stop_.load(std::memory_order_relaxed)) {
  150. std::string prefix;
  151. prefix.resize(kPrefixSize);
  152. for (int i = 0; i < kPrefixSize; ++i) {
  153. prefix[i] = key_prefix_[i].load(std::memory_order_relaxed);
  154. }
  155. auto key = prefix + random_string(rng, FLAGS_key_size - kPrefixSize);
  156. auto value = random_string(rng, FLAGS_value_size);
  157. WriteOptions woptions;
  158. woptions.sync = dist(rng) < FLAGS_sync_probability;
  159. auto s = db_->Put(woptions, key, value);
  160. if (!s.ok()) {
  161. fprintf(stderr, "Write to DB failed: %s\n", s.ToString().c_str());
  162. std::abort();
  163. }
  164. }
  165. }
  166. void IteratorHoldThread() {
  167. while (!stop_.load(std::memory_order_relaxed)) {
  168. std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
  169. Env::Default()->SleepForMicroseconds(FLAGS_iterator_hold_sec * 1000 *
  170. 1000LL);
  171. for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
  172. }
  173. if (!iterator->status().ok()) {
  174. fprintf(stderr, "Iterator statuts not OK: %s\n",
  175. iterator->status().ToString().c_str());
  176. std::abort();
  177. }
  178. }
  179. }
  180. void PrefixMutatorThread() {
  181. std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed));
  182. std::uniform_real_distribution<double> dist(0, 1);
  183. std::uniform_int_distribution<int> char_dist('a', 'z');
  184. while (!stop_.load(std::memory_order_relaxed)) {
  185. Env::Default()->SleepForMicroseconds(static_cast<int>(
  186. FLAGS_prefix_mutate_period_sec *
  187. 1000 * 1000LL));
  188. if (dist(rng) < FLAGS_first_char_mutate_probability) {
  189. key_prefix_[0].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
  190. }
  191. if (dist(rng) < FLAGS_second_char_mutate_probability) {
  192. key_prefix_[1].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
  193. }
  194. if (dist(rng) < FLAGS_third_char_mutate_probability) {
  195. key_prefix_[2].store(static_cast<char>(char_dist(rng)), std::memory_order_relaxed);
  196. }
  197. }
  198. }
  199. int Run() {
  200. threads_.emplace_back([&]() { WriteThread(); });
  201. threads_.emplace_back([&]() { PrefixMutatorThread(); });
  202. threads_.emplace_back([&]() { IteratorHoldThread(); });
  203. if (FLAGS_runtime_sec == -1) {
  204. // infinite runtime, until we get killed
  205. while (true) {
  206. Env::Default()->SleepForMicroseconds(1000 * 1000);
  207. }
  208. }
  209. Env::Default()->SleepForMicroseconds(FLAGS_runtime_sec * 1000 * 1000);
  210. stop_.store(true, std::memory_order_relaxed);
  211. for (auto& t : threads_) {
  212. t.join();
  213. }
  214. threads_.clear();
  215. // Skip checking for leaked files in ROCKSDB_LITE since we don't have access to
  216. // function GetLiveFilesMetaData
  217. #ifndef ROCKSDB_LITE
  218. // let's see if we leaked some files
  219. db_->PauseBackgroundWork();
  220. std::vector<LiveFileMetaData> metadata;
  221. db_->GetLiveFilesMetaData(&metadata);
  222. std::set<uint64_t> sst_file_numbers;
  223. for (const auto& file : metadata) {
  224. uint64_t number;
  225. FileType type;
  226. if (!ParseFileName(file.name, &number, "LOG", &type)) {
  227. continue;
  228. }
  229. if (type == kTableFile) {
  230. sst_file_numbers.insert(number);
  231. }
  232. }
  233. std::vector<std::string> children;
  234. Env::Default()->GetChildren(FLAGS_db, &children);
  235. for (const auto& child : children) {
  236. uint64_t number;
  237. FileType type;
  238. if (!ParseFileName(child, &number, "LOG", &type)) {
  239. continue;
  240. }
  241. if (type == kTableFile) {
  242. if (sst_file_numbers.find(number) == sst_file_numbers.end()) {
  243. fprintf(stderr,
  244. "Found a table file in DB path that should have been "
  245. "deleted: %s\n",
  246. child.c_str());
  247. std::abort();
  248. }
  249. }
  250. }
  251. db_->ContinueBackgroundWork();
  252. #endif // !ROCKSDB_LITE
  253. return 0;
  254. }
  255. private:
  256. // each key is prepended with this prefix. we occasionally change it. third
  257. // letter is changed more frequently than second, which is changed more
  258. // frequently than the first one.
  259. std::atomic<char> key_prefix_[kPrefixSize];
  260. std::atomic<bool> stop_;
  261. std::vector<port::Thread> threads_;
  262. std::unique_ptr<DB> db_;
  263. };
  264. } // namespace ROCKSDB_NAMESPACE
  265. int main(int argc, char** argv) {
  266. SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
  267. " [OPTIONS]...");
  268. ParseCommandLineFlags(&argc, &argv, true);
  269. ROCKSDB_NAMESPACE::WriteStress write_stress;
  270. return write_stress.Run();
  271. }
  272. #endif // GFLAGS