db_stress_driver.cc 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. //
  10. #ifdef GFLAGS
  11. #include "db_stress_tool/db_stress_common.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. void ThreadBody(void* v) {
  14. ThreadState* thread = reinterpret_cast<ThreadState*>(v);
  15. SharedState* shared = thread->shared;
  16. if (shared->ShouldVerifyAtBeginning()) {
  17. thread->shared->GetStressTest()->VerifyDb(thread);
  18. }
  19. {
  20. MutexLock l(shared->GetMutex());
  21. shared->IncInitialized();
  22. if (shared->AllInitialized()) {
  23. shared->GetCondVar()->SignalAll();
  24. }
  25. while (!shared->Started()) {
  26. shared->GetCondVar()->Wait();
  27. }
  28. }
  29. thread->shared->GetStressTest()->OperateDb(thread);
  30. {
  31. MutexLock l(shared->GetMutex());
  32. shared->IncOperated();
  33. if (shared->AllOperated()) {
  34. shared->GetCondVar()->SignalAll();
  35. }
  36. while (!shared->VerifyStarted()) {
  37. shared->GetCondVar()->Wait();
  38. }
  39. }
  40. thread->shared->GetStressTest()->VerifyDb(thread);
  41. {
  42. MutexLock l(shared->GetMutex());
  43. shared->IncDone();
  44. if (shared->AllDone()) {
  45. shared->GetCondVar()->SignalAll();
  46. }
  47. }
  48. }
  49. bool RunStressTest(StressTest* stress) {
  50. stress->InitDb();
  51. SharedState shared(db_stress_env, stress);
  52. if (FLAGS_read_only) {
  53. stress->InitReadonlyDb(&shared);
  54. }
  55. uint32_t n = shared.GetNumThreads();
  56. uint64_t now = db_stress_env->NowMicros();
  57. fprintf(stdout, "%s Initializing worker threads\n",
  58. db_stress_env->TimeToString(now / 1000000).c_str());
  59. std::vector<ThreadState*> threads(n);
  60. for (uint32_t i = 0; i < n; i++) {
  61. threads[i] = new ThreadState(i, &shared);
  62. db_stress_env->StartThread(ThreadBody, threads[i]);
  63. }
  64. ThreadState bg_thread(0, &shared);
  65. if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
  66. db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
  67. }
  68. ThreadState continuous_verification_thread(0, &shared);
  69. if (FLAGS_continuous_verification_interval > 0) {
  70. db_stress_env->StartThread(DbVerificationThread,
  71. &continuous_verification_thread);
  72. }
  73. // Each thread goes through the following states:
  74. // initializing -> wait for others to init -> read/populate/depopulate
  75. // wait for others to operate -> verify -> done
  76. {
  77. MutexLock l(shared.GetMutex());
  78. while (!shared.AllInitialized()) {
  79. shared.GetCondVar()->Wait();
  80. }
  81. if (shared.ShouldVerifyAtBeginning()) {
  82. if (shared.HasVerificationFailedYet()) {
  83. fprintf(stderr, "Crash-recovery verification failed :(\n");
  84. } else {
  85. fprintf(stdout, "Crash-recovery verification passed :)\n");
  86. }
  87. }
  88. now = db_stress_env->NowMicros();
  89. fprintf(stdout, "%s Starting database operations\n",
  90. db_stress_env->TimeToString(now / 1000000).c_str());
  91. shared.SetStart();
  92. shared.GetCondVar()->SignalAll();
  93. while (!shared.AllOperated()) {
  94. shared.GetCondVar()->Wait();
  95. }
  96. now = db_stress_env->NowMicros();
  97. if (FLAGS_test_batches_snapshots) {
  98. fprintf(stdout, "%s Limited verification already done during gets\n",
  99. db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
  100. } else {
  101. fprintf(stdout, "%s Starting verification\n",
  102. db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
  103. }
  104. shared.SetStartVerify();
  105. shared.GetCondVar()->SignalAll();
  106. while (!shared.AllDone()) {
  107. shared.GetCondVar()->Wait();
  108. }
  109. }
  110. for (unsigned int i = 1; i < n; i++) {
  111. threads[0]->stats.Merge(threads[i]->stats);
  112. }
  113. threads[0]->stats.Report("Stress Test");
  114. for (unsigned int i = 0; i < n; i++) {
  115. delete threads[i];
  116. threads[i] = nullptr;
  117. }
  118. now = db_stress_env->NowMicros();
  119. if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
  120. fprintf(stdout, "%s Verification successful\n",
  121. db_stress_env->TimeToString(now / 1000000).c_str());
  122. }
  123. stress->PrintStatistics();
  124. if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
  125. FLAGS_continuous_verification_interval > 0) {
  126. MutexLock l(shared.GetMutex());
  127. shared.SetShouldStopBgThread();
  128. while (!shared.BgThreadsFinished()) {
  129. shared.GetCondVar()->Wait();
  130. }
  131. }
  132. if (!stress->VerifySecondaries()) {
  133. return false;
  134. }
  135. if (shared.HasVerificationFailedYet()) {
  136. fprintf(stderr, "Verification failed :(\n");
  137. return false;
  138. }
  139. return true;
  140. }
  141. } // namespace ROCKSDB_NAMESPACE
  142. #endif // GFLAGS