// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//

#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"

#include <cmath>

#include "file/file_util.h"
#include "rocksdb/secondary_cache.h"
#include "util/file_checksum_helper.h"
#include "util/xxhash.h"

ROCKSDB_NAMESPACE::Env* db_stress_listener_env = nullptr;
ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr;
std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> compressed_secondary_cache;
std::shared_ptr<ROCKSDB_NAMESPACE::Cache> block_cache;
enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
    ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
    ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e =
    ROCKSDB_NAMESPACE::kCRC32c;
enum RepFactory FLAGS_rep_factory = kSkipList;
std::vector<double> sum_probs(100001);
constexpr int64_t zipf_sum_size = 100000;

namespace ROCKSDB_NAMESPACE {

// The Zipfian distribution is generated from a pre-calculated array.
// InitializeHotKeyGenerator() must be called before starting the stress
// test. First, the probability density function (PDF) of this Zipfian
// follows a power law: P(x) = 1/(x^alpha).
// We compute the PDF for x from 1 to zipf_sum_size in the first loop and
// accumulate the values in c, giving the total probability mass.
// Next, we calculate the CDF of the Zipfian and store the value for each
// rank in an array (sum_probs). Ranks run from 0 to zipf_sum_size; for an
// integer k, its Zipfian CDF value is sum_probs[k].
// Third, when we need an integer whose probability follows the Zipfian
// distribution, we draw a rand_seed uniformly from [0,1] and search for it
// in sum_probs via binary search. When we find the sum_probs[i] closest to
// rand_seed, i is an integer in [0, zipf_sum_size] that follows the Zipfian
// distribution with parameter alpha.
// Finally, we can scale i to the [0, max_key] range.
// To avoid hot keys being close to each other and skewed towards 0,
// we use Random64 to shuffle the result.
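//
// As a worked example (illustrative numbers only, using a tiny
// zipf_sum_size of 5 and alpha = 1.0 instead of the values above): the
// normalizer is 1/1 + 1/2 + 1/3 + 1/4 + 1/5 ~= 2.283, so c ~= 0.438 and
// sum_probs becomes {0, 0.438, 0.657, 0.803, 0.912, 1.0}. A uniform
// rand_seed of 0.5 then binary-searches to rank 2, because
// sum_probs[1] < 0.5 <= sum_probs[2].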
void InitializeHotKeyGenerator(double alpha) {
  double c = 0;
  for (int64_t i = 1; i <= zipf_sum_size; i++) {
    c = c + (1.0 / std::pow(static_cast<double>(i), alpha));
  }
  c = 1.0 / c;

  sum_probs[0] = 0;
  for (int64_t i = 1; i <= zipf_sum_size; i++) {
    sum_probs[i] =
        sum_probs[i - 1] + c / std::pow(static_cast<double>(i), alpha);
  }
}

// Generate one key that follows the Zipfian distribution. The skewness
// is decided by the parameter alpha. Inputs are a rand_seed uniform in
// [0,1] and the maximum key to be generated. If we returned tmp_zipf_seed
// directly, keys closer to 0 would have higher probability. To randomly
// distribute the hot keys in [0, max_key], we use Random64 to shuffle it.
int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) {
  int64_t low = 1, mid, high = zipf_sum_size, zipf = 0;
  while (low <= high) {
    mid = (low + high) / 2;
    if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) {
      zipf = mid;
      break;
    } else if (sum_probs[mid] >= rand_seed) {
      high = mid - 1;
    } else {
      low = mid + 1;
    }
  }
  int64_t tmp_zipf_seed = zipf * max_key / zipf_sum_size;
  Random64 rand_local(tmp_zipf_seed);
  return rand_local.Next() % max_key;
}

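// A minimal usage sketch of the pair above (illustrative; the rng name is
// hypothetical, but callers such as GenerateOneKey() below derive the
// uniform seed the same way from thread->rand):
//
//   InitializeHotKeyGenerator(/*alpha=*/1.0);
//   double seed = static_cast<double>(rng.Next() % max_key) / max_key;
//   int64_t hot_key = GetOneHotKeyID(seed, max_key);
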
void PoolSizeChangeThread(void* v) {
  assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
  ThreadState* thread = static_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;

  while (true) {
    {
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }

    // Pick a new LOW-priority pool size uniformly from
    // [base - variation, base + variation], clamped to at least 1
    auto thread_pool_size_base = FLAGS_max_background_compactions;
    auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
    int new_thread_pool_size =
        thread_pool_size_base - thread_pool_size_var +
        thread->rand.Next() % (thread_pool_size_var * 2 + 1);
    if (new_thread_pool_size < 1) {
      new_thread_pool_size = 1;
    }
    db_stress_env->SetBackgroundThreads(new_thread_pool_size,
                                        ROCKSDB_NAMESPACE::Env::Priority::LOW);
    // Sleep up to FLAGS_compaction_thread_pool_adjust_interval milliseconds
    db_stress_env->SleepForMicroseconds(
        thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
            1000 +
        1);
  }
}

void DbVerificationThread(void* v) {
  assert(FLAGS_continuous_verification_interval > 0);
  auto* thread = static_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;
  StressTest* stress_test = shared->GetStressTest();
  assert(stress_test != nullptr);

  while (true) {
    {
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }
    if (!shared->HasVerificationFailedYet()) {
      stress_test->ContinuouslyVerifyDb(thread);
    }
    db_stress_env->SleepForMicroseconds(
        thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 +
        1);
  }
}

void CompressedCacheSetCapacityThread(void* v) {
  assert(FLAGS_compressed_secondary_cache_size > 0 ||
         FLAGS_compressed_secondary_cache_ratio > 0.0);
  auto* thread = static_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;

  while (true) {
    {
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }

    db_stress_env->SleepForMicroseconds(FLAGS_secondary_cache_update_interval);

    if (FLAGS_compressed_secondary_cache_size > 0) {
      // Shrink the compressed secondary cache to zero capacity, hold it
      // there for 10 seconds, then restore the configured capacity
      Status s = compressed_secondary_cache->SetCapacity(0);
      size_t capacity;
      if (s.ok()) {
        s = compressed_secondary_cache->GetCapacity(capacity);
        assert(capacity == 0);
      }
      db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
      if (s.ok()) {
        s = compressed_secondary_cache->SetCapacity(
            FLAGS_compressed_secondary_cache_size);
      }
      if (s.ok()) {
        s = compressed_secondary_cache->GetCapacity(capacity);
        assert(capacity == FLAGS_compressed_secondary_cache_size);
      }
      if (!s.ok()) {
        fprintf(stderr,
                "Compressed cache Set/GetCapacity returned error: %s\n",
                s.ToString().c_str());
      }
    } else if (FLAGS_compressed_secondary_cache_ratio > 0.0) {
      if (thread->rand.OneIn(2)) {
        size_t capacity = block_cache->GetCapacity();
        size_t adjustment;
        if (FLAGS_use_write_buffer_manager && FLAGS_db_write_buffer_size > 0) {
          adjustment = (capacity - FLAGS_db_write_buffer_size);
        } else {
          adjustment = capacity;
        }
        // Lower by up to 50% of usable block cache capacity
        adjustment = (adjustment * thread->rand.Uniform(50)) / 100;
        block_cache->SetCapacity(capacity - adjustment);
        fprintf(stdout, "New cache capacity = %zu\n",
                block_cache->GetCapacity());
        db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
        block_cache->SetCapacity(capacity);
      } else {
        Status s;
        double new_comp_cache_ratio =
            (double)thread->rand.Uniform(
                FLAGS_compressed_secondary_cache_ratio * 100) /
            100;
        fprintf(stdout, "New comp cache ratio = %f\n", new_comp_cache_ratio);

        s = UpdateTieredCache(block_cache, /*capacity*/ -1,
                              new_comp_cache_ratio);
        if (s.ok()) {
          db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
        }
        if (s.ok()) {
          s = UpdateTieredCache(block_cache, /*capacity*/ -1,
                                FLAGS_compressed_secondary_cache_ratio);
        }
        if (!s.ok()) {
          fprintf(stderr, "UpdateTieredCache returned error: %s\n",
                  s.ToString().c_str());
        }
      }
    }
  }
}

#ifndef NDEBUG
static void SetupFaultInjectionForRemoteCompaction(SharedState* shared) {
  if (!fault_fs_guard) {
    return;
  }
  fault_fs_guard->SetThreadLocalErrorContext(
      FaultInjectionIOType::kRead, shared->GetSeed(), FLAGS_read_fault_one_in,
      FLAGS_inject_error_severity == 1 /* retryable */,
      FLAGS_inject_error_severity == 2 /* has_data_loss */);
  fault_fs_guard->EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead);

  fault_fs_guard->SetThreadLocalErrorContext(
      FaultInjectionIOType::kWrite, shared->GetSeed(), FLAGS_write_fault_one_in,
      FLAGS_inject_error_severity == 1 /* retryable */,
      FLAGS_inject_error_severity == 2 /* has_data_loss */);
  fault_fs_guard->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);

  fault_fs_guard->SetThreadLocalErrorContext(
      FaultInjectionIOType::kMetadataRead, shared->GetSeed(),
      FLAGS_metadata_read_fault_one_in,
      FLAGS_inject_error_severity == 1 /* retryable */,
      FLAGS_inject_error_severity == 2 /* has_data_loss */);
  fault_fs_guard->EnableThreadLocalErrorInjection(
      FaultInjectionIOType::kMetadataRead);

  fault_fs_guard->SetThreadLocalErrorContext(
      FaultInjectionIOType::kMetadataWrite, shared->GetSeed(),
      FLAGS_metadata_write_fault_one_in,
      FLAGS_inject_error_severity == 1 /* retryable */,
      FLAGS_inject_error_severity == 2 /* has_data_loss */);
  fault_fs_guard->EnableThreadLocalErrorInjection(
      FaultInjectionIOType::kMetadataWrite);
}
#endif  // NDEBUG

static CompactionServiceOptionsOverride CreateOverrideOptions(
    const Options& options, const CompactionServiceJobInfo& job_info) {
  CompactionServiceOptionsOverride override_options{
      .env = db_stress_env,
      .file_checksum_gen_factory = options.file_checksum_gen_factory,
      .merge_operator = options.merge_operator,
      .compaction_filter = options.compaction_filter,
      .compaction_filter_factory = options.compaction_filter_factory,
      .prefix_extractor = options.prefix_extractor,
      .sst_partitioner_factory = options.sst_partitioner_factory,
      .listeners = options.listeners,
      .statistics = options.statistics,
      .table_properties_collector_factories =
          options.table_properties_collector_factories};

  // TODO(jaykorean) - create a new compaction filter / merge operator and
  // others for remote compactions
  //
  // Create a new Table Factory
  ConfigOptions config_options;
  config_options.ignore_unknown_options = false;
  config_options.ignore_unsupported_options = false;
  Status s = TableFactory::CreateFromString(config_options,
                                            options.table_factory->Name(),
                                            &override_options.table_factory);
  if (s.ok()) {
    std::string options_str;
    s = options.table_factory->GetOptionString(config_options, &options_str);
    if (s.ok()) {
      s = override_options.table_factory->ConfigureFromString(config_options,
                                                              options_str);
    }
  }
  if (!s.ok()) {
    fprintf(stdout,
            "Failed to set up TableFactory for remote compaction - (%s): %s\n",
            job_info.db_name.c_str(), s.ToString().c_str());
  }
  return override_options;
}

static Status CleanupOutputDirectory(const std::string& output_directory) {
#ifndef NDEBUG
  // Temporarily disable fault injection to ensure deletion always succeeds
  if (fault_fs_guard) {
    fault_fs_guard->DisableAllThreadLocalErrorInjection();
  }
#endif  // NDEBUG
  Status s = DestroyDir(db_stress_env, output_directory);
  if (!s.ok()) {
    fprintf(stderr,
            "Failed to destroy output directory %s when allow_resumption is "
            "false: %s\n",
            output_directory.c_str(), s.ToString().c_str());
  }
  if (s.ok()) {
    s = db_stress_env->CreateDir(output_directory);
    if (!s.ok()) {
      fprintf(stderr,
              "Failed to recreate output directory %s when allow_resumption "
              "is false: %s\n",
              output_directory.c_str(), s.ToString().c_str());
    }
  }
#ifndef NDEBUG
  // Re-enable fault injection after deletion
  if (fault_fs_guard) {
    fault_fs_guard->EnableAllThreadLocalErrorInjection();
  }
#endif  // NDEBUG
  return s;
}

// Set up cancellation mechanism for testing resumable remote compactions.
// Spawns a detached thread to trigger cancellation after a delay (50ms
// initially, or 2/3 of the previous successful compaction time for adaptive
// timing). First-time jobs are always canceled; retries have a 10% chance
// to test consecutive cancellation scenarios.
static std::shared_ptr<std::atomic<bool>> SetupCancellation(
    OpenAndCompactOptions& open_compact_options, bool was_canceled,
    Random& rand, uint64_t successful_compaction_end_to_end_micros) {
  auto canceled = std::make_shared<std::atomic<bool>>(false);
  open_compact_options.canceled = canceled.get();
  bool should_cancel = !was_canceled || rand.OneIn(10);
  if (should_cancel) {
    std::thread interruption_thread(
        [canceled, successful_compaction_end_to_end_micros]() {
          uint64_t sleep_micros =
              successful_compaction_end_to_end_micros == 0
                  ? 50000
                  : successful_compaction_end_to_end_micros * 2 / 3;
          std::this_thread::sleep_for(std::chrono::microseconds(sleep_micros));
          canceled->store(true);
        });
    interruption_thread.detach();
  }
  return canceled;
}

// Process the result of an OpenAndCompact() operation
static void ProcessCompactionResult(
    const Status& s, const std::string& job_id,
    const CompactionServiceJobInfo& job_info,
    const std::string& serialized_input, const std::string& output_directory,
    const std::string& serialized_output, SharedState* shared,
    uint64_t& successful_compaction_end_to_end_micros, uint64_t start_micros,
    Env* env) {
  if (s.IsManualCompactionPaused() && FLAGS_allow_resumption_one_in > 0) {
    // Re-enqueue for retry
    shared->EnqueueRemoteCompaction(job_id, job_info, serialized_input,
                                    output_directory, true /* was_canceled */);
    return;
  }
  if (!s.ok()) {
    if (!StressTest::IsErrorInjectedAndRetryable(s)) {
      // Print to stdout instead of stderr to avoid failing the stress test,
      // because an OpenAndCompact() failure doesn't necessarily mean
      // primary db instance failure.
      fprintf(stdout, "Failed to run OpenAndCompact(%s): %s\n",
              job_info.db_name.c_str(), s.ToString().c_str());
    }
  } else {
    // Track successful completion time
    successful_compaction_end_to_end_micros = env->NowMicros() - start_micros;
  }
  // Add the output regardless of status, so that the primary DB doesn't rely
  // on the timeout to finish waiting. An actual failure can still fail the
  // compaction properly when the result is deserialized.
  shared->AddRemoteCompactionResult(job_id, s, serialized_output);
}

static void ProcessRemoteCompactionJob(
    const std::string& job_id, const CompactionServiceJobInfo& job_info,
    const std::string& serialized_input, const std::string& output_directory,
    bool was_canceled, SharedState* shared, StressTest* stress_test,
    Random& rand, uint64_t& successful_compaction_end_to_end_micros) {
  auto options = stress_test->GetOptions(job_info.cf_id);
  assert(options.env != nullptr);
  auto override_options = CreateOverrideOptions(options, job_info);

  OpenAndCompactOptions open_compact_options;
  if (FLAGS_allow_resumption_one_in > 0) {
    open_compact_options.allow_resumption =
        rand.OneIn(FLAGS_allow_resumption_one_in);
  } else {
    open_compact_options.allow_resumption = false;
  }
  if (!open_compact_options.allow_resumption) {
    CleanupOutputDirectory(output_directory);
  }

  std::shared_ptr<std::atomic<bool>> canceled = nullptr;
  if (FLAGS_allow_resumption_one_in > 0) {
    canceled = SetupCancellation(open_compact_options, was_canceled, rand,
                                 successful_compaction_end_to_end_micros);
  }

  std::string serialized_output;
  uint64_t start_micros = options.env->NowMicros();
  Status s = DB::OpenAndCompact(open_compact_options, job_info.db_name,
                                output_directory, serialized_input,
                                &serialized_output, override_options);
  ProcessCompactionResult(s, job_id, job_info, serialized_input,
                          output_directory, serialized_output, shared,
                          successful_compaction_end_to_end_micros,
                          start_micros, options.env);
}

void RemoteCompactionWorkerThread(void* v) {
  assert(FLAGS_remote_compaction_worker_threads > 0);
  assert(FLAGS_remote_compaction_worker_interval > 0);
  auto* thread = static_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;
  StressTest* stress_test = shared->GetStressTest();
  assert(stress_test != nullptr);

#ifndef NDEBUG
  SetupFaultInjectionForRemoteCompaction(shared);
#endif  // NDEBUG

  // Tracks the duration (in microseconds) of the most recent successfully
  // completed compaction from start to finish. This value is used in
  // SetupCancellation() to adaptively set the cancellation point for a
  // compaction.
  uint64_t successful_compaction_end_to_end_micros = 0;
  Random rand(static_cast<uint32_t>(FLAGS_seed));

  // Main worker loop
  while (true) {
    // Check if we should stop
    {
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }

    std::string job_id;
    CompactionServiceJobInfo job_info;
    std::string serialized_input;
    std::string output_directory;
    bool was_canceled;
    if (shared->DequeueRemoteCompaction(&job_id, &job_info, &serialized_input,
                                        &output_directory, &was_canceled)) {
      ProcessRemoteCompactionJob(
          job_id, job_info, serialized_input, output_directory, was_canceled,
          shared, stress_test, rand, successful_compaction_end_to_end_micros);
    }
    db_stress_env->SleepForMicroseconds(
        thread->rand.Next() % FLAGS_remote_compaction_worker_interval * 1000 +
        1);
  }
}

void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
  if (!FLAGS_verbose) {
    return;
  }
  std::string tmp;
  tmp.reserve(sz * 2 + 16);
  char buf[4];
  for (size_t i = 0; i < sz; i++) {
    snprintf(buf, 4, "%X", value[i]);
    tmp.append(buf);
  }
  auto key_str = Key(key);
  Slice key_slice = key_str;
  fprintf(stdout, "[CF %d] %s (%" PRIi64 ") == > (%" ROCKSDB_PRIszt ") %s\n",
          cf, key_slice.ToString(true).c_str(), key, sz, tmp.c_str());
}

// Note that if hot_key_alpha != 0, the key is generated based on the
// Zipfian distribution. Keys are randomly scattered to [0, FLAGS_max_key].
// This does not ensure the order of the keys being generated, and the keys
// do not honor the active range that is related to FLAGS_active_width.
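// For intuition (illustrative numbers only): in the uniform case with
// FLAGS_max_key = 100 and FLAGS_active_width = 10, a thread halfway through
// its operations has completed_ratio = 0.5, so base_key = 45 and keys are
// drawn uniformly from [45, 55).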
int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
  const double completed_ratio =
      static_cast<double>(iteration) / FLAGS_ops_per_thread;
  const int64_t base_key = static_cast<int64_t>(
      completed_ratio * (FLAGS_max_key - FLAGS_active_width));
  int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width;
  int64_t cur_key = rand_seed;
  if (FLAGS_hot_key_alpha != 0) {
    // If the Zipfian alpha is set to non-zero, use the Zipfian distribution
    double float_rand =
        (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
        FLAGS_max_key;
    cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
  }
  return cur_key;
}

// Note that if hot_key_alpha != 0, keys are generated based on the Zipfian
// distribution and come out in random order.
// If the user wants to generate keys based on the uniform distribution,
// they need to set hot_key_alpha == 0. Random keys are then generated in
// increasing order in the key array (ensuring key[i] <= key[i+1]) and
// constrained to a range related to FLAGS_active_width.
std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
                                   uint64_t iteration) {
  const double completed_ratio =
      static_cast<double>(iteration) / FLAGS_ops_per_thread;
  const int64_t base_key = static_cast<int64_t>(
      completed_ratio * (FLAGS_max_key - FLAGS_active_width));
  std::vector<int64_t> keys;
  keys.reserve(num_keys);
  int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
  keys.push_back(next_key);
  for (int i = 1; i < num_keys; ++i) {
    // Generate a key that follows the Zipfian distribution
    if (FLAGS_hot_key_alpha != 0) {
      double float_rand =
          (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
          FLAGS_max_key;
      next_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
    } else {
      // This may result in some duplicate keys
      next_key = next_key + thread->rand.Next() %
                                (FLAGS_active_width - (next_key - base_key));
    }
    keys.push_back(next_key);
  }
  return keys;
}

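// Value layout produced below: the first sizeof(uint32_t) bytes store
// `rand` (the "value base" that GetValueBase() reads back), and every
// remaining byte i is the deterministic filler (char)(rand ^ i). Since the
// length is also derived from `rand`, the whole value can be re-derived
// from its first four bytes during verification.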
size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) {
  size_t value_sz =
      ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
  assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
  (void)max_sz;
  PutUnaligned(reinterpret_cast<uint32_t*>(v), rand);
  for (size_t i = sizeof(uint32_t); i < value_sz; i++) {
    v[i] = (char)(rand ^ i);
  }
  v[value_sz] = '\0';
  return value_sz;  // the size of the value set.
}

uint32_t GetValueBase(Slice s) {
  assert(s.size() >= sizeof(uint32_t));
  uint32_t res;
  GetUnaligned(reinterpret_cast<const uint32_t*>(s.data()), &res);
  return res;
}

AttributeGroups GenerateAttributeGroups(
    const std::vector<ColumnFamilyHandle*>& cfhs, uint32_t value_base,
    const Slice& slice) {
  WideColumns columns = GenerateWideColumns(value_base, slice);
  AttributeGroups attribute_groups;
  for (auto* cfh : cfhs) {
    attribute_groups.emplace_back(cfh, columns);
  }
  return attribute_groups;
}

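// For illustration (example values, not taken from any test): with a
// value_base where value_base % 4 == 2 and slice "abcdef",
// GenerateWideColumns() below yields three columns: the default column with
// value "abcdef", plus {"a" -> "bcdef"} and {"ab" -> "cdef"}. Column names
// are prefixes of the slice and values are the matching suffixes.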
WideColumns GenerateWideColumns(uint32_t value_base, const Slice& slice) {
  WideColumns columns;

  constexpr size_t max_columns = 4;
  const size_t num_columns = (value_base % max_columns) + 1;

  columns.reserve(num_columns);
  assert(slice.size() >= num_columns);

  columns.emplace_back(kDefaultWideColumnName, slice);
  for (size_t i = 1; i < num_columns; ++i) {
    const Slice name(slice.data(), i);
    const Slice value(slice.data() + i, slice.size() - i);
    columns.emplace_back(name, value);
  }
  return columns;
}

WideColumns GenerateExpectedWideColumns(uint32_t value_base,
                                        const Slice& slice) {
  if (FLAGS_use_put_entity_one_in == 0 ||
      (value_base % FLAGS_use_put_entity_one_in) != 0) {
    return WideColumns{{kDefaultWideColumnName, slice}};
  }
  WideColumns columns = GenerateWideColumns(value_base, slice);
  WideColumnsHelper::SortColumns(columns);
  return columns;
}

bool VerifyWideColumns(const Slice& value, const WideColumns& columns) {
  if (value.size() < sizeof(uint32_t)) {
    return false;
  }
  const uint32_t value_base = GetValueBase(value);
  const WideColumns expected_columns =
      GenerateExpectedWideColumns(value_base, value);
  if (columns != expected_columns) {
    return false;
  }
  return true;
}

bool VerifyWideColumns(const WideColumns& columns) {
  if (!WideColumnsHelper::HasDefaultColumn(columns)) {
    return false;
  }
  const Slice& value_of_default = WideColumnsHelper::GetDefaultColumn(columns);
  return VerifyWideColumns(value_of_default, columns);
}

bool VerifyIteratorAttributeGroups(
    const IteratorAttributeGroups& attribute_groups) {
  for (const auto& attribute_group : attribute_groups) {
    if (!VerifyWideColumns(attribute_group.columns())) {
      return false;
    }
  }
  return true;
}

std::string GetNowNanos() {
  uint64_t t = db_stress_env->NowNanos();
  std::string ret;
  PutFixed64(&ret, t);
  return ret;
}

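// Sample a write unix time in one of three equally likely modes: mode 0 (or
// a failed clock read) returns the uint64_t max sentinel; mode 1 returns
// now minus a random delta within kPreserveSeconds; mode 2 returns now
// minus the full kPreserveSeconds.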
uint64_t GetWriteUnixTime(ThreadState* thread) {
  static uint64_t kPreserveSeconds =
      std::max(FLAGS_preserve_internal_time_seconds,
               FLAGS_preclude_last_level_data_seconds);
  static uint64_t kFallbackTime = std::numeric_limits<uint64_t>::max();
  int64_t write_time = 0;
  Status s = db_stress_env->GetCurrentTime(&write_time);
  uint32_t write_time_mode = thread->rand.Uniform(3);
  if (write_time_mode == 0 || !s.ok()) {
    return kFallbackTime;
  } else if (write_time_mode == 1) {
    uint64_t delta = kPreserveSeconds > 0
                         ? static_cast<uint64_t>(thread->rand.Uniform(
                               static_cast<int>(kPreserveSeconds)))
                         : 0;
    return static_cast<uint64_t>(write_time) - delta;
  } else {
    return static_cast<uint64_t>(write_time) - kPreserveSeconds;
  }
}

namespace {
class MyXXH64Checksum : public FileChecksumGenerator {
 public:
  explicit MyXXH64Checksum(bool big) : big_(big) {
    state_ = XXH64_createState();
    XXH64_reset(state_, 0);
  }

  ~MyXXH64Checksum() override { XXH64_freeState(state_); }

  void Update(const char* data, size_t n) override {
    XXH64_update(state_, data, n);
  }

  void Finalize() override {
    assert(str_.empty());
    uint64_t digest = XXH64_digest(state_);
    // Store as little endian raw bytes
    PutFixed64(&str_, digest);
    if (big_) {
      // Throw in some more data for stress testing (448 bits total)
      PutFixed64(&str_, GetSliceHash64(str_));
      PutFixed64(&str_, GetSliceHash64(str_));
      PutFixed64(&str_, GetSliceHash64(str_));
      PutFixed64(&str_, GetSliceHash64(str_));
      PutFixed64(&str_, GetSliceHash64(str_));
      PutFixed64(&str_, GetSliceHash64(str_));
    }
  }

  std::string GetChecksum() const override {
    assert(!str_.empty());
    return str_;
  }

  const char* Name() const override {
    return big_ ? "MyBigChecksum" : "MyXXH64Checksum";
  }

 private:
  bool big_;
  XXH64_state_t* state_;
  std::string str_;
};

class DbStressChecksumGenFactory : public FileChecksumGenFactory {
  std::string default_func_name_;

  std::unique_ptr<FileChecksumGenerator> CreateFromFuncName(
      const std::string& func_name) {
    std::unique_ptr<FileChecksumGenerator> rv;
    if (func_name == "FileChecksumCrc32c") {
      rv.reset(new FileChecksumGenCrc32c(FileChecksumGenContext()));
    } else if (func_name == "MyXXH64Checksum") {
      rv.reset(new MyXXH64Checksum(false /* big */));
    } else if (func_name == "MyBigChecksum") {
      rv.reset(new MyXXH64Checksum(true /* big */));
    } else {
      // Should be a recognized function when we get here
      assert(false);
    }
    return rv;
  }

 public:
  explicit DbStressChecksumGenFactory(const std::string& default_func_name)
      : default_func_name_(default_func_name) {}

  std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator(
      const FileChecksumGenContext& context) override {
    if (context.requested_checksum_func_name.empty()) {
      return CreateFromFuncName(default_func_name_);
    } else {
      return CreateFromFuncName(context.requested_checksum_func_name);
    }
  }

  const char* Name() const override { return "FileChecksumGenCrc32cFactory"; }
};
}  // namespace

std::shared_ptr<FileChecksumGenFactory> GetFileChecksumImpl(
    const std::string& name) {
  // Translate from friendly names to internal names
  std::string internal_name;
  if (name == "crc32c") {
    internal_name = "FileChecksumCrc32c";
  } else if (name == "xxh64") {
    internal_name = "MyXXH64Checksum";
  } else if (name == "big") {
    internal_name = "MyBigChecksum";
  } else {
    assert(name.empty() || name == "none");
    return nullptr;
  }
  return std::make_shared<DbStressChecksumGenFactory>(internal_name);
}

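// A minimal usage sketch of the function above (illustrative, not tied to
// any particular flag): the returned factory is meant to be plugged into
// the DB options, e.g.
//   options.file_checksum_gen_factory = GetFileChecksumImpl("xxh64");
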
Status DeleteFilesInDirectory(const std::string& dirname) {
  std::vector<std::string> filenames;
  Status s = Env::Default()->GetChildren(dirname, &filenames);
  for (size_t i = 0; s.ok() && i < filenames.size(); ++i) {
    s = Env::Default()->DeleteFile(dirname + "/" + filenames[i]);
  }
  return s;
}

// Hard-link every regular file from src_dirname into dst_dirname;
// subdirectories are skipped.
Status SaveFilesInDirectory(const std::string& src_dirname,
                            const std::string& dst_dirname) {
  std::vector<std::string> filenames;
  Status s = Env::Default()->GetChildren(src_dirname, &filenames);
  for (size_t i = 0; s.ok() && i < filenames.size(); ++i) {
    bool is_dir = false;
    s = Env::Default()->IsDirectory(src_dirname + "/" + filenames[i], &is_dir);
    if (s.ok()) {
      if (is_dir) {
        continue;
      }
      s = Env::Default()->LinkFile(src_dirname + "/" + filenames[i],
                                   dst_dirname + "/" + filenames[i]);
    }
  }
  return s;
}

Status InitUnverifiedSubdir(const std::string& dirname) {
  Status s = Env::Default()->FileExists(dirname);
  if (s.IsNotFound()) {
    return Status::OK();
  }

  const std::string kUnverifiedDirname = dirname + "/unverified";
  if (s.ok()) {
    s = Env::Default()->CreateDirIfMissing(kUnverifiedDirname);
  }
  if (s.ok()) {
    // It might already exist with some stale contents. Delete any such
    // contents.
    s = DeleteFilesInDirectory(kUnverifiedDirname);
  }
  if (s.ok()) {
    s = SaveFilesInDirectory(dirname, kUnverifiedDirname);
  }
  return s;
}

Status DestroyUnverifiedSubdir(const std::string& dirname) {
  Status s = Env::Default()->FileExists(dirname);
  if (s.IsNotFound()) {
    return Status::OK();
  }

  const std::string kUnverifiedDirname = dirname + "/unverified";
  if (s.ok()) {
    s = Env::Default()->FileExists(kUnverifiedDirname);
  }
  if (s.IsNotFound()) {
    return Status::OK();
  }

  if (s.ok()) {
    s = DeleteFilesInDirectory(kUnverifiedDirname);
  }
  if (s.ok()) {
    s = Env::Default()->DeleteDir(kUnverifiedDirname);
  }
  return s;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS