| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- //
- #include "db_stress_tool/db_stress_shared_state.h"
- #ifdef GFLAGS
- #include "db_stress_tool/db_stress_common.h"
- #include "utilities/fault_injection_fs.h"
- namespace ROCKSDB_NAMESPACE {
- void ThreadBody(void* v) {
- ThreadStatusUtil::RegisterThread(db_stress_env, ThreadStatus::USER);
- ThreadState* thread = static_cast<ThreadState*>(v);
- SharedState* shared = thread->shared;
- if (!FLAGS_skip_verifydb && shared->ShouldVerifyAtBeginning()) {
- thread->shared->GetStressTest()->VerifyDb(thread);
- }
- {
- MutexLock l(shared->GetMutex());
- shared->IncInitialized();
- if (shared->AllInitialized()) {
- shared->GetCondVar()->SignalAll();
- }
- }
- if (!FLAGS_verification_only) {
- {
- MutexLock l(shared->GetMutex());
- while (!shared->Started()) {
- shared->GetCondVar()->Wait();
- }
- }
- thread->shared->GetStressTest()->OperateDb(thread);
- {
- MutexLock l(shared->GetMutex());
- shared->IncOperated();
- if (shared->AllOperated()) {
- shared->GetCondVar()->SignalAll();
- }
- while (!shared->VerifyStarted()) {
- shared->GetCondVar()->Wait();
- }
- }
- if (!FLAGS_skip_verifydb) {
- thread->shared->GetStressTest()->VerifyDb(thread);
- }
- {
- MutexLock l(shared->GetMutex());
- shared->IncDone();
- if (shared->AllDone()) {
- shared->GetCondVar()->SignalAll();
- }
- }
- }
- ThreadStatusUtil::UnregisterThread();
- }
- bool RunStressTestImpl(SharedState* shared) {
- SystemClock* clock = db_stress_env->GetSystemClock().get();
- StressTest* stress = shared->GetStressTest();
- if (shared->ShouldVerifyAtBeginning() && FLAGS_preserve_unverified_changes) {
- Status s = InitUnverifiedSubdir(FLAGS_db);
- if (s.ok() && !FLAGS_expected_values_dir.empty()) {
- s = InitUnverifiedSubdir(FLAGS_expected_values_dir);
- }
- if (!s.ok()) {
- fprintf(stderr, "Failed to setup unverified state dir: %s\n",
- s.ToString().c_str());
- exit(1);
- }
- }
- stress->InitDb(shared);
- stress->FinishInitDb(shared);
- uint32_t n = FLAGS_threads;
- uint64_t now = clock->NowMicros();
- fprintf(stdout, "%s Initializing worker threads\n",
- clock->TimeToString(now / 1000000).c_str());
- shared->SetThreads(n);
- if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
- shared->IncBgThreads();
- }
- if (FLAGS_continuous_verification_interval > 0) {
- shared->IncBgThreads();
- }
- if (FLAGS_compressed_secondary_cache_size > 0 ||
- FLAGS_compressed_secondary_cache_ratio > 0.0) {
- shared->IncBgThreads();
- }
- uint32_t remote_compaction_worker_thread_count =
- FLAGS_remote_compaction_worker_threads;
- if (remote_compaction_worker_thread_count > 0) {
- for (uint32_t i = 0; i < remote_compaction_worker_thread_count; i++) {
- shared->IncBgThreads();
- }
- }
- std::vector<ThreadState*> threads(n);
- for (uint32_t i = 0; i < n; i++) {
- threads[i] = new ThreadState(i, shared);
- db_stress_env->StartThread(ThreadBody, threads[i]);
- }
- ThreadState bg_thread(0, shared);
- if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
- db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
- }
- ThreadState continuous_verification_thread(0, shared);
- if (FLAGS_continuous_verification_interval > 0) {
- db_stress_env->StartThread(DbVerificationThread,
- &continuous_verification_thread);
- }
- ThreadState compressed_cache_set_capacity_thread(0, shared);
- if (FLAGS_compressed_secondary_cache_size > 0 ||
- FLAGS_compressed_secondary_cache_ratio > 0.0) {
- db_stress_env->StartThread(CompressedCacheSetCapacityThread,
- &compressed_cache_set_capacity_thread);
- }
- std::vector<ThreadState*> remote_compaction_worker_threads;
- if (remote_compaction_worker_thread_count > 0) {
- remote_compaction_worker_threads.reserve(
- remote_compaction_worker_thread_count);
- for (uint32_t i = 0; i < remote_compaction_worker_thread_count; i++) {
- ThreadState* ts = new ThreadState(i, shared);
- remote_compaction_worker_threads.push_back(ts);
- db_stress_env->StartThread(RemoteCompactionWorkerThread, ts);
- }
- }
- // Each thread goes through the following states:
- // initializing -> wait for others to init -> read/populate/depopulate
- // wait for others to operate -> verify -> done
- {
- MutexLock l(shared->GetMutex());
- while (!shared->AllInitialized()) {
- shared->GetCondVar()->Wait();
- }
- if (shared->ShouldVerifyAtBeginning()) {
- if (shared->HasVerificationFailedYet()) {
- fprintf(stderr, "Crash-recovery verification failed :(\n");
- } else {
- fprintf(stdout, "Crash-recovery verification passed :)\n");
- Status s = DestroyUnverifiedSubdir(FLAGS_db);
- if (s.ok() && !FLAGS_expected_values_dir.empty()) {
- s = DestroyUnverifiedSubdir(FLAGS_expected_values_dir);
- }
- if (!s.ok()) {
- fprintf(stderr, "Failed to cleanup unverified state dir: %s\n",
- s.ToString().c_str());
- exit(1);
- }
- }
- }
- if (!FLAGS_verification_only) {
- // This is after the verification step to avoid making all those `Get()`s
- // and `MultiGet()`s contend on the DB-wide trace mutex.
- if (!FLAGS_expected_values_dir.empty()) {
- stress->TrackExpectedState(shared);
- }
- if (FLAGS_sync_fault_injection || FLAGS_write_fault_one_in > 0) {
- fault_fs_guard->SetFilesystemDirectWritable(false);
- fault_fs_guard->SetInjectUnsyncedDataLoss(FLAGS_sync_fault_injection);
- if (FLAGS_exclude_wal_from_write_fault_injection) {
- fault_fs_guard->SetFileTypesExcludedFromWriteFaultInjection(
- {FileType::kWalFile});
- }
- }
- if (ShouldDisableAutoCompactionsBeforeVerifyDb()) {
- Status s = stress->EnableAutoCompaction();
- assert(s.ok());
- }
- fprintf(stdout, "%s Starting database operations\n",
- clock->TimeToString(now / 1000000).c_str());
- shared->SetStart();
- shared->GetCondVar()->SignalAll();
- while (!shared->AllOperated()) {
- shared->GetCondVar()->Wait();
- }
- now = clock->NowMicros();
- if (FLAGS_test_batches_snapshots) {
- fprintf(stdout, "%s Limited verification already done during gets\n",
- clock->TimeToString((uint64_t)now / 1000000).c_str());
- } else if (FLAGS_skip_verifydb) {
- fprintf(stdout, "%s Verification skipped\n",
- clock->TimeToString((uint64_t)now / 1000000).c_str());
- } else {
- fprintf(stdout, "%s Starting verification\n",
- clock->TimeToString((uint64_t)now / 1000000).c_str());
- }
- shared->SetStartVerify();
- shared->GetCondVar()->SignalAll();
- while (!shared->AllDone()) {
- shared->GetCondVar()->Wait();
- }
- }
- }
- // If we are running verification_only
- // stats will be empty and trying to report them will
- // emit no ops or writes error. To avoid this, merging and reporting stats
- // are not executed when running with verification_only
- // TODO: We need to create verification stats (e.g. how many keys
- // are verified by which method) and report them here instead of operation
- // stats.
- if (!FLAGS_verification_only) {
- for (unsigned int i = 1; i < n; i++) {
- threads[0]->stats.Merge(threads[i]->stats);
- }
- threads[0]->stats.Report("Stress Test");
- }
- for (unsigned int i = 0; i < n; i++) {
- delete threads[i];
- threads[i] = nullptr;
- }
- now = clock->NowMicros();
- if (!FLAGS_skip_verifydb && !FLAGS_test_batches_snapshots &&
- !shared->HasVerificationFailedYet()) {
- fprintf(stdout, "%s Verification successful\n",
- clock->TimeToString(now / 1000000).c_str());
- }
- if (!FLAGS_verification_only) {
- stress->PrintStatistics();
- }
- if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
- FLAGS_continuous_verification_interval > 0 ||
- FLAGS_compressed_secondary_cache_size > 0 ||
- FLAGS_compressed_secondary_cache_ratio > 0.0 ||
- remote_compaction_worker_thread_count > 0) {
- MutexLock l(shared->GetMutex());
- shared->SetShouldStopBgThread();
- while (!shared->BgThreadsFinished()) {
- shared->GetCondVar()->Wait();
- }
- }
- assert(remote_compaction_worker_threads.size() ==
- remote_compaction_worker_thread_count);
- if (remote_compaction_worker_thread_count > 0) {
- for (ThreadState* thread_state : remote_compaction_worker_threads) {
- delete thread_state;
- }
- remote_compaction_worker_threads.clear();
- }
- if (shared->HasVerificationFailedYet()) {
- fprintf(stderr, "Verification failed :(\n");
- return false;
- }
- return true;
- }
- bool RunStressTest(SharedState* shared) {
- ThreadStatusUtil::RegisterThread(db_stress_env, ThreadStatus::USER);
- bool result = RunStressTestImpl(shared);
- ThreadStatusUtil::UnregisterThread();
- return result;
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif // GFLAGS
|