// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifdef GFLAGS
#pragma once

#include "db_stress_tool/db_stress_stat.h"
#include "db_stress_tool/expected_state.h"

// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
#include "test_util/sync_point.h"
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
#include "util/gflags_compat.h"

DECLARE_uint64(seed);
DECLARE_int64(max_key);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_int32(threads);
DECLARE_int32(column_families);
DECLARE_int32(nooverwritepercent);
DECLARE_string(expected_values_dir);
DECLARE_int32(clear_column_family_one_in);
DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
DECLARE_bool(error_recovery_with_no_fault_injection);
DECLARE_bool(sync_fault_injection);
DECLARE_int32(range_deletion_width);
DECLARE_bool(disable_wal);
DECLARE_int32(manual_wal_flush_one_in);
DECLARE_int32(metadata_read_fault_one_in);
DECLARE_int32(metadata_write_fault_one_in);
DECLARE_int32(read_fault_one_in);
DECLARE_int32(write_fault_one_in);
DECLARE_bool(exclude_wal_from_write_fault_injection);
DECLARE_int32(open_metadata_read_fault_one_in);
DECLARE_int32(open_metadata_write_fault_one_in);
DECLARE_int32(open_write_fault_one_in);
DECLARE_int32(open_read_fault_one_in);
DECLARE_int32(inject_error_severity);
DECLARE_bool(disable_auto_compactions);
DECLARE_bool(enable_compaction_filter);

namespace ROCKSDB_NAMESPACE {
class StressTest;

// A unit of remote compaction work: everything needed to run one scheduled
// job, plus a cancellation flag. Produced by
// SharedState::EnqueueRemoteCompaction() and consumed by
// SharedState::DequeueRemoteCompaction().
struct RemoteCompactionQueueItem {
  std::string job_id;
  CompactionServiceJobInfo job_info;
  std::string serialized_input;
  std::string output_directory;
  bool canceled;

  RemoteCompactionQueueItem(const std::string& id,
                            const CompactionServiceJobInfo& info,
                            const std::string& input,
                            const std::string& output_dir, bool was_canceled)
      : job_id(id),
        job_info(info),
        serialized_input(input),
        output_directory(output_dir),
        canceled(was_canceled) {}
};
// State shared by all concurrent executions of the same benchmark.
class SharedState {
 public:
  // Errors when reading filter blocks are ignored, so we use a thread-local
  // variable updated via sync points to keep track of errors injected while
  // reading filter blocks, in order to ignore the Get/MultiGet result for
  // those calls.
  static thread_local bool ignore_read_error;

  SharedState(Env* /*env*/, StressTest* stress_test)
      : cv_(&mu_),
        seed_(static_cast<uint32_t>(FLAGS_seed)),
        max_key_(FLAGS_max_key),
        log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
        num_threads_(0),
        num_initialized_(0),
        num_populated_(0),
        vote_reopen_(0),
        num_done_(0),
        start_(false),
        start_verify_(false),
        num_bg_threads_(0),
        should_stop_bg_thread_(false),
        bg_thread_finished_(0),
        stress_test_(stress_test),
        verification_failure_(false),
        should_stop_test_(false),
        no_overwrite_ids_(GenerateNoOverwriteIds()),
        expected_state_manager_(nullptr),
        printing_verification_results_(false),
        start_timestamp_(Env::Default()->NowNanos()) {
    Status status;
    // TODO: We should introduce a way to explicitly disable verification
    // during shutdown. When that is disabled and FLAGS_expected_values_dir
    // is empty (disabling verification at startup), we can skip tracking
    // expected state. Only then should we permit bypassing the below feature
    // compatibility checks.
    if (!FLAGS_expected_values_dir.empty()) {
      if (!std::atomic<uint32_t>{}.is_lock_free() ||
          !std::atomic<uint64_t>{}.is_lock_free()) {
        std::ostringstream status_s;
        status_s << "Cannot use --expected_values_dir on platforms without "
                    "lock-free "
                 << (!std::atomic<uint32_t>{}.is_lock_free()
                         ? "std::atomic<uint32_t>"
                         : "std::atomic<uint64_t>");
        status = Status::InvalidArgument(status_s.str());
      }
      if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_dir when "
            "--clear_column_family_one_in is greater than zero.");
      }
    }
    if (status.ok()) {
      if (FLAGS_expected_values_dir.empty()) {
        expected_state_manager_.reset(
            new AnonExpectedStateManager(FLAGS_max_key, FLAGS_column_families));
      } else {
        expected_state_manager_.reset(new FileExpectedStateManager(
            FLAGS_max_key, FLAGS_column_families, FLAGS_expected_values_dir));
      }
      status = expected_state_manager_->Open();
    }
    if (!status.ok()) {
      fprintf(stderr, "Failed setting up expected state with error: %s\n",
              status.ToString().c_str());
      exit(1);
    }
    if (FLAGS_test_batches_snapshots) {
      fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
      return;
    }
    // One lock covers 2^log2_keys_per_lock_ consecutive keys; round up so a
    // final partial group of keys also gets a lock.
    long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
    if (max_key_ & ((int64_t{1} << log2_keys_per_lock_) - 1)) {
      num_locks++;
    }
    fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
    key_locks_.resize(FLAGS_column_families);
    for (int i = 0; i < FLAGS_column_families; ++i) {
      key_locks_[i].reset(new port::Mutex[num_locks]);
    }
    if (FLAGS_read_fault_one_in || FLAGS_metadata_read_fault_one_in) {
#ifdef NDEBUG
      // Unsupported in release mode because it relies on
      // `IGNORE_STATUS_IF_ERROR` to distinguish faults not expected to lead
      // to failure.
      fprintf(stderr,
              "Cannot set nonzero value for --read_fault_one_in or "
              "--metadata_read_fault_one_in in release mode.\n");
      exit(1);
#else  // NDEBUG
      SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError",
                                            IgnoreReadErrorCallback);
      SyncPoint::GetInstance()->EnableProcessing();
#endif  // NDEBUG
    }
  }
  ~SharedState() {
#ifndef NDEBUG
    if (FLAGS_read_fault_one_in || FLAGS_write_fault_one_in ||
        FLAGS_metadata_write_fault_one_in) {
      SyncPoint::GetInstance()->ClearAllCallBacks();
      SyncPoint::GetInstance()->DisableProcessing();
    }
#endif
  }
  port::Mutex* GetMutex() { return &mu_; }
  port::CondVar* GetCondVar() { return &cv_; }
  StressTest* GetStressTest() const { return stress_test_; }
  int64_t GetMaxKey() const { return max_key_; }
  uint32_t GetNumThreads() const { return num_threads_; }
  void SetThreads(int num_threads) { num_threads_ = num_threads; }

  void IncInitialized() { num_initialized_++; }
  void IncOperated() { num_populated_++; }
  void IncDone() { num_done_++; }
  // Reopen votes are counted modulo the thread count, so the counter wraps
  // back to zero exactly when every thread has voted; AllVotedReopen() then
  // reports true.
  void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; }

  bool AllInitialized() const { return num_initialized_ >= num_threads_; }
  bool AllOperated() const { return num_populated_ >= num_threads_; }
  bool AllDone() const { return num_done_ >= num_threads_; }
  bool AllVotedReopen() { return (vote_reopen_ == 0); }

  void SetStart() { start_ = true; }
  void SetStartVerify() { start_verify_ = true; }
  bool Started() const { return start_; }
  bool VerifyStarted() const { return start_verify_; }

  void SetVerificationFailure() { verification_failure_.store(true); }
  bool HasVerificationFailedYet() const { return verification_failure_.load(); }
  void SetShouldStopTest() { should_stop_test_.store(true); }
  bool ShouldStopTest() const { return should_stop_test_.load(); }

  // Returns a lock covering `key` in `cf`.
  port::Mutex* GetMutexForKey(int cf, int64_t key) {
    return &key_locks_[cf][key >> log2_keys_per_lock_];
  }
  // Acquires locks for all keys in `cf`.
  void LockColumnFamily(int cf) {
    for (int i = 0; i < max_key_ >> log2_keys_per_lock_; ++i) {
      key_locks_[cf][i].Lock();
    }
  }

  // Releases locks for all keys in `cf`.
  void UnlockColumnFamily(int cf) {
    for (int i = 0; i < max_key_ >> log2_keys_per_lock_; ++i) {
      key_locks_[cf][i].Unlock();
    }
  }

  // Returns a collection of mutex locks covering the key range [start, end)
  // in `cf`.
  std::vector<std::unique_ptr<MutexLock>> GetLocksForKeyRange(int cf,
                                                              int64_t start,
                                                              int64_t end) {
    std::vector<std::unique_ptr<MutexLock>> range_locks;
    if (start >= end) {
      return range_locks;
    }
    const int64_t start_idx = start >> log2_keys_per_lock_;
    // `end_idx` is inclusive; when `end` falls exactly on a lock-group
    // boundary, the group beginning at `end` is excluded.
    int64_t end_idx = end >> log2_keys_per_lock_;
    if ((end & ((int64_t{1} << log2_keys_per_lock_) - 1)) == 0) {
      --end_idx;
    }
    for (int64_t idx = start_idx; idx <= end_idx; ++idx) {
      range_locks.emplace_back(
          std::make_unique<MutexLock>(&key_locks_[cf][idx]));
    }
    return range_locks;
  }
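  //
  // Usage sketch for GetLocksForKeyRange() above. The caller context here
  // (`shared`, `cf`, `begin_key`, `end_key`) is hypothetical, not part of
  // this header:
  //
  //   {
  //     std::vector<std::unique_ptr<MutexLock>> range_locks =
  //         shared->GetLocksForKeyRange(cf, begin_key, end_key);
  //     // ... mutate the DB and expected state for [begin_key, end_key) ...
  //   }  // all locks released here by MutexLock's RAII destructors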
  Status SaveAtAndAfter(DB* db) {
    return expected_state_manager_->SaveAtAndAfter(db);
  }

  bool HasHistory() { return expected_state_manager_->HasHistory(); }

  Status Restore(DB* db) { return expected_state_manager_->Restore(db); }

  // Requires external locking covering all keys in `cf`.
  void ClearColumnFamily(int cf) {
    return expected_state_manager_->ClearColumnFamily(cf);
  }

  void SetPersistedSeqno(SequenceNumber seqno) {
    MutexLock l(&persist_seqno_mu_);
    return expected_state_manager_->SetPersistedSeqno(seqno);
  }

  SequenceNumber GetPersistedSeqno() {
    MutexLock l(&persist_seqno_mu_);
    return expected_state_manager_->GetPersistedSeqno();
  }
  void EnqueueRemoteCompaction(const std::string& job_id,
                               const CompactionServiceJobInfo& job_info,
                               const std::string& serialized_input,
                               const std::string& output_directory,
                               bool canceled) {
    MutexLock l(&remote_compaction_queue_mu_);
    remote_compaction_queue_.emplace(job_id, job_info, serialized_input,
                                     output_directory, canceled);
  }

  // Pops the oldest queued job into the output parameters. Returns true if a
  // job was dequeued, false if the queue was empty.
  bool DequeueRemoteCompaction(std::string* job_id,
                               CompactionServiceJobInfo* job_info,
                               std::string* serialized_input,
                               std::string* output_directory, bool* canceled) {
    assert(job_id);
    assert(job_info);
    assert(serialized_input);
    assert(output_directory);
    assert(canceled);
    MutexLock l(&remote_compaction_queue_mu_);
    if (!remote_compaction_queue_.empty()) {
      const RemoteCompactionQueueItem& item = remote_compaction_queue_.front();
      *job_id = item.job_id;
      *job_info = item.job_info;
      *serialized_input = item.serialized_input;
      *output_directory = item.output_directory;
      *canceled = item.canceled;
      remote_compaction_queue_.pop();
      return true;
    }
    return false;
  }
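  //
  // Usage sketch for DequeueRemoteCompaction() above: a hypothetical polling
  // worker loop (everything here other than the SharedState methods is an
  // assumption):
  //
  //   std::string job_id, input, output_dir;
  //   bool canceled = false;
  //   CompactionServiceJobInfo info = /* construction elided */;
  //   while (!shared->ShouldStopBgThread()) {
  //     if (shared->DequeueRemoteCompaction(&job_id, &info, &input,
  //                                         &output_dir, &canceled)) {
  //       // Run the job, then publish its outcome for later retrieval via
  //       // GetRemoteCompactionResult():
  //       // shared->AddRemoteCompactionResult(job_id, status, result);
  //     }
  //   }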
  void AddRemoteCompactionResult(const std::string& job_id,
                                 const Status& status,
                                 const std::string& result) {
    MutexLock l(&remote_compaction_result_map_mu_);
    remote_compaction_result_map_.emplace(
        job_id, std::pair<Status, std::string>{status, result});
  }

  std::optional<Status> GetRemoteCompactionResult(const std::string& job_id,
                                                  std::string* result) {
    MutexLock l(&remote_compaction_result_map_mu_);
    // Single lookup instead of find() followed by at().
    auto it = remote_compaction_result_map_.find(job_id);
    if (it != remote_compaction_result_map_.end()) {
      *result = it->second.second;
      return it->second.first;
    }
    return std::nullopt;
  }

  void RemoveRemoteCompactionResult(const std::string& job_id) {
    MutexLock l(&remote_compaction_result_map_mu_);
    remote_compaction_result_map_.erase(job_id);
  }
  // Prepare a Put that will be started but not finished yet.
  // This is useful for crash-recovery testing when the process may crash
  // before updating the corresponding expected value.
  //
  // Requires external locking covering `key` in `cf` to prevent
  // concurrent write or delete to the same `key`.
  PendingExpectedValue PreparePut(int cf, int64_t key) {
    return expected_state_manager_->PreparePut(cf, key);
  }
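  //
  // Typical prepare/commit sequence (a sketch; it assumes PendingExpectedValue
  // exposes Commit()/Rollback()-style completion as declared in
  // expected_state.h, and that `db`, `cfh`, `key_str`, `value`, and
  // `write_opts` come from the caller):
  //
  //   PendingExpectedValue pending = shared->PreparePut(cf, key);
  //   Status s = db->Put(write_opts, cfh, key_str, value);
  //   if (s.ok()) {
  //     pending.Commit();  // expected state now reflects the Put
  //   } else {
  //     pending.Rollback();
  //   }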
  // Does not require external locking.
  ExpectedValue Get(int cf, int64_t key) {
    return expected_state_manager_->Get(cf, key);
  }
  // Prepare a Delete that will be started but not finished yet.
  // This is useful for crash-recovery testing when the process may crash
  // before updating the corresponding expected value.
  //
  // Requires external locking covering `key` in `cf` to prevent concurrent
  // write or delete to the same `key`.
  PendingExpectedValue PrepareDelete(int cf, int64_t key) {
    return expected_state_manager_->PrepareDelete(cf, key);
  }

  // Requires external locking covering `key` in `cf` to prevent concurrent
  // write or delete to the same `key`.
  PendingExpectedValue PrepareSingleDelete(int cf, int64_t key) {
    return expected_state_manager_->PrepareSingleDelete(cf, key);
  }

  // Requires external locking covering keys in `[begin_key, end_key)` in `cf`
  // to prevent concurrent write or delete to the same keys.
  std::vector<PendingExpectedValue> PrepareDeleteRange(int cf,
                                                       int64_t begin_key,
                                                       int64_t end_key) {
    return expected_state_manager_->PrepareDeleteRange(cf, begin_key, end_key);
  }

  bool AllowsOverwrite(int64_t key) const {
    return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
  }

  // Requires external locking covering `key` in `cf` to prevent concurrent
  // delete to the same `key`.
  bool Exists(int cf, int64_t key) {
    return expected_state_manager_->Exists(cf, key);
  }

  // Sync the given `value_base` into the corresponding expected value.
  void SyncPut(int cf, int64_t key, uint32_t value_base) {
    return expected_state_manager_->SyncPut(cf, key, value_base);
  }

  // Sync the corresponding expected value to be pending Put.
  void SyncPendingPut(int cf, int64_t key) {
    return expected_state_manager_->SyncPendingPut(cf, key);
  }

  // Sync the corresponding expected value to be deleted.
  void SyncDelete(int cf, int64_t key) {
    return expected_state_manager_->SyncDelete(cf, key);
  }
  uint32_t GetSeed() const { return seed_; }

  void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
  bool ShouldStopBgThread() { return should_stop_bg_thread_; }
  void IncBgThreads() { ++num_bg_threads_; }
  void IncBgThreadsFinished() { ++bg_thread_finished_; }
  bool BgThreadsFinished() const {
    return bg_thread_finished_ == num_bg_threads_;
  }

  bool ShouldVerifyAtBeginning() const {
    return !FLAGS_expected_values_dir.empty();
  }
  // Atomically claims the right to print verification results. Returns false
  // to the single caller that wins the compare-exchange (which should later
  // call FinishPrintingVerificationResults()) and true when another thread
  // is already printing.
  bool PrintingVerificationResults() {
    bool tmp = false;
    return !printing_verification_results_.compare_exchange_strong(
        tmp, true, std::memory_order_relaxed);
  }
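  //
  // Intended pattern (a sketch): only the thread that wins the claim prints,
  // then releases it:
  //
  //   if (!shared->PrintingVerificationResults()) {
  //     // ... print verification details ...
  //     shared->FinishPrintingVerificationResults();
  //   }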
  void FinishPrintingVerificationResults() {
    printing_verification_results_.store(false, std::memory_order_relaxed);
  }
  uint64_t GetStartTimestamp() const { return start_timestamp_; }

  void SafeTerminate() {
    // Grab the mutex so that we don't call terminate while another thread is
    // attempting to print a stack trace due to the first one.
    MutexLock l(&mu_);
    std::terminate();
  }
 private:
  static void IgnoreReadErrorCallback(void*) { ignore_read_error = true; }

  // Pick random keys in each column family that will not experience
  // overwrite.
  std::unordered_set<int64_t> GenerateNoOverwriteIds() const {
    fprintf(stdout, "Choosing random keys with no overwrite\n");
    // Start with the identity permutation. Each iteration of the shuffle loop
    // below continues from the permutation left by the previous iteration.
    std::vector<int64_t> permutation(max_key_);
    for (int64_t i = 0; i < max_key_; ++i) {
      permutation[i] = i;
    }
    // Now do a partial Fisher-Yates (Knuth) shuffle: only the first
    // `num_no_overwrite_keys` entries of the permutation need to be fixed.
    const int64_t num_no_overwrite_keys =
        (max_key_ * FLAGS_nooverwritepercent) / 100;
    std::unordered_set<int64_t> ret;
    ret.reserve(num_no_overwrite_keys);
    Random64 rnd(seed_);
    for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
      assert(i < max_key_);
      int64_t rand_index = i + rnd.Next() % (max_key_ - i);
      // Swap the entries at `i` and `rand_index`, then record entry `i`: by
      // the end of the loop, `ret` holds the first `num_no_overwrite_keys`
      // entries of the permutation.
      std::swap(permutation[i], permutation[rand_index]);
      ret.insert(permutation[i]);
    }
    return ret;
  }
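  // Example for GenerateNoOverwriteIds() above (illustrative numbers, not
  // defaults): with max_key_ = 100 and --nooverwritepercent = 10, the loop
  // performs 10 partial-shuffle steps, and the returned set holds 10 distinct
  // keys drawn uniformly from [0, 100).
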
  port::Mutex mu_;
  port::CondVar cv_;
  port::Mutex persist_seqno_mu_;
  const uint32_t seed_;
  const int64_t max_key_;
  const uint32_t log2_keys_per_lock_;
  int num_threads_;
  long num_initialized_;
  long num_populated_;
  long vote_reopen_;
  long num_done_;
  bool start_;
  bool start_verify_;
  int num_bg_threads_;
  bool should_stop_bg_thread_;
  int bg_thread_finished_;
  StressTest* stress_test_;
  std::atomic<bool> verification_failure_;
  std::atomic<bool> should_stop_test_;
  // Queue of pending remote compaction jobs.
  port::Mutex remote_compaction_queue_mu_;
  std::queue<RemoteCompactionQueueItem> remote_compaction_queue_;
  // Result map for remote compactions. The key is the scheduled job id; the
  // value is the job's Status paired with the serialized compaction service
  // result.
  port::Mutex remote_compaction_result_map_mu_;
  std::unordered_map<std::string, std::pair<Status, std::string>>
      remote_compaction_result_map_;
  // Keys that should not be overwritten.
  const std::unordered_set<int64_t> no_overwrite_ids_;
  std::unique_ptr<ExpectedStateManager> expected_state_manager_;
  // Cannot store `port::Mutex` directly in a vector since it is not copyable,
  // and storing it in the container may require copying depending on the
  // implementation.
  std::vector<std::unique_ptr<port::Mutex[]>> key_locks_;
  std::atomic<bool> printing_verification_results_;
  const uint64_t start_timestamp_;
};
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;  // 0..n-1
  Random rand;   // Has different seeds for different threads
  SharedState* shared;
  Stats stats;
  struct SnapshotState {
    const Snapshot* snapshot;
    // The cf from which we did a Get at this snapshot
    int cf_at;
    // The name of the cf at the time that we did a read
    std::string cf_at_name;
    // The key with which we did a Get at this snapshot
    std::string key;
    // The status of the Get
    Status status;
    // The value of the Get
    std::string value;
    // Optional state of all keys in the db
    std::vector<bool>* key_vec;
    std::string timestamp;
  };
  std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;

  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
};
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS