db_stress_shared_state.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors
  9. #ifdef GFLAGS
  10. #pragma once
  11. #include "db_stress_tool/db_stress_stat.h"
  12. #include "util/gflags_compat.h"
  13. DECLARE_uint64(seed);
  14. DECLARE_int64(max_key);
  15. DECLARE_uint64(log2_keys_per_lock);
  16. DECLARE_int32(threads);
  17. DECLARE_int32(column_families);
  18. DECLARE_int32(nooverwritepercent);
  19. DECLARE_string(expected_values_path);
  20. DECLARE_int32(clear_column_family_one_in);
  21. DECLARE_bool(test_batches_snapshots);
  22. DECLARE_int32(compaction_thread_pool_adjust_interval);
  23. DECLARE_int32(continuous_verification_interval);
  24. namespace ROCKSDB_NAMESPACE {
  25. class StressTest;
// State shared by all concurrent executions of the same benchmark.
class SharedState {
 public:
  // Indicates a key may have any value (or not be present) as an operation on
  // it is incomplete.
  static const uint32_t UNKNOWN_SENTINEL;
  // Indicates a key should definitely be deleted.
  static const uint32_t DELETION_SENTINEL;

  // Sets up everything the worker threads will share:
  //  - the per-column-family set of keys that must never be overwritten,
  //  - the expected-values table: one std::atomic<uint32_t> per (cf, key),
  //    backed either by an anonymous heap allocation or, when
  //    --expected_values_path is set, by a memory-mapped file created/opened
  //    through `env` (so expectations can survive a process crash),
  //  - one mutex per group of 2^log2_keys_per_lock keys per column family
  //    (skipped entirely when --test_batches_snapshots is set),
  //  - the number of background threads the test will later start.
  // On mmap failure this logs to stderr and falls back to heap-allocated
  // expected values.
  SharedState(Env* env, StressTest* stress_test)
      : cv_(&mu_),
        seed_(static_cast<uint32_t>(FLAGS_seed)),
        max_key_(FLAGS_max_key),
        log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
        num_threads_(FLAGS_threads),
        num_initialized_(0),
        num_populated_(0),
        vote_reopen_(0),
        num_done_(0),
        start_(false),
        start_verify_(false),
        num_bg_threads_(0),
        should_stop_bg_thread_(false),
        bg_thread_finished_(0),
        stress_test_(stress_test),
        verification_failure_(false),
        should_stop_test_(false),
        no_overwrite_ids_(FLAGS_column_families),
        values_(nullptr),
        printing_verification_results_(false) {
    // Pick random keys in each column family that will not experience
    // overwrite.
    fprintf(stdout, "Choosing random keys with no overwrite\n");
    Random64 rnd(seed_);
    // Start with the identity permutation. Subsequent iterations of
    // for loop below will start with perm of previous for loop.
    int64_t* permutation = new int64_t[max_key_];
    for (int64_t i = 0; i < max_key_; i++) {
      permutation[i] = i;
    }
    // Now do the Knuth shuffle.
    int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
    // Only need to figure out first num_no_overwrite_keys of permutation:
    // a partial Fisher-Yates shuffle yields a uniformly random
    // num_no_overwrite_keys-subset of [0, max_key_).
    no_overwrite_ids_.reserve(num_no_overwrite_keys);
    for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
      int64_t rand_index = i + rnd.Next() % (max_key_ - i);
      // Swap i and rand_index.
      int64_t temp = permutation[i];
      permutation[i] = permutation[rand_index];
      permutation[rand_index] = temp;
      // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
      // permutation.
      no_overwrite_ids_.insert(permutation[i]);
    }
    delete[] permutation;

    // Size of the full expected-values table (all cfs, all keys).
    size_t expected_values_size =
        sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
    bool values_init_needed = false;
    Status status;
    if (!FLAGS_expected_values_path.empty()) {
      // A lock-free atomic is required so that the mmap'ed bytes written by a
      // plain store are meaningful to a later process reopening the file.
      if (!std::atomic<uint32_t>{}.is_lock_free()) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_path on platforms without lock-free "
            "std::atomic<uint32_t>");
      }
      // Clearing a cf would have to resize/reshape the persisted table, which
      // is not supported, so the two flags are mutually exclusive.
      if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_path on when "
            "--clear_column_family_one_in is greater than zero.");
      }
      uint64_t size = 0;
      if (status.ok()) {
        status = env->GetFileSize(FLAGS_expected_values_path, &size);
      }
      std::unique_ptr<WritableFile> wfile;
      // size == 0 means a fresh (or empty) file: create it and zero-fill it
      // to the expected length before mapping. A non-empty file is reused
      // as-is, preserving expectations from a previous (crashed) run.
      if (status.ok() && size == 0) {
        const EnvOptions soptions;
        status =
            env->NewWritableFile(FLAGS_expected_values_path, &wfile, soptions);
      }
      if (status.ok() && size == 0) {
        std::string buf(expected_values_size, '\0');
        status = wfile->Append(buf);
        values_init_needed = true;
      }
      if (status.ok()) {
        status = env->NewMemoryMappedFileBuffer(FLAGS_expected_values_path,
                                                &expected_mmap_buffer_);
      }
      if (status.ok()) {
        assert(expected_mmap_buffer_->GetLen() == expected_values_size);
        values_ = static_cast<std::atomic<uint32_t>*>(
            expected_mmap_buffer_->GetBase());
        assert(values_ != nullptr);
      } else {
        // Fall through with values_ == nullptr so the heap fallback below
        // takes over.
        fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
                FLAGS_expected_values_path.c_str(), status.ToString().c_str());
        assert(values_ == nullptr);
      }
    }
    // Heap fallback: used when no path was given or the mmap setup failed.
    if (values_ == nullptr) {
      values_allocation_.reset(
          new std::atomic<uint32_t>[FLAGS_column_families * max_key_]);
      values_ = &values_allocation_[0];
      values_init_needed = true;
    }
    assert(values_ != nullptr);
    // A brand-new table starts with every key marked deleted. A reused
    // mmap'ed table keeps its prior contents (values_init_needed stays
    // false in that case).
    if (values_init_needed) {
      for (int i = 0; i < FLAGS_column_families; ++i) {
        for (int j = 0; j < max_key_; ++j) {
          Delete(i, j, false /* pending */);
        }
      }
    }

    if (FLAGS_test_batches_snapshots) {
      fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
      return;
    }

    // One lock covers 2^log2_keys_per_lock_ consecutive keys; round up so the
    // last partial group also gets a lock.
    long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
    if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
      num_locks++;
    }
    fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
    key_locks_.resize(FLAGS_column_families);
    for (int i = 0; i < FLAGS_column_families; ++i) {
      key_locks_[i].resize(num_locks);
      for (auto& ptr : key_locks_[i]) {
        ptr.reset(new port::Mutex);
      }
    }
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      ++num_bg_threads_;
      fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
    }
    if (FLAGS_continuous_verification_interval > 0) {
      ++num_bg_threads_;
      fprintf(stdout, "Starting continuous_verification_thread\n");
    }
  }

  ~SharedState() {}

  port::Mutex* GetMutex() { return &mu_; }

  port::CondVar* GetCondVar() { return &cv_; }

  StressTest* GetStressTest() const { return stress_test_; }

  int64_t GetMaxKey() const { return max_key_; }

  uint32_t GetNumThreads() const { return num_threads_; }

  // Thread rendezvous counters.
  // NOTE(review): these counters are plain longs; presumably all Inc*/All*
  // calls happen under mu_ -- confirm at the call sites.
  void IncInitialized() { num_initialized_++; }

  void IncOperated() { num_populated_++; }

  void IncDone() { num_done_++; }

  // Counts votes modulo num_threads_, so AllVotedReopen() becomes true
  // exactly when every thread has voted once.
  void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; }

  bool AllInitialized() const { return num_initialized_ >= num_threads_; }

  bool AllOperated() const { return num_populated_ >= num_threads_; }

  bool AllDone() const { return num_done_ >= num_threads_; }

  bool AllVotedReopen() { return (vote_reopen_ == 0); }

  void SetStart() { start_ = true; }

  void SetStartVerify() { start_verify_ = true; }

  bool Started() const { return start_; }

  bool VerifyStarted() const { return start_verify_; }

  // Sticky failure flag: once set it is never cleared.
  void SetVerificationFailure() { verification_failure_.store(true); }

  bool HasVerificationFailedYet() const { return verification_failure_.load(); }

  void SetShouldStopTest() { should_stop_test_.store(true); }

  bool ShouldStopTest() const { return should_stop_test_.load(); }

  // Returns the lock guarding `key` in `cf` (one lock per
  // 2^log2_keys_per_lock_ consecutive keys).
  port::Mutex* GetMutexForKey(int cf, int64_t key) {
    return key_locks_[cf][key >> log2_keys_per_lock_].get();
  }

  // Acquires every key lock of `cf`, in index order.
  void LockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Lock();
    }
  }

  void UnlockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Unlock();
    }
  }

  // Expected-value slot for (cf, key). The table is laid out cf-major, so
  // cf's slots are the contiguous range [Value(cf, 0), Value(cf + 1, 0)).
  std::atomic<uint32_t>& Value(int cf, int64_t key) const {
    return values_[cf * max_key_ + key];
  }

  // Marks every key in `cf` deleted. The end iterator is intentionally the
  // first slot of the next cf (one past this cf's range).
  // NOTE(review): presumably callers hold all of cf's key locks (see
  // LockColumnFamily) while doing this -- confirm.
  void ClearColumnFamily(int cf) {
    std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
              DELETION_SENTINEL);
  }

  // @param pending True if the update may have started but is not yet
  // guaranteed finished. This is useful for crash-recovery testing when the
  // process may crash before updating the expected values array.
  //
  // pending == true stores UNKNOWN_SENTINEL (value could be old or new after
  // a crash); pending == false stores the real value_base. The fences order
  // this store against the surrounding DB Write in both directions.
  void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
    if (!pending) {
      // prevent expected-value update from reordering before Write
      std::atomic_thread_fence(std::memory_order_release);
    }
    Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
                         std::memory_order_relaxed);
    if (pending) {
      // prevent Write from reordering before expected-value update
      std::atomic_thread_fence(std::memory_order_release);
    }
  }

  // Atomic load of the expected value for (cf, key); may be a sentinel.
  uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  bool Delete(int cf, int64_t key, bool pending) {
    if (Value(cf, key) == DELETION_SENTINEL) {
      return false;
    }
    Put(cf, key, DELETION_SENTINEL, pending);
    return true;
  }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  bool SingleDelete(int cf, int64_t key, bool pending) {
    return Delete(cf, key, pending);
  }

  // @param pending See comment above Put()
  // Returns number of keys deleted by the call.
  int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
    int covered = 0;
    for (int64_t key = begin_key; key < end_key; ++key) {
      if (Delete(cf, key, pending)) {
        ++covered;
      }
    }
    return covered;
  }

  // True unless `key` was chosen as a no-overwrite key in the constructor.
  bool AllowsOverwrite(int64_t key) {
    return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
  }

  bool Exists(int cf, int64_t key) {
    // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
    // is disallowed can't be accidentally added a second time, in which case
    // SingleDelete wouldn't be able to properly delete the key. It does allow
    // the case where a SingleDelete might be added which covers nothing, but
    // that's not a correctness issue.
    uint32_t expected_value = Value(cf, key).load();
    return expected_value != DELETION_SENTINEL;
  }

  uint32_t GetSeed() const { return seed_; }

  void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }

  bool ShouldStopBgThread() { return should_stop_bg_thread_; }

  void IncBgThreadsFinished() { ++bg_thread_finished_; }

  bool BgThreadsFinished() const {
    return bg_thread_finished_ == num_bg_threads_;
  }

  // Only an mmap-backed table can hold state from a previous run, so only
  // then is an up-front verification pass meaningful.
  bool ShouldVerifyAtBeginning() const {
    return expected_mmap_buffer_.get() != nullptr;
  }

  // Atomically claims the right to print verification results. Returns true
  // if some other thread is already printing (i.e. the claim failed).
  bool PrintingVerificationResults() {
    bool tmp = false;
    return !printing_verification_results_.compare_exchange_strong(
        tmp, true, std::memory_order_relaxed);
  }

  void FinishPrintingVerificationResults() {
    printing_verification_results_.store(false, std::memory_order_relaxed);
  }

 private:
  port::Mutex mu_;
  port::CondVar cv_;  // tied to mu_
  const uint32_t seed_;
  const int64_t max_key_;
  const uint32_t log2_keys_per_lock_;
  const int num_threads_;
  // Rendezvous counters; see NOTE on IncInitialized() about locking.
  long num_initialized_;
  long num_populated_;
  long vote_reopen_;
  long num_done_;
  bool start_;
  bool start_verify_;
  int num_bg_threads_;
  bool should_stop_bg_thread_;
  int bg_thread_finished_;
  StressTest* stress_test_;
  std::atomic<bool> verification_failure_;
  std::atomic<bool> should_stop_test_;
  // Keys that should not be overwritten.
  std::unordered_set<size_t> no_overwrite_ids_;
  // Points into either expected_mmap_buffer_ or values_allocation_.
  std::atomic<uint32_t>* values_;
  std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
  // Has to make it owned by a smart ptr as port::Mutex is not copyable
  // and storing it in the container may require copying depending on the impl.
  std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_;
  std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
  std::atomic<bool> printing_verification_results_;
};
  306. // Per-thread state for concurrent executions of the same benchmark.
  307. struct ThreadState {
  308. uint32_t tid; // 0..n-1
  309. Random rand; // Has different seeds for different threads
  310. SharedState* shared;
  311. Stats stats;
  312. struct SnapshotState {
  313. const Snapshot* snapshot;
  314. // The cf from which we did a Get at this snapshot
  315. int cf_at;
  316. // The name of the cf at the time that we did a read
  317. std::string cf_at_name;
  318. // The key with which we did a Get at this snapshot
  319. std::string key;
  320. // The status of the Get
  321. Status status;
  322. // The value of the Get
  323. std::string value;
  324. // optional state of all keys in the db
  325. std::vector<bool>* key_vec;
  326. };
  327. std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;
  328. ThreadState(uint32_t index, SharedState* _shared)
  329. : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
  330. };
  331. } // namespace ROCKSDB_NAMESPACE
  332. #endif // GFLAGS