multi_ops_txns_stress.cc 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #ifdef GFLAGS
  10. #include "db_stress_tool/multi_ops_txns_stress.h"
  11. #include "rocksdb/utilities/write_batch_with_index.h"
  12. #include "util/defer.h"
  13. #include "utilities/fault_injection_fs.h"
  14. #include "utilities/transactions/write_prepared_txn_db.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. // The description of A and C can be found in multi_ops_txns_stress.h
  17. DEFINE_int32(lb_a, 0, "(Inclusive) lower bound of A");
  18. DEFINE_int32(ub_a, 1000, "(Exclusive) upper bound of A");
  19. DEFINE_int32(lb_c, 0, "(Inclusive) lower bound of C");
  20. DEFINE_int32(ub_c, 1000, "(Exclusive) upper bound of C");
  21. DEFINE_string(key_spaces_path, "",
  22. "Path to file describing the lower and upper bounds of A and C");
  23. DEFINE_int32(delay_snapshot_read_one_in, 0,
  24. "With a chance of 1/N, inject a random delay between taking "
  25. "snapshot and read.");
  26. DEFINE_int32(rollback_one_in, 0,
  27. "If non-zero, rollback non-read-only transactions with a "
  28. "probability of 1/N.");
  29. DEFINE_int32(clear_wp_commit_cache_one_in, 0,
  30. "If non-zero, evict all commit entries from commit cache with a "
  31. "probability of 1/N. This options applies to write-prepared and "
  32. "write-unprepared transactions.");
  33. extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void) {
  34. static Random rand(static_cast<uint32_t>(db_stress_env->NowMicros()));
  35. return FLAGS_clear_wp_commit_cache_one_in > 0 &&
  36. rand.OneIn(FLAGS_clear_wp_commit_cache_one_in);
  37. }
  38. // MultiOpsTxnsStressTest can either operate on a database with pre-populated
  39. // data (possibly from previous ones), or create a new db and preload it with
  40. // data specified via `-lb_a`, `-ub_a`, `-lb_c`, `-ub_c`, etc. Among these, we
  41. // define the test key spaces as two key ranges: [lb_a, ub_a) and [lb_c, ub_c).
  42. // The key spaces specification is persisted in a file whose absolute path can
  43. // be specified via `-key_spaces_path`.
  44. //
  45. // Whether an existing db is used or a new one is created, key_spaces_path will
  46. // be used. In the former case, the test reads the key spaces specification
  47. // from `-key_spaces_path` and decodes [lb_a, ub_a) and [lb_c, ub_c). In the
  48. // latter case, the test writes a key spaces specification to a file at the
  49. // location, and this file will be used by future runs until a new db is
  50. // created.
  51. //
  52. // Create a fresh new database (-destroy_db_initially=1 or there is no database
  53. // in the location specified by -db). See PreloadDb().
  54. //
  55. // Use an existing, non-empty database. See ScanExistingDb().
  56. //
  57. // This test is multi-threaded, and thread count can be specified via
  58. // `-threads`. For simplicity, we partition the key ranges and each thread
  59. // operates on a subrange independently.
  60. // Within each subrange, a KeyGenerator object is responsible for key
  61. // generation. A KeyGenerator maintains two sets: set of existing keys within
  62. // [low, high), set of non-existing keys within [low, high). [low, high) is the
  63. // subrange. The test initialization makes sure there is at least one
  64. // non-existing key, otherwise the test will return an error and exit before
  65. // any test thread is spawned.
  66. void MultiOpsTxnsStressTest::KeyGenerator::FinishInit() {
  67. assert(existing_.empty());
  68. assert(!existing_uniq_.empty());
  69. assert(low_ < high_);
  70. for (auto v : existing_uniq_) {
  71. assert(low_ <= v);
  72. assert(high_ > v);
  73. existing_.push_back(v);
  74. }
  75. if (non_existing_uniq_.empty()) {
  76. fprintf(
  77. stderr,
  78. "Cannot allocate key in [%u, %u)\nStart with a new DB or try change "
  79. "the number of threads for testing via -threads=<#threads>\n",
  80. static_cast<unsigned int>(low_), static_cast<unsigned int>(high_));
  81. fflush(stdout);
  82. fflush(stderr);
  83. assert(false);
  84. }
  85. initialized_ = true;
  86. }
  87. std::pair<uint32_t, uint32_t>
  88. MultiOpsTxnsStressTest::KeyGenerator::ChooseExisting() {
  89. assert(initialized_);
  90. const size_t N = existing_.size();
  91. assert(N > 0);
  92. uint32_t rnd = rand_.Uniform(static_cast<int>(N));
  93. assert(rnd < N);
  94. return std::make_pair(existing_[rnd], rnd);
  95. }
  96. uint32_t MultiOpsTxnsStressTest::KeyGenerator::Allocate() {
  97. assert(initialized_);
  98. auto it = non_existing_uniq_.begin();
  99. assert(non_existing_uniq_.end() != it);
  100. uint32_t ret = *it;
  101. // Remove this element from non_existing_.
  102. // Need to call UndoAllocation() if the calling transaction does not commit.
  103. non_existing_uniq_.erase(it);
  104. return ret;
  105. }
  106. void MultiOpsTxnsStressTest::KeyGenerator::Replace(uint32_t old_val,
  107. uint32_t old_pos,
  108. uint32_t new_val) {
  109. assert(initialized_);
  110. {
  111. auto it = existing_uniq_.find(old_val);
  112. assert(it != existing_uniq_.end());
  113. existing_uniq_.erase(it);
  114. }
  115. {
  116. assert(0 == existing_uniq_.count(new_val));
  117. existing_uniq_.insert(new_val);
  118. existing_[old_pos] = new_val;
  119. }
  120. {
  121. assert(0 == non_existing_uniq_.count(old_val));
  122. non_existing_uniq_.insert(old_val);
  123. }
  124. }
  125. void MultiOpsTxnsStressTest::KeyGenerator::UndoAllocation(uint32_t new_val) {
  126. assert(initialized_);
  127. assert(0 == non_existing_uniq_.count(new_val));
  128. non_existing_uniq_.insert(new_val);
  129. }
  130. std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) {
  131. std::string ret;
  132. PutFixed32(&ret, kPrimaryIndexId);
  133. PutFixed32(&ret, a);
  134. char* const buf = ret.data();
  135. std::reverse(buf, buf + sizeof(kPrimaryIndexId));
  136. std::reverse(buf + sizeof(kPrimaryIndexId),
  137. buf + sizeof(kPrimaryIndexId) + sizeof(a));
  138. return ret;
  139. }
  140. std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c) {
  141. std::string ret;
  142. PutFixed32(&ret, kSecondaryIndexId);
  143. PutFixed32(&ret, c);
  144. char* const buf = ret.data();
  145. std::reverse(buf, buf + sizeof(kSecondaryIndexId));
  146. std::reverse(buf + sizeof(kSecondaryIndexId),
  147. buf + sizeof(kSecondaryIndexId) + sizeof(c));
  148. return ret;
  149. }
  150. std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c,
  151. uint32_t a) {
  152. std::string ret;
  153. PutFixed32(&ret, kSecondaryIndexId);
  154. PutFixed32(&ret, c);
  155. PutFixed32(&ret, a);
  156. char* const buf = ret.data();
  157. std::reverse(buf, buf + sizeof(kSecondaryIndexId));
  158. std::reverse(buf + sizeof(kSecondaryIndexId),
  159. buf + sizeof(kSecondaryIndexId) + sizeof(c));
  160. std::reverse(buf + sizeof(kSecondaryIndexId) + sizeof(c),
  161. buf + sizeof(kSecondaryIndexId) + sizeof(c) + sizeof(a));
  162. return ret;
  163. }
  164. std::tuple<Status, uint32_t, uint32_t>
  165. MultiOpsTxnsStressTest::Record::DecodePrimaryIndexValue(
  166. Slice primary_index_value) {
  167. if (primary_index_value.size() != 8) {
  168. return std::tuple<Status, uint32_t, uint32_t>{Status::Corruption(""), 0, 0};
  169. }
  170. uint32_t b = 0;
  171. uint32_t c = 0;
  172. if (!GetFixed32(&primary_index_value, &b) ||
  173. !GetFixed32(&primary_index_value, &c)) {
  174. assert(false);
  175. return std::tuple<Status, uint32_t, uint32_t>{Status::Corruption(""), 0, 0};
  176. }
  177. return std::tuple<Status, uint32_t, uint32_t>{Status::OK(), b, c};
  178. }
  179. std::pair<Status, uint32_t>
  180. MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexValue(
  181. Slice secondary_index_value) {
  182. if (secondary_index_value.size() != 4) {
  183. return std::make_pair(Status::Corruption(""), 0);
  184. }
  185. uint32_t crc = 0;
  186. bool result __attribute__((unused)) =
  187. GetFixed32(&secondary_index_value, &crc);
  188. assert(result);
  189. return std::make_pair(Status::OK(), crc);
  190. }
  191. std::pair<std::string, std::string>
  192. MultiOpsTxnsStressTest::Record::EncodePrimaryIndexEntry() const {
  193. std::string primary_index_key = EncodePrimaryKey();
  194. std::string primary_index_value = EncodePrimaryIndexValue();
  195. return std::make_pair(primary_index_key, primary_index_value);
  196. }
  197. std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey() const {
  198. return EncodePrimaryKey(a_);
  199. }
  200. std::string MultiOpsTxnsStressTest::Record::EncodePrimaryIndexValue() const {
  201. std::string ret;
  202. PutFixed32(&ret, b_);
  203. PutFixed32(&ret, c_);
  204. return ret;
  205. }
  206. std::pair<std::string, std::string>
  207. MultiOpsTxnsStressTest::Record::EncodeSecondaryIndexEntry() const {
  208. std::string secondary_index_key = EncodeSecondaryKey(c_, a_);
  209. // Secondary index value is always 4-byte crc32 of the secondary key
  210. std::string secondary_index_value;
  211. uint32_t crc =
  212. crc32c::Value(secondary_index_key.data(), secondary_index_key.size());
  213. PutFixed32(&secondary_index_value, crc);
  214. return std::make_pair(std::move(secondary_index_key), secondary_index_value);
  215. }
  216. std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey() const {
  217. return EncodeSecondaryKey(c_, a_);
  218. }
  219. Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry(
  220. Slice primary_index_key, Slice primary_index_value) {
  221. if (primary_index_key.size() != 8) {
  222. assert(false);
  223. return Status::Corruption("Primary index key length is not 8");
  224. }
  225. uint32_t index_id = 0;
  226. [[maybe_unused]] bool res = GetFixed32(&primary_index_key, &index_id);
  227. assert(res);
  228. index_id = EndianSwapValue(index_id);
  229. if (index_id != kPrimaryIndexId) {
  230. std::ostringstream oss;
  231. oss << "Unexpected primary index id: " << index_id;
  232. return Status::Corruption(oss.str());
  233. }
  234. res = GetFixed32(&primary_index_key, &a_);
  235. assert(res);
  236. a_ = EndianSwapValue(a_);
  237. assert(primary_index_key.empty());
  238. if (primary_index_value.size() != 8) {
  239. return Status::Corruption("Primary index value length is not 8");
  240. }
  241. GetFixed32(&primary_index_value, &b_);
  242. GetFixed32(&primary_index_value, &c_);
  243. return Status::OK();
  244. }
  245. Status MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexEntry(
  246. Slice secondary_index_key, Slice secondary_index_value) {
  247. if (secondary_index_key.size() != 12) {
  248. return Status::Corruption("Secondary index key length is not 12");
  249. }
  250. uint32_t crc =
  251. crc32c::Value(secondary_index_key.data(), secondary_index_key.size());
  252. uint32_t index_id = 0;
  253. [[maybe_unused]] bool res = GetFixed32(&secondary_index_key, &index_id);
  254. assert(res);
  255. index_id = EndianSwapValue(index_id);
  256. if (index_id != kSecondaryIndexId) {
  257. std::ostringstream oss;
  258. oss << "Unexpected secondary index id: " << index_id;
  259. return Status::Corruption(oss.str());
  260. }
  261. assert(secondary_index_key.size() == 8);
  262. res = GetFixed32(&secondary_index_key, &c_);
  263. assert(res);
  264. c_ = EndianSwapValue(c_);
  265. assert(secondary_index_key.size() == 4);
  266. res = GetFixed32(&secondary_index_key, &a_);
  267. assert(res);
  268. a_ = EndianSwapValue(a_);
  269. assert(secondary_index_key.empty());
  270. if (secondary_index_value.size() != 4) {
  271. return Status::Corruption("Secondary index value length is not 4");
  272. }
  273. uint32_t val = 0;
  274. GetFixed32(&secondary_index_value, &val);
  275. if (val != crc) {
  276. std::ostringstream oss;
  277. oss << "Secondary index key checksum mismatch, stored: " << val
  278. << ", recomputed: " << crc;
  279. return Status::Corruption(oss.str());
  280. }
  281. return Status::OK();
  282. }
  283. void MultiOpsTxnsStressTest::FinishInitDb(SharedState* shared) {
  284. if (FLAGS_enable_compaction_filter) {
  285. // TODO (yanqin) enable compaction filter
  286. }
  287. ProcessRecoveredPreparedTxns(shared);
  288. ReopenAndPreloadDbIfNeeded(shared);
  289. // TODO (yanqin) parallelize if key space is large
  290. for (auto& key_gen : key_gen_for_a_) {
  291. assert(key_gen);
  292. key_gen->FinishInit();
  293. }
  294. // TODO (yanqin) parallelize if key space is large
  295. for (auto& key_gen : key_gen_for_c_) {
  296. assert(key_gen);
  297. key_gen->FinishInit();
  298. }
  299. }
  300. void MultiOpsTxnsStressTest::ReopenAndPreloadDbIfNeeded(SharedState* shared) {
  301. (void)shared;
  302. bool db_empty = false;
  303. {
  304. ReadOptions ropt;
  305. std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
  306. if (FLAGS_auto_refresh_iterator_with_snapshot) {
  307. snapshot = std::make_unique<ManagedSnapshot>(db_);
  308. ropt.snapshot = snapshot->snapshot();
  309. ropt.auto_refresh_iterator_with_snapshot = true;
  310. }
  311. std::unique_ptr<Iterator> iter(db_->NewIterator(ropt));
  312. iter->SeekToFirst();
  313. if (!iter->Valid()) {
  314. db_empty = true;
  315. }
  316. }
  317. if (db_empty) {
  318. PreloadDb(shared, FLAGS_threads, FLAGS_lb_a, FLAGS_ub_a, FLAGS_lb_c,
  319. FLAGS_ub_c);
  320. } else {
  321. fprintf(stdout,
  322. "Key ranges will be read from %s.\n-lb_a, -ub_a, -lb_c, -ub_c will "
  323. "be ignored\n",
  324. FLAGS_key_spaces_path.c_str());
  325. fflush(stdout);
  326. ScanExistingDb(shared, FLAGS_threads);
  327. }
  328. }
  329. // Used for point-lookup transaction
  330. Status MultiOpsTxnsStressTest::TestGet(
  331. ThreadState* thread, const ReadOptions& read_opts,
  332. const std::vector<int>& /*rand_column_families*/,
  333. const std::vector<int64_t>& /*rand_keys*/) {
  334. ThreadStatus::OperationType cur_op_type =
  335. ThreadStatusUtil::GetThreadOperation();
  336. ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);
  337. uint32_t a = 0;
  338. uint32_t pos = 0;
  339. std::tie(a, pos) = ChooseExistingA(thread);
  340. Status s = PointLookupTxn(thread, read_opts, a);
  341. ThreadStatusUtil::SetThreadOperation(cur_op_type);
  342. return s;
  343. }
  344. // Not used.
  345. std::vector<Status> MultiOpsTxnsStressTest::TestMultiGet(
  346. ThreadState* /*thread*/, const ReadOptions& /*read_opts*/,
  347. const std::vector<int>& /*rand_column_families*/,
  348. const std::vector<int64_t>& /*rand_keys*/) {
  349. return std::vector<Status>{Status::NotSupported()};
  350. }
  351. // Wide columns are currently not supported by transactions.
  352. void MultiOpsTxnsStressTest::TestGetEntity(
  353. ThreadState* /* thread */, const ReadOptions& /* read_opts */,
  354. const std::vector<int>& /* rand_column_families */,
  355. const std::vector<int64_t>& /* rand_keys */) {}
  356. // Wide columns are currently not supported by transactions.
  357. void MultiOpsTxnsStressTest::TestMultiGetEntity(
  358. ThreadState* /* thread */, const ReadOptions& /* read_opts */,
  359. const std::vector<int>& /* rand_column_families */,
  360. const std::vector<int64_t>& /* rand_keys */) {}
  361. Status MultiOpsTxnsStressTest::TestPrefixScan(
  362. ThreadState* thread, const ReadOptions& read_opts,
  363. const std::vector<int>& rand_column_families,
  364. const std::vector<int64_t>& rand_keys) {
  365. (void)thread;
  366. (void)read_opts;
  367. (void)rand_column_families;
  368. (void)rand_keys;
  369. return Status::OK();
  370. }
  371. // Given a key K, this creates an iterator which scans to K and then
  372. // does a random sequence of Next/Prev operations.
  373. Status MultiOpsTxnsStressTest::TestIterate(
  374. ThreadState* thread, const ReadOptions& read_opts,
  375. const std::vector<int>& /*rand_column_families*/,
  376. const std::vector<int64_t>& /*rand_keys*/) {
  377. ThreadStatus::OperationType cur_op_type =
  378. ThreadStatusUtil::GetThreadOperation();
  379. ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);
  380. uint32_t c = 0;
  381. uint32_t pos = 0;
  382. std::tie(c, pos) = ChooseExistingC(thread);
  383. Status s = RangeScanTxn(thread, read_opts, c);
  384. ThreadStatusUtil::SetThreadOperation(cur_op_type);
  385. return s;
  386. }
  387. Status MultiOpsTxnsStressTest::TestIterateAttributeGroups(
  388. ThreadState* /*thread*/, const ReadOptions& /*read_opts*/,
  389. const std::vector<int>& /*rand_column_families*/,
  390. const std::vector<int64_t>& /*rand_keys*/) {
  391. return Status::NotSupported();
  392. }
  393. // Not intended for use.
  394. Status MultiOpsTxnsStressTest::TestPut(ThreadState* /*thread*/,
  395. WriteOptions& /*write_opts*/,
  396. const ReadOptions& /*read_opts*/,
  397. const std::vector<int>& /*cf_ids*/,
  398. const std::vector<int64_t>& /*keys*/,
  399. char (&value)[100]) {
  400. (void)value;
  401. return Status::NotSupported();
  402. }
  403. // Not intended for use.
  404. Status MultiOpsTxnsStressTest::TestDelete(
  405. ThreadState* /*thread*/, WriteOptions& /*write_opts*/,
  406. const std::vector<int>& /*rand_column_families*/,
  407. const std::vector<int64_t>& /*rand_keys*/) {
  408. return Status::NotSupported();
  409. }
  410. // Not intended for use.
  411. Status MultiOpsTxnsStressTest::TestDeleteRange(
  412. ThreadState* /*thread*/, WriteOptions& /*write_opts*/,
  413. const std::vector<int>& /*rand_column_families*/,
  414. const std::vector<int64_t>& /*rand_keys*/) {
  415. return Status::NotSupported();
  416. }
  417. void MultiOpsTxnsStressTest::TestIngestExternalFile(
  418. ThreadState* thread, const std::vector<int>& rand_column_families,
  419. const std::vector<int64_t>& /*rand_keys*/) {
  420. // TODO (yanqin)
  421. (void)thread;
  422. (void)rand_column_families;
  423. }
  424. void MultiOpsTxnsStressTest::TestCompactRange(
  425. ThreadState* thread, int64_t /*rand_key*/, const Slice& /*start_key*/,
  426. ColumnFamilyHandle* column_family) {
  427. // TODO (yanqin).
  428. // May use GetRangeHash() for validation before and after DB::CompactRange()
  429. // completes.
  430. (void)thread;
  431. (void)column_family;
  432. }
  433. Status MultiOpsTxnsStressTest::TestBackupRestore(
  434. ThreadState* thread, const std::vector<int>& rand_column_families,
  435. const std::vector<int64_t>& /*rand_keys*/) {
  436. // TODO (yanqin)
  437. (void)thread;
  438. (void)rand_column_families;
  439. return Status::OK();
  440. }
  441. Status MultiOpsTxnsStressTest::TestCheckpoint(
  442. ThreadState* thread, const std::vector<int>& rand_column_families,
  443. const std::vector<int64_t>& /*rand_keys*/) {
  444. // TODO (yanqin)
  445. (void)thread;
  446. (void)rand_column_families;
  447. return Status::OK();
  448. }
  449. Status MultiOpsTxnsStressTest::TestApproximateSize(
  450. ThreadState* thread, uint64_t iteration,
  451. const std::vector<int>& rand_column_families,
  452. const std::vector<int64_t>& /*rand_keys*/) {
  453. // TODO (yanqin)
  454. (void)thread;
  455. (void)iteration;
  456. (void)rand_column_families;
  457. return Status::OK();
  458. }
  459. Status MultiOpsTxnsStressTest::TestCustomOperations(
  460. ThreadState* thread, const std::vector<int>& rand_column_families) {
  461. (void)rand_column_families;
  462. // Randomly choose from 0, 1, and 2.
  463. // TODO (yanqin) allow user to configure probability of each operation.
  464. uint32_t rand = thread->rand.Uniform(3);
  465. Status s;
  466. if (0 == rand) {
  467. // Update primary key.
  468. uint32_t old_a = 0;
  469. uint32_t pos = 0;
  470. std::tie(old_a, pos) = ChooseExistingA(thread);
  471. uint32_t new_a = GenerateNextA(thread);
  472. s = PrimaryKeyUpdateTxn(thread, old_a, pos, new_a);
  473. } else if (1 == rand) {
  474. // Update secondary key.
  475. uint32_t old_c = 0;
  476. uint32_t pos = 0;
  477. std::tie(old_c, pos) = ChooseExistingC(thread);
  478. uint32_t new_c = GenerateNextC(thread);
  479. s = SecondaryKeyUpdateTxn(thread, old_c, pos, new_c);
  480. } else if (2 == rand) {
  481. // Update primary index value.
  482. uint32_t a = 0;
  483. uint32_t pos = 0;
  484. std::tie(a, pos) = ChooseExistingA(thread);
  485. s = UpdatePrimaryIndexValueTxn(thread, a, /*b_delta=*/1);
  486. } else {
  487. // Should never reach here.
  488. assert(false);
  489. }
  490. if (!s.ok()) {
  491. fprintf(stderr, "Transaction failed %s\n", s.ToString().c_str());
  492. fflush(stderr);
  493. thread->shared->SafeTerminate();
  494. }
  495. return s;
  496. }
  497. void MultiOpsTxnsStressTest::RegisterAdditionalListeners() {
  498. options_.listeners.emplace_back(new MultiOpsTxnsStressListener(this));
  499. }
  500. void MultiOpsTxnsStressTest::PrepareTxnDbOptions(
  501. SharedState* /*shared*/, TransactionDBOptions& txn_db_opts) {
  502. // MultiOpsTxnStressTest uses SingleDelete to delete secondary keys, thus we
  503. // register this callback to let TxnDb know that when rolling back
  504. // a transaction, use only SingleDelete to cancel prior Put from the same
  505. // transaction if applicable.
  506. txn_db_opts.rollback_deletion_type_callback =
  507. [](TransactionDB* /*db*/, ColumnFamilyHandle* /*column_family*/,
  508. const Slice& key) {
  509. Slice ks = key;
  510. uint32_t index_id = 0;
  511. [[maybe_unused]] bool res = GetFixed32(&ks, &index_id);
  512. assert(res);
  513. index_id = EndianSwapValue(index_id);
  514. assert(index_id <= Record::kSecondaryIndexId);
  515. return index_id == Record::kSecondaryIndexId;
  516. };
  517. }
  // Runs one transaction that moves a record from primary key `old_a` to
  // primary key `new_a`.
  //
  // Steps: read the old primary key with GetForUpdate(), check with
  // GetForUpdate() that the new primary key does not exist yet, delete the old
  // primary key and put the new one with the same value, replace the secondary
  // index entry (c, old_a) by (c, new_a), prepare, and commit. On a successful
  // commit the A key generator of this thread is updated via Replace().
  //
  // thread:    calling thread; supplies stats, the random source and the tid
  //            used to select the key generator.
  // old_a:     primary key to replace.
  // old_a_pos: position argument forwarded to KeyGenerator::Replace().
  // new_a:     primary key to create; handed back through UndoAllocation()
  //            when the transaction does not succeed.
  //
  // Returns `s`, the status of the first failing step or of the commit.
  // NOTE(review): on failure the deferred cleanup below assigns the rollback
  // status to `s` after the return statement has been evaluated; whether the
  // caller observes that assignment depends on return-value copy elision --
  // confirm the intended returned status.
  518. Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
  519. uint32_t old_a,
  520. uint32_t old_a_pos,
  521. uint32_t new_a) {
  522. std::string old_pk = Record::EncodePrimaryKey(old_a);
  523. std::string new_pk = Record::EncodePrimaryKey(new_a);
  524. std::unique_ptr<Transaction> txn;
  525. WriteOptions wopts;
  526. Status s = NewTxn(wopts, thread, &txn);
  527. if (!s.ok()) {
  528. assert(!txn);
  529. thread->stats.AddErrors(1);
  530. return s;
  531. }
  532. assert(txn);
  533. txn->SetSnapshotOnNextOperation(/*notifier=*/nullptr);
  // Deferred cleanup, run when this function's scope exits. On success it only
  // records stats. Otherwise it classifies the failure, returns `new_a` to the
  // key generator and rolls the transaction back.
  534. const Defer cleanup([new_a, &s, thread, this, &txn]() {
  535. if (s.ok()) {
  536. // Two gets, one for existing pk, one for locking potential new pk.
  537. thread->stats.AddGets(/*ngets=*/2, /*nfounds=*/1);
  538. thread->stats.AddDeletes(1);
  539. thread->stats.AddBytesForWrites(
  540. /*nwrites=*/2,
  541. Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize);
  542. thread->stats.AddSingleDeletes(1);
  543. return;
  544. }
  545. if (s.IsNotFound()) {
  546. thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0);
  547. } else if (s.IsBusy() || s.IsIncomplete()) {
  548. // ignore.
  549. // Incomplete also means rollback by application. See the transaction
  550. // implementations.
  551. } else {
  552. thread->stats.AddErrors(1);
  553. }
  // The new key was never committed: make it allocatable again.
  554. auto& key_gen = key_gen_for_a_[thread->tid];
  555. key_gen->UndoAllocation(new_a);
  // From here on `s` holds the rollback status instead of the original
  // failure.
  556. s = txn->Rollback();
  557. if (!s.ok()) {
  558. fprintf(stderr, "Transaction rollback failed %s\n", s.ToString().c_str());
  559. fflush(stderr);
  560. assert(false);
  561. }
  562. });
  563. ReadOptions ropts;
  564. ropts.rate_limiter_priority =
  565. FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  // Read the current value stored under the old primary key.
  566. std::string value;
  567. s = txn->GetForUpdate(ropts, old_pk, &value);
  568. if (!s.ok()) {
  569. return s;
  570. }
  // The new primary key must not exist yet: a successful read is reported as
  // Busy, NotFound is the expected outcome, anything else is returned as is.
  571. std::string empty_value;
  572. s = txn->GetForUpdate(ropts, new_pk, &empty_value);
  573. if (s.ok()) {
  574. assert(!empty_value.empty());
  575. s = Status::Busy();
  576. return s;
  577. } else if (!s.IsNotFound()) {
  578. return s;
  579. }
  // Extract b and c from the old value; they are needed for the secondary
  // index entries below.
  580. auto result = Record::DecodePrimaryIndexValue(value);
  581. s = std::get<0>(result);
  582. if (!s.ok()) {
  583. return s;
  584. }
  585. uint32_t b = std::get<1>(result);
  586. uint32_t c = std::get<2>(result);
  // Move the primary index entry: delete the old key, put the same value under
  // the new key.
  587. ColumnFamilyHandle* cf = db_->DefaultColumnFamily();
  588. s = txn->Delete(cf, old_pk, /*assume_tracked=*/true);
  589. if (!s.ok()) {
  590. return s;
  591. }
  592. s = txn->Put(cf, new_pk, value, /*assume_tracked=*/true);
  593. if (!s.ok()) {
  594. return s;
  595. }
  // Secondary index changes are applied directly to the transaction's write
  // batch: single-delete (c, old_a), then put (c, new_a) with its checksum.
  596. auto* wb = txn->GetWriteBatch();
  597. assert(wb);
  598. std::string old_sk = Record::EncodeSecondaryKey(c, old_a);
  599. s = wb->SingleDelete(old_sk);
  600. if (!s.ok()) {
  601. return s;
  602. }
  603. Record record(new_a, b, c);
  604. std::string new_sk;
  605. std::string new_crc;
  606. std::tie(new_sk, new_crc) = record.EncodeSecondaryIndexEntry();
  607. s = wb->Put(new_sk, new_crc);
  608. if (!s.ok()) {
  609. return s;
  610. }
  611. s = txn->Prepare();
  612. if (!s.ok()) {
  613. return s;
  614. }
  // With probability 1/FLAGS_rollback_one_in, abandon the prepared transaction;
  // the Incomplete status makes the deferred cleanup roll it back.
  615. if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
  616. s = Status::Incomplete();
  617. return s;
  618. }
  619. s = WriteToCommitTimeWriteBatch(*txn);
  620. if (!s.ok()) {
  621. return s;
  622. }
  623. s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);
  // Only a successful commit updates the key generator's bookkeeping.
  624. auto& key_gen = key_gen_for_a_.at(thread->tid);
  625. if (s.ok()) {
  626. key_gen->Replace(old_a, old_a_pos, new_a);
  627. }
  628. return s;
  629. }
  630. Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
  631. uint32_t old_c,
  632. uint32_t old_c_pos,
  633. uint32_t new_c) {
  634. std::unique_ptr<Transaction> txn;
  635. WriteOptions wopts;
  636. Status s = NewTxn(wopts, thread, &txn);
  637. if (!s.ok()) {
  638. assert(!txn);
  639. thread->stats.AddErrors(1);
  640. return s;
  641. }
  642. assert(txn);
  643. Iterator* it = nullptr;
  644. long iterations = 0;
  645. const Defer cleanup([new_c, &s, thread, &txn, &it, this, &iterations]() {
  646. delete it;
  647. if (s.ok()) {
  648. thread->stats.AddIterations(iterations);
  649. thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1);
  650. thread->stats.AddSingleDeletes(1);
  651. thread->stats.AddBytesForWrites(
  652. /*nwrites=*/2,
  653. Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize);
  654. return;
  655. } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
  656. s.IsMergeInProgress() || s.IsIncomplete()) {
  657. // ww-conflict detected, or
  658. // lock cannot be acquired, or
  659. // memtable history is not large enough for conflict checking, or
  660. // Merge operation cannot be resolved, or
  661. // application rollback.
  662. // TODO (yanqin) add stats for other cases?
  663. } else if (s.IsNotFound()) {
  664. // ignore.
  665. } else {
  666. thread->stats.AddErrors(1);
  667. }
  668. auto& key_gen = key_gen_for_c_[thread->tid];
  669. key_gen->UndoAllocation(new_c);
  670. s = txn->Rollback();
  671. if (!s.ok()) {
  672. fprintf(stderr, "Transaction rollback failed %s\n", s.ToString().c_str());
  673. fflush(stderr);
  674. assert(false);
  675. }
  676. });
  677. // TODO (yanqin) try SetSnapshotOnNextOperation(). We currently need to take
  678. // a snapshot here because we will later verify that point lookup in the
  679. // primary index using GetForUpdate() returns the same value for 'c' as the
  680. // iterator. The iterator does not need a snapshot though, because it will be
  681. // assigned the current latest (published) sequence in the db, which will be
  682. // no smaller than the snapshot created here. The GetForUpdate will perform
  683. // ww conflict checking to ensure GetForUpdate() (using the snapshot) sees
  684. // the same data as this iterator.
  685. txn->SetSnapshot();
  686. std::string old_sk_prefix = Record::EncodeSecondaryKey(old_c);
  687. std::string iter_ub_str = Record::EncodeSecondaryKey(old_c + 1);
  688. Slice iter_ub = iter_ub_str;
  689. ReadOptions ropts;
  690. ropts.snapshot = txn->GetSnapshot();
  691. ropts.auto_refresh_iterator_with_snapshot =
  692. FLAGS_auto_refresh_iterator_with_snapshot;
  693. ropts.total_order_seek = true;
  694. ropts.iterate_upper_bound = &iter_ub;
  695. ropts.rate_limiter_priority =
  696. FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  697. if (FLAGS_use_sqfc_for_range_queries) {
  698. ropts.table_filter =
  699. sqfc_factory_->GetTableFilterForRangeQuery(old_sk_prefix, iter_ub);
  700. }
  701. it = txn->GetIterator(ropts);
  702. assert(it);
  703. it->Seek(old_sk_prefix);
  704. if (!it->Valid()) {
  705. s = Status::NotFound();
  706. return s;
  707. }
  708. auto* wb = txn->GetWriteBatch();
  709. assert(wb);
  710. do {
  711. ++iterations;
  712. Record record;
  713. s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
  714. if (!s.ok()) {
  715. fprintf(stderr, "Cannot decode secondary key (%s => %s): %s\n",
  716. it->key().ToString(true).c_str(),
  717. it->value().ToString(true).c_str(), s.ToString().c_str());
  718. assert(false);
  719. break;
  720. }
  721. // At this point, record.b is not known yet, thus we need to access
  722. // primary index.
  723. std::string pk = Record::EncodePrimaryKey(record.a_value());
  724. std::string value;
  725. ReadOptions read_opts;
  726. read_opts.rate_limiter_priority =
  727. FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  728. read_opts.snapshot = txn->GetSnapshot();
  729. s = txn->GetForUpdate(read_opts, pk, &value);
  730. if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
  731. s.IsMergeInProgress()) {
  732. // Write conflict, or cannot acquire lock, or memtable size is not large
  733. // enough, or merge cannot be resolved.
  734. break;
  735. } else if (s.IsNotFound()) {
  736. // We can also fail verification here.
  737. std::ostringstream oss;
  738. auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  739. assert(dbimpl);
  740. oss << "snap " << read_opts.snapshot->GetSequenceNumber()
  741. << " (published " << dbimpl->GetLastPublishedSequence()
  742. << "), pk should exist: " << Slice(pk).ToString(true);
  743. fprintf(stderr, "%s\n", oss.str().c_str());
  744. assert(false);
  745. break;
  746. }
  747. if (!s.ok()) {
  748. std::ostringstream oss;
  749. auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  750. assert(dbimpl);
  751. oss << "snap " << read_opts.snapshot->GetSequenceNumber()
  752. << " (published " << dbimpl->GetLastPublishedSequence() << "), "
  753. << s.ToString();
  754. fprintf(stderr, "%s\n", oss.str().c_str());
  755. assert(false);
  756. break;
  757. }
  758. auto result = Record::DecodePrimaryIndexValue(value);
  759. s = std::get<0>(result);
  760. if (!s.ok()) {
  761. fprintf(stderr, "Cannot decode primary index value %s: %s\n",
  762. Slice(value).ToString(true).c_str(), s.ToString().c_str());
  763. assert(false);
  764. break;
  765. }
  766. uint32_t b = std::get<1>(result);
  767. uint32_t c = std::get<2>(result);
  768. if (c != old_c) {
  769. std::ostringstream oss;
  770. auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  771. assert(dbimpl);
  772. oss << "snap " << read_opts.snapshot->GetSequenceNumber()
  773. << " (published " << dbimpl->GetLastPublishedSequence()
  774. << "), pk/sk mismatch. pk: (a=" << record.a_value() << ", "
  775. << "c=" << c << "), sk: (c=" << old_c << ")";
  776. s = Status::Corruption();
  777. fprintf(stderr, "%s\n", oss.str().c_str());
  778. assert(false);
  779. break;
  780. }
  781. Record new_rec(record.a_value(), b, new_c);
  782. std::string new_primary_index_value = new_rec.EncodePrimaryIndexValue();
  783. ColumnFamilyHandle* cf = db_->DefaultColumnFamily();
  784. s = txn->Put(cf, pk, new_primary_index_value, /*assume_tracked=*/true);
  785. if (!s.ok()) {
  786. break;
  787. }
  788. std::string old_sk = it->key().ToString(/*hex=*/false);
  789. std::string new_sk;
  790. std::string new_crc;
  791. std::tie(new_sk, new_crc) = new_rec.EncodeSecondaryIndexEntry();
  792. s = wb->SingleDelete(old_sk);
  793. if (!s.ok()) {
  794. break;
  795. }
  796. s = wb->Put(new_sk, new_crc);
  797. if (!s.ok()) {
  798. break;
  799. }
  800. it->Next();
  801. } while (it->Valid());
  802. if (!s.ok()) {
  803. return s;
  804. }
  805. s = txn->Prepare();
  806. if (!s.ok()) {
  807. return s;
  808. }
  809. if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
  810. s = Status::Incomplete();
  811. return s;
  812. }
  813. s = WriteToCommitTimeWriteBatch(*txn);
  814. if (!s.ok()) {
  815. return s;
  816. }
  817. s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);
  818. if (s.ok()) {
  819. auto& key_gen = key_gen_for_c_.at(thread->tid);
  820. key_gen->Replace(old_c, old_c_pos, new_c);
  821. }
  822. return s;
  823. }
  824. Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
  825. uint32_t a,
  826. uint32_t b_delta) {
  827. std::string pk_str = Record::EncodePrimaryKey(a);
  828. std::unique_ptr<Transaction> txn;
  829. WriteOptions wopts;
  830. Status s = NewTxn(wopts, thread, &txn);
  831. if (!s.ok()) {
  832. assert(!txn);
  833. thread->stats.AddErrors(1);
  834. return s;
  835. }
  836. assert(txn);
  837. const Defer cleanup([&s, thread, &txn]() {
  838. if (s.ok()) {
  839. thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1);
  840. thread->stats.AddBytesForWrites(
  841. /*nwrites=*/1, /*nbytes=*/Record::kPrimaryIndexEntrySize);
  842. return;
  843. }
  844. if (s.IsNotFound()) {
  845. thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0);
  846. } else if (s.IsInvalidArgument()) {
  847. // ignored.
  848. } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
  849. s.IsMergeInProgress() || s.IsIncomplete()) {
  850. // ignored.
  851. } else {
  852. thread->stats.AddErrors(1);
  853. }
  854. s = txn->Rollback();
  855. if (!s.ok()) {
  856. fprintf(stderr, "Transaction rollback failed %s\n", s.ToString().c_str());
  857. fflush(stderr);
  858. assert(false);
  859. }
  860. });
  861. ReadOptions ropts;
  862. ropts.rate_limiter_priority =
  863. FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  864. std::string value;
  865. s = txn->GetForUpdate(ropts, pk_str, &value);
  866. if (!s.ok()) {
  867. return s;
  868. }
  869. auto result = Record::DecodePrimaryIndexValue(value);
  870. if (!std::get<0>(result).ok()) {
  871. s = std::get<0>(result);
  872. fprintf(stderr, "Cannot decode primary index value %s: %s\n",
  873. Slice(value).ToString(true).c_str(), s.ToString().c_str());
  874. assert(false);
  875. return s;
  876. }
  877. uint32_t b = std::get<1>(result) + b_delta;
  878. uint32_t c = std::get<2>(result);
  879. Record record(a, b, c);
  880. std::string primary_index_value = record.EncodePrimaryIndexValue();
  881. ColumnFamilyHandle* cf = db_->DefaultColumnFamily();
  882. s = txn->Put(cf, pk_str, primary_index_value, /*assume_tracked=*/true);
  883. if (!s.ok()) {
  884. return s;
  885. }
  886. s = txn->Prepare();
  887. if (!s.ok()) {
  888. return s;
  889. }
  890. if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
  891. s = Status::Incomplete();
  892. return s;
  893. }
  894. s = WriteToCommitTimeWriteBatch(*txn);
  895. if (!s.ok()) {
  896. return s;
  897. }
  898. s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);
  899. return s;
  900. }
  901. Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread,
  902. ReadOptions ropts, uint32_t a) {
  903. std::string pk_str = Record::EncodePrimaryKey(a);
  904. // pk may or may not exist
  905. PinnableSlice value;
  906. std::unique_ptr<Transaction> txn;
  907. WriteOptions wopts;
  908. Status s = NewTxn(wopts, thread, &txn);
  909. if (!s.ok()) {
  910. assert(!txn);
  911. thread->stats.AddErrors(1);
  912. return s;
  913. }
  914. assert(txn);
  915. const Defer cleanup([&s, thread, &txn]() {
  916. if (s.ok()) {
  917. thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1);
  918. return;
  919. } else if (s.IsNotFound()) {
  920. thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0);
  921. } else {
  922. thread->stats.AddErrors(1);
  923. }
  924. txn->Rollback().PermitUncheckedError();
  925. });
  926. std::shared_ptr<const Snapshot> snapshot;
  927. SetupSnapshot(thread, ropts, *txn, snapshot);
  928. if (FLAGS_delay_snapshot_read_one_in > 0 &&
  929. thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) {
  930. uint64_t delay_ms = thread->rand.Uniform(100) + 1;
  931. db_->GetDBOptions().env->SleepForMicroseconds(
  932. static_cast<int>(delay_ms * 1000));
  933. }
  934. s = txn->Get(ropts, db_->DefaultColumnFamily(), pk_str, &value);
  935. if (s.ok()) {
  936. s = txn->Commit();
  937. }
  938. return s;
  939. }
  940. Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread,
  941. ReadOptions ropts, uint32_t c) {
  942. std::string sk = Record::EncodeSecondaryKey(c);
  943. std::unique_ptr<Transaction> txn;
  944. WriteOptions wopts;
  945. Status s = NewTxn(wopts, thread, &txn);
  946. if (!s.ok()) {
  947. assert(!txn);
  948. thread->stats.AddErrors(1);
  949. return s;
  950. }
  951. assert(txn);
  952. const Defer cleanup([&s, thread, &txn]() {
  953. if (s.ok()) {
  954. thread->stats.AddIterations(1);
  955. return;
  956. }
  957. thread->stats.AddErrors(1);
  958. txn->Rollback().PermitUncheckedError();
  959. });
  960. std::shared_ptr<const Snapshot> snapshot;
  961. SetupSnapshot(thread, ropts, *txn, snapshot);
  962. if (FLAGS_delay_snapshot_read_one_in > 0 &&
  963. thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) {
  964. uint64_t delay_ms = thread->rand.Uniform(100) + 1;
  965. db_->GetDBOptions().env->SleepForMicroseconds(
  966. static_cast<int>(delay_ms * 1000));
  967. }
  968. std::unique_ptr<Iterator> iter(txn->GetIterator(ropts));
  969. constexpr size_t total_nexts = 10;
  970. size_t nexts = 0;
  971. for (iter->Seek(sk);
  972. iter->Valid() && nexts < total_nexts && iter->status().ok();
  973. iter->Next(), ++nexts) {
  974. }
  975. if (iter->status().ok()) {
  976. s = txn->Commit();
  977. } else {
  978. s = iter->status();
  979. }
  980. return s;
  981. }
  982. void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
  983. if (thread->shared->HasVerificationFailedYet()) {
  984. return;
  985. }
  986. const Snapshot* const snapshot = db_->GetSnapshot();
  987. assert(snapshot);
  988. ManagedSnapshot snapshot_guard(db_, snapshot);
  989. std::ostringstream oss;
  990. oss << "[snap=" << snapshot->GetSequenceNumber() << ",";
  991. auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  992. assert(dbimpl);
  993. oss << " last_published=" << dbimpl->GetLastPublishedSequence() << "] ";
  994. if (FLAGS_delay_snapshot_read_one_in > 0 &&
  995. thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) {
  996. uint64_t delay_ms = thread->rand.Uniform(100) + 1;
  997. db_->GetDBOptions().env->SleepForMicroseconds(
  998. static_cast<int>(delay_ms * 1000));
  999. }
  1000. // TODO (yanqin) with a probability, we can use either forward or backward
  1001. // iterator in subsequent checks. We can also use more advanced features in
  1002. // range scan. For now, let's just use simple forward iteration with
  1003. // total_order_seek = true.
  1004. // First, iterate primary index.
  1005. size_t primary_index_entries_count = 0;
  1006. {
  1007. std::string iter_ub_str;
  1008. PutFixed32(&iter_ub_str, Record::kPrimaryIndexId + 1);
  1009. std::reverse(iter_ub_str.begin(), iter_ub_str.end());
  1010. Slice iter_ub = iter_ub_str;
  1011. std::string start_key;
  1012. PutFixed32(&start_key, Record::kPrimaryIndexId);
  1013. std::reverse(start_key.begin(), start_key.end());
  1014. // This `ReadOptions` is for validation purposes. Ignore
  1015. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  1016. ReadOptions ropts;
  1017. ropts.snapshot = snapshot;
  1018. ropts.auto_refresh_iterator_with_snapshot =
  1019. FLAGS_auto_refresh_iterator_with_snapshot;
  1020. ropts.total_order_seek = true;
  1021. ropts.iterate_upper_bound = &iter_ub;
  1022. if (FLAGS_use_sqfc_for_range_queries) {
  1023. ropts.table_filter =
  1024. sqfc_factory_->GetTableFilterForRangeQuery(start_key, iter_ub);
  1025. }
  1026. std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
  1027. for (it->Seek(start_key); it->Valid(); it->Next()) {
  1028. Record record;
  1029. Status s = record.DecodePrimaryIndexEntry(it->key(), it->value());
  1030. if (!s.ok()) {
  1031. oss << "Cannot decode primary index entry " << it->key().ToString(true)
  1032. << "=>" << it->value().ToString(true) << ". Status is "
  1033. << s.ToString();
  1034. VerificationAbort(thread->shared, oss.str());
  1035. assert(false);
  1036. return;
  1037. }
  1038. ++primary_index_entries_count;
  1039. // Search secondary index.
  1040. uint32_t a = record.a_value();
  1041. uint32_t c = record.c_value();
  1042. char sk_buf[12];
  1043. EncodeFixed32(sk_buf, Record::kSecondaryIndexId);
  1044. std::reverse(sk_buf, sk_buf + sizeof(uint32_t));
  1045. EncodeFixed32(sk_buf + sizeof(uint32_t), c);
  1046. std::reverse(sk_buf + sizeof(uint32_t), sk_buf + 2 * sizeof(uint32_t));
  1047. EncodeFixed32(sk_buf + 2 * sizeof(uint32_t), a);
  1048. std::reverse(sk_buf + 2 * sizeof(uint32_t), sk_buf + sizeof(sk_buf));
  1049. Slice sk(sk_buf, sizeof(sk_buf));
  1050. std::string value;
  1051. s = db_->Get(ropts, sk, &value);
  1052. if (!s.ok()) {
  1053. oss << "Cannot find secondary index entry " << sk.ToString(true)
  1054. << ". Status is " << s.ToString();
  1055. VerificationAbort(thread->shared, oss.str());
  1056. assert(false);
  1057. return;
  1058. }
  1059. }
  1060. }
  1061. // Second, iterate secondary index.
  1062. size_t secondary_index_entries_count = 0;
  1063. {
  1064. std::string start_key;
  1065. PutFixed32(&start_key, Record::kSecondaryIndexId);
  1066. std::reverse(start_key.begin(), start_key.end());
  1067. // This `ReadOptions` is for validation purposes. Ignore
  1068. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  1069. ReadOptions ropts;
  1070. ropts.snapshot = snapshot;
  1071. ropts.auto_refresh_iterator_with_snapshot =
  1072. FLAGS_auto_refresh_iterator_with_snapshot;
  1073. ropts.total_order_seek = true;
  1074. std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
  1075. for (it->Seek(start_key); it->Valid(); it->Next()) {
  1076. ++secondary_index_entries_count;
  1077. Record record;
  1078. Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
  1079. if (!s.ok()) {
  1080. oss << "Cannot decode secondary index entry "
  1081. << it->key().ToString(true) << "=>" << it->value().ToString(true)
  1082. << ". Status is " << s.ToString();
  1083. VerificationAbort(thread->shared, oss.str());
  1084. assert(false);
  1085. return;
  1086. }
  1087. // After decoding secondary index entry, we know a and c. Crc is verified
  1088. // in decoding phase.
  1089. //
  1090. // Form a primary key and search in the primary index.
  1091. std::string pk = Record::EncodePrimaryKey(record.a_value());
  1092. std::string value;
  1093. s = db_->Get(ropts, pk, &value);
  1094. if (!s.ok()) {
  1095. oss << "Error searching pk " << Slice(pk).ToString(true) << ". "
  1096. << s.ToString() << ". sk " << it->key().ToString(true);
  1097. VerificationAbort(thread->shared, oss.str());
  1098. assert(false);
  1099. return;
  1100. }
  1101. auto result = Record::DecodePrimaryIndexValue(value);
  1102. s = std::get<0>(result);
  1103. if (!s.ok()) {
  1104. oss << "Error decoding primary index value "
  1105. << Slice(value).ToString(true) << ". Status is " << s.ToString();
  1106. VerificationAbort(thread->shared, oss.str());
  1107. assert(false);
  1108. return;
  1109. }
  1110. uint32_t c_in_primary = std::get<2>(result);
  1111. if (c_in_primary != record.c_value()) {
  1112. oss << "Pk/sk mismatch. pk: " << Slice(pk).ToString(true) << "=>"
  1113. << Slice(value).ToString(true) << " (a=" << record.a_value()
  1114. << ", c=" << c_in_primary << "), sk: " << it->key().ToString(true)
  1115. << " (c=" << record.c_value() << ")";
  1116. VerificationAbort(thread->shared, oss.str());
  1117. assert(false);
  1118. return;
  1119. }
  1120. }
  1121. }
  1122. if (secondary_index_entries_count != primary_index_entries_count) {
  1123. oss << "Pk/sk mismatch: primary index has " << primary_index_entries_count
  1124. << " entries. Secondary index has " << secondary_index_entries_count
  1125. << " entries.";
  1126. VerificationAbort(thread->shared, oss.str());
  1127. assert(false);
  1128. return;
  1129. }
  1130. }
  1131. // VerifyPkSkFast() can be called by MultiOpsTxnsStressListener's callbacks
  1132. // which can be called before TransactionDB::Open() returns to caller.
  1133. // Therefore, at that time, db_ and txn_db_ may still be nullptr.
  1134. // Caller has to make sure that the race condition does not happen.
  1135. void MultiOpsTxnsStressTest::VerifyPkSkFast(const ReadOptions& read_options,
  1136. int job_id) {
  1137. DB* const db = db_aptr_.load(std::memory_order_acquire);
  1138. if (db == nullptr) {
  1139. return;
  1140. }
  1141. assert(db_ == db);
  1142. assert(db_ != nullptr);
  1143. ThreadStatus::OperationType cur_op_type =
  1144. ThreadStatusUtil::GetThreadOperation();
  1145. ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);
  1146. const Snapshot* const snapshot = db_->GetSnapshot();
  1147. ThreadStatusUtil::SetThreadOperation(cur_op_type);
  1148. assert(snapshot);
  1149. ManagedSnapshot snapshot_guard(db_, snapshot);
  1150. std::ostringstream oss;
  1151. auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  1152. assert(dbimpl);
  1153. oss << "Job " << job_id << ": [" << snapshot->GetSequenceNumber() << ","
  1154. << dbimpl->GetLastPublishedSequence() << "] ";
  1155. std::string start_key;
  1156. PutFixed32(&start_key, Record::kSecondaryIndexId);
  1157. std::reverse(start_key.begin(), start_key.end());
  1158. // This `ReadOptions` is for validation purposes. Ignore
  1159. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  1160. ReadOptions ropts;
  1161. ropts.snapshot = snapshot;
  1162. ropts.auto_refresh_iterator_with_snapshot =
  1163. FLAGS_auto_refresh_iterator_with_snapshot;
  1164. ropts.total_order_seek = true;
  1165. ropts.io_activity = read_options.io_activity;
  1166. std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
  1167. for (it->Seek(start_key); it->Valid(); it->Next()) {
  1168. Record record;
  1169. Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
  1170. if (!s.ok()) {
  1171. oss << "Cannot decode secondary index entry " << it->key().ToString(true)
  1172. << "=>" << it->value().ToString(true);
  1173. fprintf(stderr, "%s\n", oss.str().c_str());
  1174. fflush(stderr);
  1175. assert(false);
  1176. }
  1177. // After decoding secondary index entry, we know a and c. Crc is verified
  1178. // in decoding phase.
  1179. //
  1180. // Form a primary key and search in the primary index.
  1181. std::string pk = Record::EncodePrimaryKey(record.a_value());
  1182. std::string value;
  1183. s = db_->Get(ropts, pk, &value);
  1184. if (!s.ok()) {
  1185. oss << "Error searching pk " << Slice(pk).ToString(true) << ". "
  1186. << s.ToString() << ". sk " << it->key().ToString(true);
  1187. fprintf(stderr, "%s\n", oss.str().c_str());
  1188. fflush(stderr);
  1189. assert(false);
  1190. }
  1191. auto result = Record::DecodePrimaryIndexValue(value);
  1192. s = std::get<0>(result);
  1193. if (!s.ok()) {
  1194. oss << "Error decoding primary index value "
  1195. << Slice(value).ToString(true) << ". " << s.ToString();
  1196. fprintf(stderr, "%s\n", oss.str().c_str());
  1197. fflush(stderr);
  1198. assert(false);
  1199. }
  1200. uint32_t c_in_primary = std::get<2>(result);
  1201. if (c_in_primary != record.c_value()) {
  1202. oss << "Pk/sk mismatch. pk: " << Slice(pk).ToString(true) << "=>"
  1203. << Slice(value).ToString(true) << " (a=" << record.a_value()
  1204. << ", c=" << c_in_primary << "), sk: " << it->key().ToString(true)
  1205. << " (c=" << record.c_value() << ")";
  1206. fprintf(stderr, "%s\n", oss.str().c_str());
  1207. fflush(stderr);
  1208. assert(false);
  1209. }
  1210. }
  1211. }
  1212. std::pair<uint32_t, uint32_t> MultiOpsTxnsStressTest::ChooseExistingA(
  1213. ThreadState* thread) {
  1214. uint32_t tid = thread->tid;
  1215. auto& key_gen = key_gen_for_a_.at(tid);
  1216. return key_gen->ChooseExisting();
  1217. }
  1218. uint32_t MultiOpsTxnsStressTest::GenerateNextA(ThreadState* thread) {
  1219. uint32_t tid = thread->tid;
  1220. auto& key_gen = key_gen_for_a_.at(tid);
  1221. return key_gen->Allocate();
  1222. }
  1223. std::pair<uint32_t, uint32_t> MultiOpsTxnsStressTest::ChooseExistingC(
  1224. ThreadState* thread) {
  1225. uint32_t tid = thread->tid;
  1226. auto& key_gen = key_gen_for_c_.at(tid);
  1227. return key_gen->ChooseExisting();
  1228. }
  1229. uint32_t MultiOpsTxnsStressTest::GenerateNextC(ThreadState* thread) {
  1230. uint32_t tid = thread->tid;
  1231. auto& key_gen = key_gen_for_c_.at(tid);
  1232. return key_gen->Allocate();
  1233. }
  1234. void MultiOpsTxnsStressTest::ProcessRecoveredPreparedTxnsHelper(
  1235. Transaction* txn, SharedState* shared) {
  1236. thread_local Random rand(static_cast<uint32_t>(FLAGS_seed));
  1237. if (rand.OneIn(2)) {
  1238. Status s = txn->Commit();
  1239. ProcessStatus(shared, "ProcessRecoveredPreparedTxnsHelper", s,
  1240. /*ignore_injected_error=*/false);
  1241. } else {
  1242. Status s = txn->Rollback();
  1243. ProcessStatus(shared, "ProcessRecoveredPreparedTxnsHelper", s,
  1244. /*ignore_injected_error=*/false);
  1245. }
  1246. }
  1247. Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) {
  1248. WriteBatch* ctwb = txn.GetCommitTimeWriteBatch();
  1249. assert(ctwb);
  1250. // Do not change the content in key_buf.
  1251. static constexpr char key_buf[sizeof(Record::kMetadataPrefix) + 4] = {
  1252. '\0', '\0', '\0', '\0', '\0', '\0', '\0', '\xff'};
  1253. uint64_t counter_val = counter_.Next();
  1254. char val_buf[sizeof(counter_val)];
  1255. EncodeFixed64(val_buf, counter_val);
  1256. return ctwb->Put(Slice(key_buf, sizeof(key_buf)),
  1257. Slice(val_buf, sizeof(val_buf)));
  1258. }
  1259. Status MultiOpsTxnsStressTest::CommitAndCreateTimestampedSnapshotIfNeeded(
  1260. ThreadState* thread, Transaction& txn) {
  1261. Status s;
  1262. if (FLAGS_create_timestamped_snapshot_one_in > 0 &&
  1263. thread->rand.OneInOpt(FLAGS_create_timestamped_snapshot_one_in)) {
  1264. uint64_t ts = db_stress_env->NowNanos();
  1265. std::shared_ptr<const Snapshot> snapshot;
  1266. s = txn.CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, &snapshot);
  1267. } else {
  1268. s = txn.Commit();
  1269. }
  1270. if (!s.ok()) {
  1271. fprintf(stderr, "Txn %s commit failed with %s\n", txn.GetName().c_str(),
  1272. s.ToString().c_str());
  1273. fflush(stderr);
  1274. }
  1275. assert(txn_db_);
  1276. if (FLAGS_create_timestamped_snapshot_one_in > 0 &&
  1277. thread->rand.OneInOpt(50000)) {
  1278. uint64_t now = db_stress_env->NowNanos();
  1279. constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000;
  1280. txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff);
  1281. }
  1282. return s;
  1283. }
  1284. void MultiOpsTxnsStressTest::SetupSnapshot(
  1285. ThreadState* thread, ReadOptions& read_opts, Transaction& txn,
  1286. std::shared_ptr<const Snapshot>& snapshot) {
  1287. if (thread->rand.OneInOpt(2)) {
  1288. snapshot = txn_db_->GetLatestTimestampedSnapshot();
  1289. }
  1290. if (snapshot) {
  1291. read_opts.snapshot = snapshot.get();
  1292. } else {
  1293. txn.SetSnapshot();
  1294. read_opts.snapshot = txn.GetSnapshot();
  1295. }
  1296. }
  1297. std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const {
  1298. std::string result;
  1299. PutFixed32(&result, lb_a);
  1300. PutFixed32(&result, ub_a);
  1301. PutFixed32(&result, lb_c);
  1302. PutFixed32(&result, ub_c);
  1303. return result;
  1304. }
  1305. bool MultiOpsTxnsStressTest::KeySpaces::DecodeFrom(Slice data) {
  1306. if (!GetFixed32(&data, &lb_a) || !GetFixed32(&data, &ub_a) ||
  1307. !GetFixed32(&data, &lb_c) || !GetFixed32(&data, &ub_c)) {
  1308. return false;
  1309. }
  1310. return true;
  1311. }
  1312. void MultiOpsTxnsStressTest::PersistKeySpacesDesc(
  1313. const std::string& key_spaces_path, uint32_t lb_a, uint32_t ub_a,
  1314. uint32_t lb_c, uint32_t ub_c) {
  1315. KeySpaces key_spaces(lb_a, ub_a, lb_c, ub_c);
  1316. std::string key_spaces_rep = key_spaces.EncodeTo();
  1317. std::unique_ptr<WritableFile> wfile;
  1318. Status s1 =
  1319. Env::Default()->NewWritableFile(key_spaces_path, &wfile, EnvOptions());
  1320. assert(s1.ok());
  1321. assert(wfile);
  1322. s1 = wfile->Append(key_spaces_rep);
  1323. assert(s1.ok());
  1324. }
  1325. MultiOpsTxnsStressTest::KeySpaces MultiOpsTxnsStressTest::ReadKeySpacesDesc(
  1326. const std::string& key_spaces_path) {
  1327. KeySpaces key_spaces;
  1328. std::unique_ptr<SequentialFile> sfile;
  1329. Status s1 =
  1330. Env::Default()->NewSequentialFile(key_spaces_path, &sfile, EnvOptions());
  1331. assert(s1.ok());
  1332. assert(sfile);
  1333. char buf[16];
  1334. Slice result;
  1335. s1 = sfile->Read(sizeof(buf), &result, buf);
  1336. assert(s1.ok());
  1337. if (!key_spaces.DecodeFrom(result)) {
  1338. assert(false);
  1339. }
  1340. return key_spaces;
  1341. }
  1342. // Create an empty database if necessary and preload it with initial test data.
  1343. // Key range [lb_a, ub_a), [lb_c, ub_c). The key ranges will be shared by
  1344. // 'threads' threads.
  1345. // PreloadDb() also sets up KeyGenerator objects for each sub key range
  1346. // operated on by each thread.
  1347. // Both [lb_a, ub_a) and [lb_c, ub_c) are partitioned. Each thread operates on
  1348. // one sub range, using KeyGenerators to generate keys.
  1349. // For example, we choose a from [0, 10000) and c from [0, 100). Number of
  1350. // threads is 32, their tids range from 0 to 31.
  1351. // Thread k chooses a from [312*k,312*(k+1)) and c from [3*k,3*(k+1)) if k<31.
  1352. // Thread 31 chooses a from [9672, 10000) and c from [93, 100).
  1353. // Within each subrange: a from [low1, high1), c from [low2, high2).
  1354. // high1 - low1 > high2 - low2
  1355. // We reserve {high1 - 1} and {high2 - 1} as unallocated.
  1356. // The records are <low1,low2>, <low1+1,low2+1>, ...,
  1357. // <low1+k,low2+k%(high2-low2-1), <low1+k+1,low2+(k+1)%(high2-low2-1)>, ...
  1358. void MultiOpsTxnsStressTest::PreloadDb(SharedState* shared, int threads,
  1359. uint32_t lb_a, uint32_t ub_a,
  1360. uint32_t lb_c, uint32_t ub_c) {
  1361. key_gen_for_a_.resize(threads);
  1362. key_gen_for_c_.resize(threads);
  1363. assert(ub_a > lb_a && ub_a > lb_a + threads);
  1364. assert(ub_c > lb_c && ub_c > lb_c + threads);
  1365. PersistKeySpacesDesc(FLAGS_key_spaces_path, lb_a, ub_a, lb_c, ub_c);
  1366. fprintf(stdout, "a from [%u, %u), c from [%u, %u)\n",
  1367. static_cast<unsigned int>(lb_a), static_cast<unsigned int>(ub_a),
  1368. static_cast<unsigned int>(lb_c), static_cast<unsigned int>(ub_c));
  1369. const uint32_t num_c = ub_c - lb_c;
  1370. const uint32_t num_c_per_thread = num_c / threads;
  1371. const uint32_t num_a = ub_a - lb_a;
  1372. const uint32_t num_a_per_thread = num_a / threads;
  1373. WriteOptions wopts;
  1374. wopts.disableWAL = FLAGS_disable_wal;
  1375. Random rnd(shared->GetSeed());
  1376. assert(txn_db_);
  1377. std::vector<KeySet> existing_a_uniqs(threads);
  1378. std::vector<KeySet> non_existing_a_uniqs(threads);
  1379. std::vector<KeySet> existing_c_uniqs(threads);
  1380. std::vector<KeySet> non_existing_c_uniqs(threads);
  1381. for (uint32_t a = lb_a; a < ub_a; ++a) {
  1382. uint32_t tid = (a - lb_a) / num_a_per_thread;
  1383. if (tid >= static_cast<uint32_t>(threads)) {
  1384. tid = threads - 1;
  1385. }
  1386. uint32_t a_base = lb_a + tid * num_a_per_thread;
  1387. uint32_t a_hi = (tid < static_cast<uint32_t>(threads - 1))
  1388. ? (a_base + num_a_per_thread)
  1389. : ub_a;
  1390. uint32_t a_delta = a - a_base;
  1391. if (a == a_hi - 1) {
  1392. non_existing_a_uniqs[tid].insert(a);
  1393. continue;
  1394. }
  1395. uint32_t c_base = lb_c + tid * num_c_per_thread;
  1396. uint32_t c_hi = (tid < static_cast<uint32_t>(threads - 1))
  1397. ? (c_base + num_c_per_thread)
  1398. : ub_c;
  1399. uint32_t c_delta = a_delta % (c_hi - c_base - 1);
  1400. uint32_t c = c_base + c_delta;
  1401. uint32_t b = rnd.Next();
  1402. Record record(a, b, c);
  1403. WriteBatch wb;
  1404. const auto primary_index_entry = record.EncodePrimaryIndexEntry();
  1405. Status s = wb.Put(primary_index_entry.first, primary_index_entry.second);
  1406. ProcessStatus(shared, "PreloadDB", s, /*ignore_injected_error=*/false);
  1407. const auto secondary_index_entry = record.EncodeSecondaryIndexEntry();
  1408. s = wb.Put(secondary_index_entry.first, secondary_index_entry.second);
  1409. ProcessStatus(shared, "PreloadDB", s, /*ignore_injected_error=*/false);
  1410. s = txn_db_->Write(wopts, &wb);
  1411. assert(s.ok());
  1412. ProcessStatus(shared, "PreloadDB", s, /*ignore_injected_error=*/false);
  1413. // TODO (yanqin): make the following check optional, especially when data
  1414. // size is large.
  1415. Record tmp_rec;
  1416. tmp_rec.SetB(record.b_value());
  1417. s = tmp_rec.DecodeSecondaryIndexEntry(secondary_index_entry.first,
  1418. secondary_index_entry.second);
  1419. ProcessStatus(shared, "PreloadDB", s, /*ignore_injected_error=*/false);
  1420. assert(tmp_rec == record);
  1421. existing_a_uniqs[tid].insert(a);
  1422. existing_c_uniqs[tid].insert(c);
  1423. }
  1424. for (int i = 0; i < threads; ++i) {
  1425. uint32_t my_seed = i + shared->GetSeed();
  1426. auto& key_gen_for_a = key_gen_for_a_[i];
  1427. assert(!key_gen_for_a);
  1428. uint32_t low = lb_a + i * num_a_per_thread;
  1429. uint32_t high = (i < threads - 1) ? (low + num_a_per_thread) : ub_a;
  1430. assert(existing_a_uniqs[i].size() == high - low - 1);
  1431. assert(non_existing_a_uniqs[i].size() == 1);
  1432. key_gen_for_a = std::make_unique<KeyGenerator>(
  1433. my_seed, low, high, std::move(existing_a_uniqs[i]),
  1434. std::move(non_existing_a_uniqs[i]));
  1435. auto& key_gen_for_c = key_gen_for_c_[i];
  1436. assert(!key_gen_for_c);
  1437. low = lb_c + i * num_c_per_thread;
  1438. high = (i < threads - 1) ? (low + num_c_per_thread) : ub_c;
  1439. non_existing_c_uniqs[i].insert(high - 1);
  1440. assert(existing_c_uniqs[i].size() == high - low - 1);
  1441. assert(non_existing_c_uniqs[i].size() == 1);
  1442. key_gen_for_c = std::make_unique<KeyGenerator>(
  1443. my_seed, low, high, std::move(existing_c_uniqs[i]),
  1444. std::move(non_existing_c_uniqs[i]));
  1445. }
  1446. }
  1447. // Scan an existing, non-empty database.
  1448. // Set up [lb_a, ub_a) and [lb_c, ub_c) as test key ranges.
  1449. // Set up KeyGenerator objects for each sub key range operated on by each
  1450. // thread.
  1451. // Scan the entire database and for each subrange, populate the existing keys
  1452. // and non-existing keys. We currently require the non-existing keys be
  1453. // non-empty after initialization.
  1454. void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) {
  1455. key_gen_for_a_.resize(threads);
  1456. key_gen_for_c_.resize(threads);
  1457. KeySpaces key_spaces = ReadKeySpacesDesc(FLAGS_key_spaces_path);
  1458. const uint32_t lb_a = key_spaces.lb_a;
  1459. const uint32_t ub_a = key_spaces.ub_a;
  1460. const uint32_t lb_c = key_spaces.lb_c;
  1461. const uint32_t ub_c = key_spaces.ub_c;
  1462. assert(lb_a < ub_a && lb_c < ub_c);
  1463. fprintf(stdout, "a from [%u, %u), c from [%u, %u)\n",
  1464. static_cast<unsigned int>(lb_a), static_cast<unsigned int>(ub_a),
  1465. static_cast<unsigned int>(lb_c), static_cast<unsigned int>(ub_c));
  1466. assert(ub_a > lb_a && ub_a > lb_a + threads);
  1467. assert(ub_c > lb_c && ub_c > lb_c + threads);
  1468. const uint32_t num_c = ub_c - lb_c;
  1469. const uint32_t num_c_per_thread = num_c / threads;
  1470. const uint32_t num_a = ub_a - lb_a;
  1471. const uint32_t num_a_per_thread = num_a / threads;
  1472. assert(db_);
  1473. ReadOptions ropts;
  1474. std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
  1475. if (FLAGS_auto_refresh_iterator_with_snapshot) {
  1476. snapshot = std::make_unique<ManagedSnapshot>(db_);
  1477. ropts.snapshot = snapshot->snapshot();
  1478. ropts.auto_refresh_iterator_with_snapshot = true;
  1479. }
  1480. std::vector<KeySet> existing_a_uniqs(threads);
  1481. std::vector<KeySet> non_existing_a_uniqs(threads);
  1482. std::vector<KeySet> existing_c_uniqs(threads);
  1483. std::vector<KeySet> non_existing_c_uniqs(threads);
  1484. {
  1485. std::string pk_lb_str = Record::EncodePrimaryKey(0);
  1486. std::string pk_ub_str =
  1487. Record::EncodePrimaryKey(std::numeric_limits<uint32_t>::max());
  1488. Slice pk_lb = pk_lb_str;
  1489. Slice pk_ub = pk_ub_str;
  1490. ropts.iterate_lower_bound = &pk_lb;
  1491. ropts.iterate_upper_bound = &pk_ub;
  1492. ropts.total_order_seek = true;
  1493. if (FLAGS_use_sqfc_for_range_queries) {
  1494. ropts.table_filter =
  1495. sqfc_factory_->GetTableFilterForRangeQuery(pk_lb, pk_ub);
  1496. }
  1497. std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
  1498. for (it->SeekToFirst(); it->Valid(); it->Next()) {
  1499. Record record;
  1500. Status s = record.DecodePrimaryIndexEntry(it->key(), it->value());
  1501. if (!s.ok()) {
  1502. fprintf(stderr, "Cannot decode primary index entry (%s => %s): %s\n",
  1503. it->key().ToString(true).c_str(),
  1504. it->value().ToString(true).c_str(), s.ToString().c_str());
  1505. assert(false);
  1506. }
  1507. uint32_t a = record.a_value();
  1508. assert(a >= lb_a);
  1509. assert(a < ub_a);
  1510. uint32_t tid = (a - lb_a) / num_a_per_thread;
  1511. if (tid >= static_cast<uint32_t>(threads)) {
  1512. tid = threads - 1;
  1513. }
  1514. existing_a_uniqs[tid].insert(a);
  1515. uint32_t c = record.c_value();
  1516. assert(c >= lb_c);
  1517. assert(c < ub_c);
  1518. tid = (c - lb_c) / num_c_per_thread;
  1519. if (tid >= static_cast<uint32_t>(threads)) {
  1520. tid = threads - 1;
  1521. }
  1522. auto& existing_c_uniq = existing_c_uniqs[tid];
  1523. existing_c_uniq.insert(c);
  1524. }
  1525. for (uint32_t a = lb_a; a < ub_a; ++a) {
  1526. uint32_t tid = (a - lb_a) / num_a_per_thread;
  1527. if (tid >= static_cast<uint32_t>(threads)) {
  1528. tid = threads - 1;
  1529. }
  1530. if (0 == existing_a_uniqs[tid].count(a)) {
  1531. non_existing_a_uniqs[tid].insert(a);
  1532. }
  1533. }
  1534. for (uint32_t c = lb_c; c < ub_c; ++c) {
  1535. uint32_t tid = (c - lb_c) / num_c_per_thread;
  1536. if (tid >= static_cast<uint32_t>(threads)) {
  1537. tid = threads - 1;
  1538. }
  1539. if (0 == existing_c_uniqs[tid].count(c)) {
  1540. non_existing_c_uniqs[tid].insert(c);
  1541. }
  1542. }
  1543. for (int i = 0; i < threads; ++i) {
  1544. uint32_t my_seed = i + shared->GetSeed();
  1545. auto& key_gen_for_a = key_gen_for_a_[i];
  1546. assert(!key_gen_for_a);
  1547. uint32_t low = lb_a + i * num_a_per_thread;
  1548. uint32_t high = (i < threads - 1) ? (low + num_a_per_thread) : ub_a;
  1549. // The following two assertions assume the test thread count and key
  1550. // space remain the same across different runs. Will need to relax.
  1551. assert(existing_a_uniqs[i].size() == high - low - 1);
  1552. assert(non_existing_a_uniqs[i].size() == 1);
  1553. key_gen_for_a = std::make_unique<KeyGenerator>(
  1554. my_seed, low, high, std::move(existing_a_uniqs[i]),
  1555. std::move(non_existing_a_uniqs[i]));
  1556. auto& key_gen_for_c = key_gen_for_c_[i];
  1557. assert(!key_gen_for_c);
  1558. low = lb_c + i * num_c_per_thread;
  1559. high = (i < threads - 1) ? (low + num_c_per_thread) : ub_c;
  1560. // The following two assertions assume the test thread count and key
  1561. // space remain the same across different runs. Will need to relax.
  1562. assert(existing_c_uniqs[i].size() == high - low - 1);
  1563. assert(non_existing_c_uniqs[i].size() == 1);
  1564. key_gen_for_c = std::make_unique<KeyGenerator>(
  1565. my_seed, low, high, std::move(existing_c_uniqs[i]),
  1566. std::move(non_existing_c_uniqs[i]));
  1567. }
  1568. }
  1569. }
  1570. StressTest* CreateMultiOpsTxnsStressTest() {
  1571. return new MultiOpsTxnsStressTest();
  1572. }
  1573. void CheckAndSetOptionsForMultiOpsTxnStressTest() {
  1574. if (FLAGS_test_batches_snapshots || FLAGS_test_cf_consistency) {
  1575. fprintf(stderr,
  1576. "-test_multi_ops_txns is not compatible with "
  1577. "-test_bathces_snapshots and -test_cf_consistency\n");
  1578. exit(1);
  1579. }
  1580. if (!FLAGS_use_txn) {
  1581. fprintf(stderr, "-use_txn must be true if -test_multi_ops_txns\n");
  1582. exit(1);
  1583. } else if (FLAGS_test_secondary > 0) {
  1584. fprintf(
  1585. stderr,
  1586. "secondary instance does not support replaying logs (MANIFEST + WAL) "
  1587. "of TransactionDB with write-prepared/write-unprepared policy\n");
  1588. exit(1);
  1589. }
  1590. if (FLAGS_clear_column_family_one_in > 0) {
  1591. fprintf(stderr,
  1592. "-test_multi_ops_txns is not compatible with clearing column "
  1593. "families\n");
  1594. exit(1);
  1595. }
  1596. if (FLAGS_column_families > 1) {
  1597. // TODO (yanqin) support separating primary index and secondary index in
  1598. // different column families.
  1599. fprintf(stderr,
  1600. "-test_multi_ops_txns currently does not use more than one column "
  1601. "family\n");
  1602. exit(1);
  1603. }
  1604. if (FLAGS_writepercent > 0 || FLAGS_delpercent > 0 ||
  1605. FLAGS_delrangepercent > 0) {
  1606. fprintf(stderr,
  1607. "-test_multi_ops_txns requires that -writepercent, -delpercent and "
  1608. "-delrangepercent be 0\n");
  1609. exit(1);
  1610. }
  1611. if (FLAGS_key_spaces_path.empty()) {
  1612. fprintf(stderr,
  1613. "Must specify a file to store ranges of A and C via "
  1614. "-key_spaces_path\n");
  1615. exit(1);
  1616. }
  1617. if (FLAGS_create_timestamped_snapshot_one_in > 0) {
  1618. if (FLAGS_txn_write_policy !=
  1619. static_cast<uint64_t>(TxnDBWritePolicy::WRITE_COMMITTED)) {
  1620. fprintf(stderr,
  1621. "Timestamped snapshot is not yet supported by "
  1622. "write-prepared/write-unprepared transactions\n");
  1623. exit(1);
  1624. }
  1625. }
  1626. if (FLAGS_sync_fault_injection == 1) {
  1627. fprintf(stderr,
  1628. "Sync fault injection is currently not supported in "
  1629. "-test_multi_ops_txns\n");
  1630. exit(1);
  1631. }
  1632. }
  1633. } // namespace ROCKSDB_NAMESPACE
  1634. #endif // GFLAGS