multi_ops_txns_stress.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #ifdef GFLAGS
  10. #include "db_stress_tool/db_stress_common.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. // This file defines MultiOpsTxnsStressTest so that we can stress test RocksDB
  13. // transactions on a simple, emulated relational table.
  14. //
  15. // The record format is similar to the example found at
  16. // https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format.
  17. //
  18. // The table is created by
  19. // ```
  20. // create table t1 (
  21. // a int primary key,
  22. // b int,
  23. // c int,
  24. // key(c)
  25. // )
  26. // ```
  27. //
  28. // (For simplicity, we use uint32_t for int here.)
  29. //
  30. // For this table, there is a primary index using `a`, as well as a secondary
  31. // index using `c` and `a`.
  32. //
  33. // Primary key format:
  34. // | index id | M(a) |
  35. // Primary index value:
  36. // | b | c |
  37. // M(a) represents the big-endian format of a.
  38. //
  39. // Secondary key format:
  40. // | index id | M(c) | M(a) |
  41. // Secondary index value:
  42. // | crc32 |
  43. // Similarly to M(a), M(c) is the big-endian format of c.
  44. //
  45. // The in-memory representation of a record is defined in class
  46. // MultiOpsTxnsStressTest::Record that includes a number of helper methods to
  47. // encode/decode primary index keys, primary index values, secondary index keys,
  48. // secondary index values, etc.
  49. //
  50. // Sometimes primary index and secondary index reside on different column
  51. // families, but sometimes they colocate in the same column family. Current
  52. // implementation puts them in the same (default) column family, and this is
  53. // subject to future change if we find it interesting to test the other case.
  54. //
  55. // Class MultiOpsTxnsStressTest has the following transactions for testing.
  56. //
  57. // 1. Primary key update
  58. // UPDATE t1 SET a = 3 WHERE a = 2;
  59. // ```
  60. // tx->GetForUpdate(primary key a=2)
  61. // tx->GetForUpdate(primary key a=3)
  62. // tx->Delete(primary key a=2)
  63. // tx->Put(primary key a=3, value)
  64. // tx->batch->SingleDelete(secondary key a=2)
  65. // tx->batch->Put(secondary key a=3, value)
  66. // tx->Prepare()
  67. // tx->Commit()
  68. // ```
  69. //
  70. // 2. Secondary key update
  71. // UPDATE t1 SET c = 3 WHERE c = 2;
  72. // ```
  73. // iter->Seek(secondary key)
  74. // // Get corresponding primary key value(s) from iterator
  75. // tx->GetForUpdate(primary key)
  76. // tx->Put(primary key, value c=3)
  77. // tx->batch->SingleDelete(secondary key c=2)
  78. // tx->batch->Put(secondary key c=3)
  79. // tx->Prepare()
  80. // tx->Commit()
  81. // ```
  82. //
  83. // 3. Primary index value update
  84. // UPDATE t1 SET b = b + 1 WHERE a = 2;
  85. // ```
  86. // tx->GetForUpdate(primary key a=2)
  87. // tx->Put(primary key a=2, value b=b+1)
  88. // tx->Prepare()
  89. // tx->Commit()
  90. // ```
  91. //
  92. // 4. Point lookup
  93. // SELECT * FROM t1 WHERE a = 3;
  94. // ```
  95. // tx->Get(primary key a=3)
  96. // tx->Commit()
  97. // ```
  98. //
  99. // 5. Range scan
  100. // SELECT * FROM t1 WHERE c = 2;
  101. // ```
  102. // it = tx->GetIterator()
  103. // it->Seek(secondary key c=2)
  104. // tx->Commit()
  105. // ```
  106. class MultiOpsTxnsStressTest : public StressTest {
  107. public:
  108. class Record {
  109. public:
  110. static constexpr uint32_t kMetadataPrefix = 0;
  111. static constexpr uint32_t kPrimaryIndexId = 1;
  112. static constexpr uint32_t kSecondaryIndexId = 2;
  113. static constexpr size_t kPrimaryIndexEntrySize = 8 + 8;
  114. static constexpr size_t kSecondaryIndexEntrySize = 12 + 4;
  115. static_assert(kPrimaryIndexId < kSecondaryIndexId,
  116. "kPrimaryIndexId must be smaller than kSecondaryIndexId");
  117. static_assert(sizeof(kPrimaryIndexId) == sizeof(uint32_t),
  118. "kPrimaryIndexId must be 4 bytes");
  119. static_assert(sizeof(kSecondaryIndexId) == sizeof(uint32_t),
  120. "kSecondaryIndexId must be 4 bytes");
  121. // Used for generating search key to probe primary index.
  122. static std::string EncodePrimaryKey(uint32_t a);
  123. // Used for generating search prefix to probe secondary index.
  124. static std::string EncodeSecondaryKey(uint32_t c);
  125. // Used for generating search key to probe secondary index.
  126. static std::string EncodeSecondaryKey(uint32_t c, uint32_t a);
  127. static std::tuple<Status, uint32_t, uint32_t> DecodePrimaryIndexValue(
  128. Slice primary_index_value);
  129. static std::pair<Status, uint32_t> DecodeSecondaryIndexValue(
  130. Slice secondary_index_value);
  131. Record() = default;
  132. Record(uint32_t _a, uint32_t _b, uint32_t _c) : a_(_a), b_(_b), c_(_c) {}
  133. bool operator==(const Record& other) const {
  134. return a_ == other.a_ && b_ == other.b_ && c_ == other.c_;
  135. }
  136. bool operator!=(const Record& other) const { return !(*this == other); }
  137. std::pair<std::string, std::string> EncodePrimaryIndexEntry() const;
  138. std::string EncodePrimaryKey() const;
  139. std::string EncodePrimaryIndexValue() const;
  140. std::pair<std::string, std::string> EncodeSecondaryIndexEntry() const;
  141. std::string EncodeSecondaryKey() const;
  142. Status DecodePrimaryIndexEntry(Slice primary_index_key,
  143. Slice primary_index_value);
  144. Status DecodeSecondaryIndexEntry(Slice secondary_index_key,
  145. Slice secondary_index_value);
  146. uint32_t a_value() const { return a_; }
  147. uint32_t b_value() const { return b_; }
  148. uint32_t c_value() const { return c_; }
  149. void SetA(uint32_t _a) { a_ = _a; }
  150. void SetB(uint32_t _b) { b_ = _b; }
  151. void SetC(uint32_t _c) { c_ = _c; }
  152. std::string ToString() const {
  153. std::string ret("(");
  154. ret.append(std::to_string(a_));
  155. ret.append(",");
  156. ret.append(std::to_string(b_));
  157. ret.append(",");
  158. ret.append(std::to_string(c_));
  159. ret.append(")");
  160. return ret;
  161. }
  162. private:
  163. friend class InvariantChecker;
  164. uint32_t a_{0};
  165. uint32_t b_{0};
  166. uint32_t c_{0};
  167. };
  168. MultiOpsTxnsStressTest() {}
  169. ~MultiOpsTxnsStressTest() override {}
  170. void FinishInitDb(SharedState*) override;
  171. void ReopenAndPreloadDbIfNeeded(SharedState* shared);
  172. bool IsStateTracked() const override { return false; }
  173. Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
  174. const std::vector<int>& rand_column_families,
  175. const std::vector<int64_t>& rand_keys) override;
  176. std::vector<Status> TestMultiGet(
  177. ThreadState* thread, const ReadOptions& read_opts,
  178. const std::vector<int>& rand_column_families,
  179. const std::vector<int64_t>& rand_keys) override;
  180. void TestGetEntity(ThreadState* thread, const ReadOptions& read_opts,
  181. const std::vector<int>& rand_column_families,
  182. const std::vector<int64_t>& rand_keys) override;
  183. void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
  184. const std::vector<int>& rand_column_families,
  185. const std::vector<int64_t>& rand_keys) override;
  186. Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
  187. const std::vector<int>& rand_column_families,
  188. const std::vector<int64_t>& rand_keys) override;
  189. // Given a key K, this creates an iterator which scans to K and then
  190. // does a random sequence of Next/Prev operations.
  191. Status TestIterate(ThreadState* thread, const ReadOptions& read_opts,
  192. const std::vector<int>& rand_column_families,
  193. const std::vector<int64_t>& rand_keys) override;
  194. Status TestIterateAttributeGroups(
  195. ThreadState* thread, const ReadOptions& read_opts,
  196. const std::vector<int>& rand_column_families,
  197. const std::vector<int64_t>& rand_keys) override;
  198. Status TestPut(ThreadState* thread, WriteOptions& write_opts,
  199. const ReadOptions& read_opts, const std::vector<int>& cf_ids,
  200. const std::vector<int64_t>& keys, char (&value)[100]) override;
  201. Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
  202. const std::vector<int>& rand_column_families,
  203. const std::vector<int64_t>& rand_keys) override;
  204. Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
  205. const std::vector<int>& rand_column_families,
  206. const std::vector<int64_t>& rand_keys) override;
  207. void TestIngestExternalFile(ThreadState* thread,
  208. const std::vector<int>& rand_column_families,
  209. const std::vector<int64_t>& rand_keys) override;
  210. void TestCompactRange(ThreadState* thread, int64_t rand_key,
  211. const Slice& start_key,
  212. ColumnFamilyHandle* column_family) override;
  213. Status TestBackupRestore(ThreadState* thread,
  214. const std::vector<int>& rand_column_families,
  215. const std::vector<int64_t>& rand_keys) override;
  216. Status TestCheckpoint(ThreadState* thread,
  217. const std::vector<int>& rand_column_families,
  218. const std::vector<int64_t>& rand_keys) override;
  219. Status TestApproximateSize(ThreadState* thread, uint64_t iteration,
  220. const std::vector<int>& rand_column_families,
  221. const std::vector<int64_t>& rand_keys) override;
  222. Status TestCustomOperations(
  223. ThreadState* thread,
  224. const std::vector<int>& rand_column_families) override;
  225. void RegisterAdditionalListeners() override;
  226. void PrepareTxnDbOptions(SharedState* /*shared*/,
  227. TransactionDBOptions& txn_db_opts) override;
  228. Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a,
  229. uint32_t old_a_pos, uint32_t new_a);
  230. Status SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c,
  231. uint32_t old_c_pos, uint32_t new_c);
  232. Status UpdatePrimaryIndexValueTxn(ThreadState* thread, uint32_t a,
  233. uint32_t b_delta);
  234. Status PointLookupTxn(ThreadState* thread, ReadOptions ropts, uint32_t a);
  235. Status RangeScanTxn(ThreadState* thread, ReadOptions ropts, uint32_t c);
  236. void VerifyDb(ThreadState* thread) const override;
  237. void ContinuouslyVerifyDb(ThreadState* thread) const override {
  238. VerifyDb(thread);
  239. }
  240. void VerifyPkSkFast(const ReadOptions& read_options, int job_id);
  241. protected:
  242. class Counter {
  243. public:
  244. uint64_t Next() { return value_.fetch_add(1); }
  245. private:
  246. std::atomic<uint64_t> value_ = Env::Default()->NowNanos();
  247. };
  248. using KeySet = std::set<uint32_t>;
  249. class KeyGenerator {
  250. public:
  251. explicit KeyGenerator(uint32_t s, uint32_t low, uint32_t high,
  252. KeySet&& existing_uniq, KeySet&& non_existing_uniq)
  253. : rand_(s),
  254. low_(low),
  255. high_(high),
  256. existing_uniq_(std::move(existing_uniq)),
  257. non_existing_uniq_(std::move(non_existing_uniq)) {}
  258. ~KeyGenerator() {
  259. assert(!existing_uniq_.empty());
  260. assert(!non_existing_uniq_.empty());
  261. }
  262. void FinishInit();
  263. std::pair<uint32_t, uint32_t> ChooseExisting();
  264. void Replace(uint32_t old_val, uint32_t old_pos, uint32_t new_val);
  265. uint32_t Allocate();
  266. void UndoAllocation(uint32_t new_val);
  267. std::string ToString() const {
  268. std::ostringstream oss;
  269. oss << "[" << low_ << ", " << high_ << "): " << existing_.size()
  270. << " elements, " << existing_uniq_.size() << " unique values, "
  271. << non_existing_uniq_.size() << " unique non-existing values";
  272. return oss.str();
  273. }
  274. private:
  275. Random rand_;
  276. uint32_t low_ = 0;
  277. uint32_t high_ = 0;
  278. std::vector<uint32_t> existing_{};
  279. KeySet existing_uniq_{};
  280. KeySet non_existing_uniq_{};
  281. bool initialized_ = false;
  282. };
  283. // Return <a, pos>
  284. std::pair<uint32_t, uint32_t> ChooseExistingA(ThreadState* thread);
  285. uint32_t GenerateNextA(ThreadState* thread);
  286. // Return <c, pos>
  287. std::pair<uint32_t, uint32_t> ChooseExistingC(ThreadState* thread);
  288. uint32_t GenerateNextC(ThreadState* thread);
  289. // Randomly commit or rollback `txn`
  290. void ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
  291. SharedState*) override;
  292. // Some applications, e.g. MyRocks writes a KV pair to the database via
  293. // commit-time-write-batch (ctwb) in additional to the transaction's regular
  294. // write batch. The key is usually constant representing some system
  295. // metadata, while the value is monoticailly increasing which represents the
  296. // actual value of the metadata. Method WriteToCommitTimeWriteBatch()
  297. // emulates this scenario.
  298. Status WriteToCommitTimeWriteBatch(Transaction& txn);
  299. Status CommitAndCreateTimestampedSnapshotIfNeeded(ThreadState* thread,
  300. Transaction& txn);
  301. void SetupSnapshot(ThreadState* thread, ReadOptions& read_opts,
  302. Transaction& txn,
  303. std::shared_ptr<const Snapshot>& snapshot);
  304. std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_;
  305. std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_c_;
  306. Counter counter_{};
  307. private:
  308. struct KeySpaces {
  309. uint32_t lb_a = 0;
  310. uint32_t ub_a = 0;
  311. uint32_t lb_c = 0;
  312. uint32_t ub_c = 0;
  313. explicit KeySpaces() = default;
  314. explicit KeySpaces(uint32_t _lb_a, uint32_t _ub_a, uint32_t _lb_c,
  315. uint32_t _ub_c)
  316. : lb_a(_lb_a), ub_a(_ub_a), lb_c(_lb_c), ub_c(_ub_c) {}
  317. std::string EncodeTo() const;
  318. bool DecodeFrom(Slice data);
  319. };
  320. void PersistKeySpacesDesc(const std::string& key_spaces_path, uint32_t lb_a,
  321. uint32_t ub_a, uint32_t lb_c, uint32_t ub_c);
  322. KeySpaces ReadKeySpacesDesc(const std::string& key_spaces_path);
  323. void PreloadDb(SharedState* shared, int threads, uint32_t lb_a, uint32_t ub_a,
  324. uint32_t lb_c, uint32_t ub_c);
  325. void ScanExistingDb(SharedState* shared, int threads);
  326. };
  327. class InvariantChecker {
  328. public:
  329. static_assert(sizeof(MultiOpsTxnsStressTest::Record().a_) == sizeof(uint32_t),
  330. "MultiOpsTxnsStressTest::Record::a_ must be 4 bytes");
  331. static_assert(sizeof(MultiOpsTxnsStressTest::Record().b_) == sizeof(uint32_t),
  332. "MultiOpsTxnsStressTest::Record::b_ must be 4 bytes");
  333. static_assert(sizeof(MultiOpsTxnsStressTest::Record().c_) == sizeof(uint32_t),
  334. "MultiOpsTxnsStressTest::Record::c_ must be 4 bytes");
  335. };
  336. class MultiOpsTxnsStressListener : public EventListener {
  337. public:
  338. explicit MultiOpsTxnsStressListener(MultiOpsTxnsStressTest* stress_test)
  339. : stress_test_(stress_test) {
  340. assert(stress_test_);
  341. }
  342. ~MultiOpsTxnsStressListener() override {}
  343. void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
  344. assert(db);
  345. #ifdef NDEBUG
  346. (void)db;
  347. #endif
  348. assert(info.cf_id == 0);
  349. const ReadOptions read_options(Env::IOActivity::kFlush);
  350. stress_test_->VerifyPkSkFast(read_options, info.job_id);
  351. }
  352. void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) override {
  353. assert(db);
  354. #ifdef NDEBUG
  355. (void)db;
  356. #endif
  357. assert(info.cf_id == 0);
  358. const ReadOptions read_options(Env::IOActivity::kCompaction);
  359. stress_test_->VerifyPkSkFast(read_options, info.job_id);
  360. }
  361. private:
  362. MultiOpsTxnsStressTest* const stress_test_ = nullptr;
  363. };
  364. } // namespace ROCKSDB_NAMESPACE
  365. #endif // GFLAGS