// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"

#include <cmath>

ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env = nullptr;
enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
    ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
    ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e =
    ROCKSDB_NAMESPACE::kCRC32c;
enum RepFactory FLAGS_rep_factory = kSkipList;
std::vector<double> sum_probs(100001);
int64_t zipf_sum_size = 100000;

namespace ROCKSDB_NAMESPACE {

// The Zipfian distribution is generated from a pre-calculated array, so
// InitializeHotKeyGenerator() must be called before the stress test starts.
// First, the probability density function (PDF) of this Zipfian follows a
// power law: P(x) = 1/(x^alpha). The first loop below evaluates the PDF for
// x from 1 to zipf_sum_size and accumulates the values into c, giving the
// total (unnormalized) probability mass.
// Next, we compute the Zipfian CDF and store the value for each rank in an
// array (sum_probs), where ranks run from 0 to zipf_sum_size. For an integer
// k, its Zipfian CDF value is sum_probs[k].
// Third, to draw an integer whose probability follows the Zipfian
// distribution, we take a rand_seed in [0,1] that follows the uniform
// distribution and locate it in sum_probs via binary search (i.e., we invert
// the CDF). The index i whose CDF interval contains rand_seed is an integer
// in [0, zipf_sum_size] that follows the Zipfian distribution with parameter
// alpha.
// Finally, we scale i to the [0, max_key] range.
// To keep the hot keys from clustering together and skewing towards 0, we
// use Random64 to shuffle the result.
void InitializeHotKeyGenerator(double alpha) {
  double c = 0;
  for (int64_t i = 1; i <= zipf_sum_size; i++) {
    c = c + (1.0 / std::pow(static_cast<double>(i), alpha));
  }
  c = 1.0 / c;
  sum_probs[0] = 0;
  for (int64_t i = 1; i <= zipf_sum_size; i++) {
    sum_probs[i] =
        sum_probs[i - 1] + c / std::pow(static_cast<double>(i), alpha);
  }
}
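
// A worked example of the CDF table built above (illustrative only, using a
// hypothetical zipf_sum_size of 3 instead of the 100000 used here): for
// alpha = 1.0, c = 1 / (1 + 1/2 + 1/3) = 6/11, so
//   sum_probs = {0, 6/11, 9/11, 1} ~= {0, 0.545, 0.818, 1.0}.
// A uniform rand_seed of 0.6 falls in (sum_probs[1], sum_probs[2]], so the
// binary search in GetOneHotKeyID() below returns rank 2.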

// Generate one key that follows the Zipfian distribution. The skewness is
// decided by the parameter alpha. The inputs are a rand_seed in [0,1] and the
// maximum key to be generated. If we returned tmp_zipf_seed directly, keys
// closer to 0 would be more probable. To randomly scatter the hot keys over
// [0, max_key], we use Random64 to shuffle it.
int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) {
  int64_t low = 1, mid, high = zipf_sum_size, zipf = 0;
  while (low <= high) {
    mid = (low + high) / 2;
    if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) {
      zipf = mid;
      break;
    } else if (sum_probs[mid] >= rand_seed) {
      high = mid - 1;
    } else {
      low = mid + 1;
    }
  }
  int64_t tmp_zipf_seed = zipf * max_key / zipf_sum_size;
  Random64 rand_local(tmp_zipf_seed);
  return rand_local.Next() % max_key;
}
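
// Usage sketch (a hypothetical snippet mirroring GenerateOneKey() below;
// `thread` and `max_key` are assumed to be in scope): initialize the CDF
// table once, then turn a uniform draw into a Zipfian-distributed key:
//
//   InitializeHotKeyGenerator(/*alpha=*/0.99);
//   double rand_seed =
//       static_cast<double>(thread->rand.Next() % max_key) / max_key;
//   int64_t key = GetOneHotKeyID(rand_seed, max_key);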

void PoolSizeChangeThread(void* v) {
  assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
  ThreadState* thread = reinterpret_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;

  while (true) {
    {
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }
    auto thread_pool_size_base = FLAGS_max_background_compactions;
    auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
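    // The new pool size is drawn uniformly from
    // [base - variation, base + variation], then clamped below to 1. For
    // example (illustrative numbers), base = 4 and variation = 2 yield a
    // size in [2, 6].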
    int new_thread_pool_size =
        thread_pool_size_base - thread_pool_size_var +
        thread->rand.Next() % (thread_pool_size_var * 2 + 1);
    if (new_thread_pool_size < 1) {
      new_thread_pool_size = 1;
    }
    db_stress_env->SetBackgroundThreads(new_thread_pool_size,
                                        ROCKSDB_NAMESPACE::Env::Priority::LOW);
    // Sleep for up to FLAGS_compaction_thread_pool_adjust_interval
    // milliseconds before the next adjustment.
    db_stress_env->SleepForMicroseconds(
        thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
            1000 +
        1);
  }
}

void DbVerificationThread(void* v) {
  assert(FLAGS_continuous_verification_interval > 0);
  auto* thread = reinterpret_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;
  StressTest* stress_test = shared->GetStressTest();
  assert(stress_test != nullptr);
  while (true) {
    {
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }
    if (!shared->HasVerificationFailedYet()) {
      stress_test->ContinuouslyVerifyDb(thread);
    }
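    // Sleep for up to FLAGS_continuous_verification_interval milliseconds
    // between verification passes.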
    db_stress_env->SleepForMicroseconds(
        thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 +
        1);
  }
}

void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
  if (!FLAGS_verbose) {
    return;
  }
  std::string tmp;
  tmp.reserve(sz * 2 + 16);
  char buf[4];
  for (size_t i = 0; i < sz; i++) {
    snprintf(buf, 4, "%X", value[i]);
    tmp.append(buf);
  }
  fprintf(stdout, "[CF %d] %" PRIu64 " == > (%" ROCKSDB_PRIszt ") %s\n", cf,
          key, sz, tmp.c_str());
}
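
// For illustration (hypothetical values): cf = 0, key = 5, and a two-byte
// value {0x1, 0x2} would print as
//   [CF 0] 5 == > (2) 12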

// Note that if hot_key_alpha != 0, the key is generated based on the Zipfian
// distribution and randomly scattered over [0, FLAGS_max_key]. In that case
// the order of the generated keys is not guaranteed, and the keys are not
// confined to the active range derived from FLAGS_active_width.
int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
  const double completed_ratio =
      static_cast<double>(iteration) / FLAGS_ops_per_thread;
  const int64_t base_key = static_cast<int64_t>(
      completed_ratio * (FLAGS_max_key - FLAGS_active_width));
  int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width;
  int64_t cur_key = rand_seed;
  if (FLAGS_hot_key_alpha != 0) {
    // If the Zipfian alpha is set to a non-zero value, use the Zipfian
    // generator instead of the uniform one.
    double float_rand =
        (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
        FLAGS_max_key;
    cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
  }
  return cur_key;
}
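
// For example (illustrative numbers): with FLAGS_max_key = 1000,
// FLAGS_active_width = 100, and hot_key_alpha == 0, a thread halfway through
// its ops (completed_ratio = 0.5) gets base_key = 450 and draws keys
// uniformly from [450, 550).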

// Note that if hot_key_alpha != 0, the keys are generated based on the
// Zipfian distribution and come out in random order.
// To generate keys based on the uniform distribution instead, set
// hot_key_alpha == 0. In that case the random keys are generated in
// non-decreasing order within the key array (ensuring key[i] >= key[i - 1])
// and are constrained to a range related to FLAGS_active_width.
std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
                                   uint64_t iteration) {
  const double completed_ratio =
      static_cast<double>(iteration) / FLAGS_ops_per_thread;
  const int64_t base_key = static_cast<int64_t>(
      completed_ratio * (FLAGS_max_key - FLAGS_active_width));
  std::vector<int64_t> keys;
  keys.reserve(num_keys);
  int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
  keys.push_back(next_key);
  for (int i = 1; i < num_keys; ++i) {
    if (FLAGS_hot_key_alpha != 0) {
      // Generate a key that follows the Zipfian distribution.
      double float_rand =
          (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
          FLAGS_max_key;
      next_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
    } else {
      // This may result in some duplicate keys.
      next_key = next_key + thread->rand.Next() %
                                (FLAGS_active_width - (next_key - base_key));
    }
    keys.push_back(next_key);
  }
  return keys;
}
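
// Generate a value of pseudo-random length for the given 32-bit rand. The
// layout is: the first sizeof(uint32_t) bytes hold rand itself, each
// following byte i is (char)(rand ^ i), and a terminating '\0' is written
// one past the value. For example (illustrative numbers): if
// (rand % kRandomValueMaxFactor) + 1 == 3 and FLAGS_value_size_mult == 8,
// then value_sz == 24 bytes.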
size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) {
  size_t value_sz =
      ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
  assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
  (void)max_sz;  // avoid an unused-parameter warning in release builds
  *((uint32_t*)v) = rand;
  for (size_t i = sizeof(uint32_t); i < value_sz; i++) {
    v[i] = (char)(rand ^ i);
  }
  v[value_sz] = '\0';
  return value_sz;  // the size of the value set
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS