db_stress_test_base.h 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "rocksdb/io_status.h"
  10. #ifdef GFLAGS
  11. #pragma once
  12. #include "db_stress_tool/db_stress_common.h"
  13. #include "db_stress_tool/db_stress_shared_state.h"
  14. #include "rocksdb/experimental.h"
  15. #include "utilities/fault_injection_fs.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. class SystemClock;
  18. class Transaction;
  19. class TransactionDB;
  20. class OptimisticTransactionDB;
  21. struct TransactionDBOptions;
  22. using experimental::SstQueryFilterConfigsManager;
  23. class StressTest {
  24. public:
  25. static bool IsErrorInjectedAndRetryable(const Status& error_s) {
  26. assert(!error_s.ok());
  27. return error_s.getState() &&
  28. FaultInjectionTestFS::IsInjectedError(error_s) &&
  29. !status_to_io_status(Status(error_s)).GetDataLoss();
  30. }
  31. StressTest();
  32. virtual ~StressTest() {}
  33. std::shared_ptr<Cache> NewCache(size_t capacity, int32_t num_shard_bits);
  34. static std::vector<std::string> GetBlobCompressionTags();
  35. bool BuildOptionsTable();
  36. void InitDb(SharedState*);
  37. // The initialization work is split into two parts to avoid a circular
  38. // dependency with `SharedState`.
  39. virtual void FinishInitDb(SharedState*);
  40. void TrackExpectedState(SharedState* shared);
  41. void OperateDb(ThreadState* thread);
  42. virtual void VerifyDb(ThreadState* thread) const = 0;
  43. virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0;
  44. void PrintStatistics();
  45. bool MightHaveUnsyncedDataLoss() {
  46. return FLAGS_sync_fault_injection || FLAGS_disable_wal ||
  47. FLAGS_manual_wal_flush_one_in > 0;
  48. }
  49. Status EnableAutoCompaction() {
  50. assert(options_.disable_auto_compactions);
  51. Status s = db_->EnableAutoCompaction(column_families_);
  52. return s;
  53. }
  54. Options GetOptions(int cf_id);
  55. void CleanUp();
  56. protected:
  57. static int GetMinInjectedErrorCount(int error_count_1, int error_count_2) {
  58. if (error_count_1 > 0 && error_count_2 > 0) {
  59. return std::min(error_count_1, error_count_2);
  60. } else if (error_count_1 > 0) {
  61. return error_count_1;
  62. } else if (error_count_2 > 0) {
  63. return error_count_2;
  64. } else {
  65. return 0;
  66. }
  67. }
  68. void UpdateIfInitialWriteFails(Env* db_stress_env, const Status& write_s,
  69. Status* initial_write_s,
  70. bool* initial_wal_write_may_succeed,
  71. uint64_t* wait_for_recover_start_time,
  72. bool commit_bypass_memtable = false) {
  73. assert(db_stress_env && initial_write_s && initial_wal_write_may_succeed &&
  74. wait_for_recover_start_time);
  75. // Only update `initial_write_s`, `initial_wal_write_may_succeed` when the
  76. // first write fails
  77. if (!write_s.ok() && (*initial_write_s).ok()) {
  78. *initial_write_s = write_s;
  79. // With commit_bypass_memtable, we create a new WAL after WAL write
  80. // succeeds, that wal creation may fail due to injected error. So the
  81. // initial wal write may succeed even if status is failed to write to wal
  82. *initial_wal_write_may_succeed =
  83. commit_bypass_memtable ||
  84. !FaultInjectionTestFS::IsFailedToWriteToWALError(*initial_write_s);
  85. *wait_for_recover_start_time = db_stress_env->NowMicros();
  86. }
  87. }
  88. void PrintWriteRecoveryWaitTimeIfNeeded(Env* db_stress_env,
  89. const Status& initial_write_s,
  90. bool initial_wal_write_may_succeed,
  91. uint64_t wait_for_recover_start_time,
  92. const std::string& thread_name) {
  93. assert(db_stress_env);
  94. bool waited_for_recovery = !initial_write_s.ok() &&
  95. IsErrorInjectedAndRetryable(initial_write_s) &&
  96. initial_wal_write_may_succeed;
  97. if (waited_for_recovery) {
  98. uint64_t elapsed_sec =
  99. (db_stress_env->NowMicros() - wait_for_recover_start_time) / 1000000;
  100. if (elapsed_sec > 10) {
  101. fprintf(stdout,
  102. "%s thread slept to wait for write recovery for "
  103. "%" PRIu64 " seconds\n",
  104. thread_name.c_str(), elapsed_sec);
  105. }
  106. }
  107. }
  108. void GetDeleteRangeKeyLocks(
  109. ThreadState* thread, int rand_column_family, int64_t rand_key,
  110. std::vector<std::unique_ptr<MutexLock>>* range_locks) {
  111. for (int j = 0; j < FLAGS_range_deletion_width; ++j) {
  112. if (j == 0 ||
  113. ((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
  114. range_locks->emplace_back(new MutexLock(
  115. thread->shared->GetMutexForKey(rand_column_family, rand_key + j)));
  116. }
  117. }
  118. }
  119. Status AssertSame(DB* db, ColumnFamilyHandle* cf,
  120. ThreadState::SnapshotState& snap_state);
  121. // Currently PreloadDb has to be single-threaded.
  122. void PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
  123. SharedState* shared);
  124. Status SetOptions(ThreadState* thread);
  125. // For transactionsDB, there can be txns prepared but not yet committeed
  126. // right before previous stress run crash.
  127. // They will be recovered and processed through
  128. // ProcessRecoveredPreparedTxnsHelper on the start of current stress run.
  129. void ProcessRecoveredPreparedTxns(SharedState* shared);
  130. // Default implementation will first update ExpectedState to be
  131. // `SharedState::UNKNOWN` for each keys in `txn` and then randomly
  132. // commit or rollback `txn`.
  133. virtual void ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
  134. SharedState* shared);
  135. // ExecuteTransaction is recommended instead
  136. // @param commit_bypass_memtable Whether commit_bypass_memtable is set to
  137. // true in transaction options.
  138. Status NewTxn(WriteOptions& write_opts, ThreadState* thread,
  139. std::unique_ptr<Transaction>* out_txn,
  140. bool* commit_bypass_memtable = nullptr);
  141. Status CommitTxn(Transaction& txn, ThreadState* thread = nullptr);
  142. // Creates a transaction, executes `ops`, and tries to commit
  143. // @param commit_bypass_memtable Whether commit_bypass_memtable is set to
  144. // true in transaction options.
  145. Status ExecuteTransaction(WriteOptions& write_opts, ThreadState* thread,
  146. std::function<Status(Transaction&)>&& ops,
  147. bool* commit_bypass_memtable = nullptr);
  148. virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {}
  149. virtual bool ShouldAcquireMutexOnKey() const { return false; }
  150. // Returns true if DB state is tracked by the stress test.
  151. virtual bool IsStateTracked() const = 0;
  152. virtual std::vector<int> GenerateColumnFamilies(
  153. const int /* num_column_families */, int rand_column_family) const {
  154. return {rand_column_family};
  155. }
  156. virtual std::vector<int64_t> GenerateKeys(int64_t rand_key) const {
  157. return {rand_key};
  158. }
  159. virtual void TestKeyMayExist(ThreadState*, const ReadOptions&,
  160. const std::vector<int>&,
  161. const std::vector<int64_t>&) {}
  162. virtual Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
  163. const std::vector<int>& rand_column_families,
  164. const std::vector<int64_t>& rand_keys) = 0;
  165. virtual std::vector<Status> TestMultiGet(
  166. ThreadState* thread, const ReadOptions& read_opts,
  167. const std::vector<int>& rand_column_families,
  168. const std::vector<int64_t>& rand_keys) = 0;
  169. virtual void TestGetEntity(ThreadState* thread, const ReadOptions& read_opts,
  170. const std::vector<int>& rand_column_families,
  171. const std::vector<int64_t>& rand_keys) = 0;
  172. virtual void TestMultiGetEntity(ThreadState* thread,
  173. const ReadOptions& read_opts,
  174. const std::vector<int>& rand_column_families,
  175. const std::vector<int64_t>& rand_keys) = 0;
  176. virtual Status TestPrefixScan(ThreadState* thread,
  177. const ReadOptions& read_opts,
  178. const std::vector<int>& rand_column_families,
  179. const std::vector<int64_t>& rand_keys) = 0;
  180. virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
  181. const ReadOptions& read_opts,
  182. const std::vector<int>& cf_ids,
  183. const std::vector<int64_t>& keys,
  184. char (&value)[100]) = 0;
  185. virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
  186. const std::vector<int>& rand_column_families,
  187. const std::vector<int64_t>& rand_keys) = 0;
  188. virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
  189. const std::vector<int>& rand_column_families,
  190. const std::vector<int64_t>& rand_keys) = 0;
  191. virtual void TestIngestExternalFile(
  192. ThreadState* thread, const std::vector<int>& rand_column_families,
  193. const std::vector<int64_t>& rand_keys) = 0;
  194. // Issue compact range, starting with start_key, whose integer value
  195. // is rand_key.
  196. virtual void TestCompactRange(ThreadState* thread, int64_t rand_key,
  197. const Slice& start_key,
  198. ColumnFamilyHandle* column_family);
  199. virtual void TestPromoteL0(ThreadState* thread,
  200. ColumnFamilyHandle* column_family);
  201. // Calculate a hash value for all keys in range [start_key, end_key]
  202. // at a certain snapshot.
  203. uint32_t GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
  204. ColumnFamilyHandle* column_family,
  205. const Slice& start_key, const Slice& end_key);
  206. // Return a column family handle that mirrors what is pointed by
  207. // `column_family_id`, which will be used to validate data to be correct.
  208. // By default, the column family itself will be returned.
  209. virtual ColumnFamilyHandle* GetControlCfh(ThreadState* /* thread*/,
  210. int column_family_id) {
  211. return column_families_[column_family_id];
  212. }
  213. // Generated a list of keys that close to boundaries of SST keys.
  214. // If there isn't any SST file in the DB, return empty list.
  215. std::vector<std::string> GetWhiteBoxKeys(ThreadState* thread, DB* db,
  216. ColumnFamilyHandle* cfh,
  217. size_t num_keys);
  218. // Given a key K, this creates an iterator which scans to K and then
  219. // does a random sequence of Next/Prev operations.
  220. virtual Status TestIterate(ThreadState* thread, const ReadOptions& read_opts,
  221. const std::vector<int>& rand_column_families,
  222. const std::vector<int64_t>& rand_keys);
  223. // Given a key K, this creates an attribute group iterator which scans to K
  224. // and then does a random sequence of Next/Prev operations. Called only when
  225. // use_attribute_group=1
  226. virtual Status TestIterateAttributeGroups(
  227. ThreadState* thread, const ReadOptions& read_opts,
  228. const std::vector<int>& rand_column_families,
  229. const std::vector<int64_t>& rand_keys);
  230. template <typename IterType, typename NewIterFunc, typename VerifyFunc>
  231. Status TestIterateImpl(ThreadState* thread, const ReadOptions& read_opts,
  232. const std::vector<int>& rand_column_families,
  233. const std::vector<int64_t>& rand_keys,
  234. NewIterFunc new_iter_func, VerifyFunc verify_func);
  235. virtual Status TestIterateAgainstExpected(
  236. ThreadState* /* thread */, const ReadOptions& /* read_opts */,
  237. const std::vector<int>& /* rand_column_families */,
  238. const std::vector<int64_t>& /* rand_keys */) {
  239. return Status::NotSupported();
  240. }
  241. Status TestMultiScan(ThreadState* thread, const ReadOptions& read_opts,
  242. const std::vector<int>& rand_column_families,
  243. const std::vector<int64_t>& rand_keys);
  244. // Enum used by VerifyIterator() to identify the mode to validate.
  245. enum LastIterateOp {
  246. kLastOpSeek,
  247. kLastOpSeekForPrev,
  248. kLastOpNextOrPrev,
  249. kLastOpSeekToFirst,
  250. kLastOpSeekToLast
  251. };
  252. // Compare the two iterator, iter and cmp_iter are in the same position,
  253. // unless iter might be made invalidate or undefined because of
  254. // upper or lower bounds, or prefix extractor.
  255. // Will flag failure if the verification fails.
  256. // diverged = true if the two iterator is already diverged.
  257. // True if verification passed, false if not.
  258. // op_logs is the information to print when validation fails.
  259. template <typename IterType, typename VerifyFuncType>
  260. void VerifyIterator(ThreadState* thread, ColumnFamilyHandle* cmp_cfh,
  261. const ReadOptions& ro, IterType* iter, Iterator* cmp_iter,
  262. LastIterateOp op, const Slice& seek_key,
  263. const std::string& op_logs, VerifyFuncType verifyFunc,
  264. bool* diverged);
  265. virtual Status TestBackupRestore(ThreadState* thread,
  266. const std::vector<int>& rand_column_families,
  267. const std::vector<int64_t>& rand_keys);
  268. virtual Status PrepareOptionsForRestoredDB(Options* options);
  269. virtual Status TestCheckpoint(ThreadState* thread,
  270. const std::vector<int>& rand_column_families,
  271. const std::vector<int64_t>& rand_keys);
  272. void TestCompactFiles(ThreadState* thread, ColumnFamilyHandle* column_family);
  273. Status TestFlush(const std::vector<int>& rand_column_families);
  274. Status TestResetStats();
  275. Status TestPauseBackground(ThreadState* thread);
  276. Status TestDisableFileDeletions(ThreadState* thread);
  277. Status TestDisableManualCompaction(ThreadState* thread);
  278. void TestAcquireSnapshot(ThreadState* thread, int rand_column_family,
  279. const std::string& keystr, uint64_t i);
  280. Status MaybeReleaseSnapshots(ThreadState* thread, uint64_t i);
  281. Status TestGetLiveFiles() const;
  282. Status TestGetLiveFilesMetaData() const;
  283. Status TestGetLiveFilesStorageInfo() const;
  284. Status TestGetAllColumnFamilyMetaData() const;
  285. Status TestGetSortedWalFiles() const;
  286. Status TestGetCurrentWalFile() const;
  287. void TestGetProperty(ThreadState* thread) const;
  288. Status TestGetPropertiesOfAllTables() const;
  289. virtual Status TestApproximateSize(
  290. ThreadState* thread, uint64_t iteration,
  291. const std::vector<int>& rand_column_families,
  292. const std::vector<int64_t>& rand_keys);
  293. virtual Status TestCustomOperations(
  294. ThreadState* /*thread*/,
  295. const std::vector<int>& /*rand_column_families*/) {
  296. return Status::NotSupported("TestCustomOperations() must be overridden");
  297. }
  298. void ProcessStatus(SharedState* shared, std::string msg, const Status& s,
  299. bool ignore_injected_error = true) const;
  300. void VerificationAbort(SharedState* shared, std::string msg) const;
  301. void VerificationAbort(SharedState* shared, std::string msg, int cf,
  302. int64_t key) const;
  303. void VerificationAbort(SharedState* shared, std::string msg, int cf,
  304. int64_t key, Slice value_from_db,
  305. Slice value_from_expected) const;
  306. void VerificationAbort(SharedState* shared, int cf, int64_t key,
  307. const Slice& value, const WideColumns& columns) const;
  308. static std::string DebugString(const Slice& value,
  309. const WideColumns& columns);
  310. void PrintEnv() const;
  311. void Open(SharedState* shared, bool reopen = false);
  312. void Reopen(ThreadState* thread);
  313. virtual void RegisterAdditionalListeners() {}
  314. virtual void PrepareTxnDbOptions(SharedState* /*shared*/,
  315. TransactionDBOptions& /*txn_db_opts*/) {}
  316. // Returns whether the timestamp of read_opts is updated.
  317. bool MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
  318. std::string& ts_str,
  319. Slice& ts_slice,
  320. ReadOptions& read_opts);
  321. void MaybeUseOlderTimestampForRangeScan(ThreadState* thread,
  322. std::string& ts_str, Slice& ts_slice,
  323. ReadOptions& read_opts);
  324. void CleanUpColumnFamilies();
  325. std::shared_ptr<Cache> cache_;
  326. std::shared_ptr<Cache> compressed_cache_;
  327. std::shared_ptr<const FilterPolicy> filter_policy_;
  328. DB* db_;
  329. TransactionDB* txn_db_;
  330. OptimisticTransactionDB* optimistic_txn_db_;
  331. // Currently only used in MultiOpsTxnsStressTest
  332. std::atomic<DB*> db_aptr_;
  333. Options options_;
  334. SystemClock* clock_;
  335. std::vector<ColumnFamilyHandle*> column_families_;
  336. std::vector<std::string> column_family_names_;
  337. std::atomic<int> new_column_family_name_;
  338. int num_times_reopened_;
  339. std::unordered_map<std::string, std::vector<std::string>> options_table_;
  340. std::vector<std::string> options_index_;
  341. std::atomic<bool> db_preload_finished_;
  342. std::shared_ptr<SstQueryFilterConfigsManager::Factory> sqfc_factory_;
  343. DB* secondary_db_;
  344. std::vector<ColumnFamilyHandle*> secondary_cfhs_;
  345. bool is_db_stopped_;
  346. };
  347. // Load options from OPTIONS file and populate `options`.
  348. bool InitializeOptionsFromFile(Options& options);
  349. // Initialize `options` using command line arguments.
  350. // When this function is called, `cache`, `block_cache_compressed`,
  351. // `filter_policy` have all been initialized. Therefore, we just pass them as
  352. // input arguments.
  353. void InitializeOptionsFromFlags(
  354. const std::shared_ptr<Cache>& cache,
  355. const std::shared_ptr<const FilterPolicy>& filter_policy, Options& options);
  356. // Initialize `options` on which `InitializeOptionsFromFile()` and
  357. // `InitializeOptionsFromFlags()` have both been called already.
  358. // There are two cases.
  359. // Case 1: OPTIONS file is not specified. Command line arguments have been used
  360. // to initialize `options`. InitializeOptionsGeneral() will use
  361. // `cache` and `filter_policy` to initialize
  362. // corresponding fields of `options`. InitializeOptionsGeneral() will
  363. // also set up other fields of `options` so that stress test can run.
  364. // Examples include `create_if_missing` and
  365. // `create_missing_column_families`, etc.
  366. // Case 2: OPTIONS file is specified. It is possible that, after loading from
  367. // the given OPTIONS files, some shared object fields are still not
  368. // initialized because they are not set in the OPTIONS file. In this
  369. // case, if command line arguments indicate that the user wants to set
  370. // up such shared objects, e.g. block cache, compressed block cache,
  371. // row cache, filter policy, then InitializeOptionsGeneral() will honor
  372. // the user's choice, thus passing `cache`,
  373. // `filter_policy` as input arguments.
  374. //
  375. // InitializeOptionsGeneral() must not overwrite fields of `options` loaded
  376. // from OPTIONS file.
  377. void InitializeOptionsGeneral(
  378. const std::shared_ptr<Cache>& cache,
  379. const std::shared_ptr<const FilterPolicy>& filter_policy,
  380. const std::shared_ptr<SstQueryFilterConfigsManager::Factory>& sqfc_factory,
  381. Options& options);
  382. // If no OPTIONS file is specified, set up `options` so that we can test
  383. // user-defined timestamp which requires `-user_timestamp_size=8`.
  384. // This function also checks for known (currently) incompatible features with
  385. // user-defined timestamp.
  386. void CheckAndSetOptionsForUserTimestamp(Options& options);
  // NOTE(review): defined elsewhere. Judging by its name, it reports whether
  // auto compactions should be disabled before VerifyDb() runs; confirm
  // against the implementation.
  bool ShouldDisableAutoCompactionsBeforeVerifyDb();
  388. } // namespace ROCKSDB_NAMESPACE
  389. #endif // GFLAGS