// batched_ops_stress.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #ifdef GFLAGS
  10. #include "db_stress_tool/db_stress_common.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. class BatchedOpsStressTest : public StressTest {
  13. public:
  14. BatchedOpsStressTest() = default;
  15. virtual ~BatchedOpsStressTest() = default;
  16. bool IsStateTracked() const override { return false; }
  17. // Given a key K and value V, this puts ("0"+K, V+"0"), ("1"+K, V+"1"), ...,
  18. // ("9"+K, V+"9") in DB atomically i.e in a single batch.
  19. // Also refer BatchedOpsStressTest::TestGet
  20. Status TestPut(ThreadState* thread, WriteOptions& write_opts,
  21. const ReadOptions& /* read_opts */,
  22. const std::vector<int>& rand_column_families,
  23. const std::vector<int64_t>& rand_keys,
  24. char (&value)[100]) override {
  25. assert(!rand_column_families.empty());
  26. assert(!rand_keys.empty());
  27. const std::string key_body = Key(rand_keys[0]);
  28. const uint32_t value_base = thread->rand.Next();
  29. const size_t sz = GenerateValue(value_base, value, sizeof(value));
  30. const std::string value_body = Slice(value, sz).ToString();
  31. WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
  32. FLAGS_batch_protection_bytes_per_key,
  33. FLAGS_user_timestamp_size);
  34. ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
  35. assert(cfh);
  36. Status status;
  37. for (int i = 9; i >= 0; --i) {
  38. const std::string num = std::to_string(i);
  39. // Note: the digit in num is prepended to the key; however, it is appended
  40. // to the value because we want the "value base" to be encoded uniformly
  41. // at the beginning of the value for all types of stress tests (e.g.
  42. // batched, non-batched, CF consistency).
  43. const std::string k = num + key_body;
  44. const std::string v = value_body + num;
  45. if (FLAGS_use_put_entity_one_in > 0 &&
  46. (value_base % FLAGS_use_put_entity_one_in) == 0) {
  47. if (FLAGS_use_attribute_group) {
  48. status =
  49. batch.PutEntity(k, GenerateAttributeGroups({cfh}, value_base, v));
  50. } else {
  51. status = batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
  52. }
  53. } else if (FLAGS_use_timed_put_one_in > 0 &&
  54. ((value_base + kLargePrimeForCommonFactorSkew) %
  55. FLAGS_use_timed_put_one_in) == 0) {
  56. uint64_t write_unix_time = GetWriteUnixTime(thread);
  57. status = batch.TimedPut(cfh, k, v, write_unix_time);
  58. } else if (FLAGS_use_merge) {
  59. status = batch.Merge(cfh, k, v);
  60. } else {
  61. status = batch.Put(cfh, k, v);
  62. }
  63. if (!status.ok()) {
  64. break;
  65. }
  66. }
  67. if (status.ok()) {
  68. status = db_->Write(write_opts, &batch);
  69. }
  70. if (!status.ok()) {
  71. fprintf(stderr, "multiput error: %s\n", status.ToString().c_str());
  72. thread->stats.AddErrors(1);
  73. } else {
  74. // we did 10 writes each of size sz + 1
  75. thread->stats.AddBytesForWrites(10, (sz + 1) * 10);
  76. }
  77. return status;
  78. }
  79. // Given a key K, this deletes ("0"+K), ("1"+K), ..., ("9"+K)
  80. // in DB atomically i.e in a single batch. Also refer MultiGet.
  81. Status TestDelete(ThreadState* thread, WriteOptions& writeoptions,
  82. const std::vector<int>& rand_column_families,
  83. const std::vector<int64_t>& rand_keys) override {
  84. std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"};
  85. WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
  86. FLAGS_batch_protection_bytes_per_key,
  87. FLAGS_user_timestamp_size);
  88. Status s;
  89. auto cfh = column_families_[rand_column_families[0]];
  90. std::string key_str = Key(rand_keys[0]);
  91. for (int i = 0; i < 10; i++) {
  92. keys[i] += key_str;
  93. batch.Delete(cfh, keys[i]);
  94. }
  95. s = db_->Write(writeoptions, &batch);
  96. if (!s.ok()) {
  97. fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str());
  98. thread->stats.AddErrors(1);
  99. } else {
  100. thread->stats.AddDeletes(10);
  101. }
  102. return s;
  103. }
  104. Status TestDeleteRange(ThreadState* /* thread */,
  105. WriteOptions& /* write_opts */,
  106. const std::vector<int>& /* rand_column_families */,
  107. const std::vector<int64_t>& /* rand_keys */) override {
  108. assert(false);
  109. return Status::NotSupported(
  110. "BatchedOpsStressTest does not support "
  111. "TestDeleteRange");
  112. }
  113. void TestIngestExternalFile(
  114. ThreadState* /* thread */,
  115. const std::vector<int>& /* rand_column_families */,
  116. const std::vector<int64_t>& /* rand_keys */) override {
  117. assert(false);
  118. fprintf(stderr,
  119. "BatchedOpsStressTest does not support "
  120. "TestIngestExternalFile\n");
  121. std::terminate();
  122. }
  123. // Given a key K, this gets values for "0"+K, "1"+K, ..., "9"+K
  124. // in the same snapshot, and verifies that all the values are of the form
  125. // V+"0", V+"1", ..., V+"9".
  126. // ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into
  127. // the DB.
  128. Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
  129. const std::vector<int>& rand_column_families,
  130. const std::vector<int64_t>& rand_keys) override {
  131. std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
  132. Slice key_slices[10];
  133. std::string values[10];
  134. ReadOptions readoptionscopy = readoptions;
  135. readoptionscopy.snapshot = db_->GetSnapshot();
  136. std::string key_str = Key(rand_keys[0]);
  137. Slice key = key_str;
  138. auto cfh = column_families_[rand_column_families[0]];
  139. std::string from_db;
  140. Status s;
  141. for (int i = 0; i < 10; i++) {
  142. keys[i] += key.ToString();
  143. key_slices[i] = keys[i];
  144. s = db_->Get(readoptionscopy, cfh, key_slices[i], &from_db);
  145. if (!s.ok() && !s.IsNotFound()) {
  146. fprintf(stderr, "get error: %s\n", s.ToString().c_str());
  147. values[i] = "";
  148. thread->stats.AddErrors(1);
  149. // we continue after error rather than exiting so that we can
  150. // find more errors if any
  151. } else if (s.IsNotFound()) {
  152. values[i] = "";
  153. thread->stats.AddGets(1, 0);
  154. } else {
  155. values[i] = from_db;
  156. assert(!keys[i].empty());
  157. assert(!values[i].empty());
  158. const char expected = keys[i].front();
  159. const char actual = values[i].back();
  160. if (expected != actual) {
  161. fprintf(stderr, "get error expected = %c actual = %c\n", expected,
  162. actual);
  163. }
  164. values[i].pop_back(); // get rid of the differing character
  165. thread->stats.AddGets(1, 1);
  166. }
  167. }
  168. db_->ReleaseSnapshot(readoptionscopy.snapshot);
  169. // Now that we retrieved all values, check that they all match
  170. for (int i = 1; i < 10; i++) {
  171. if (values[i] != values[0]) {
  172. fprintf(stderr, "get error: inconsistent values for key %s: %s, %s\n",
  173. key.ToString(true).c_str(), StringToHex(values[0]).c_str(),
  174. StringToHex(values[i]).c_str());
  175. // we continue after error rather than exiting so that we can
  176. // find more errors if any
  177. }
  178. }
  179. return s;
  180. }
  181. std::vector<Status> TestMultiGet(
  182. ThreadState* thread, const ReadOptions& readoptions,
  183. const std::vector<int>& rand_column_families,
  184. const std::vector<int64_t>& rand_keys) override {
  185. size_t num_keys = rand_keys.size();
  186. std::vector<Status> ret_status(num_keys);
  187. std::array<std::string, 10> keys = {
  188. {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}};
  189. size_t num_prefixes = keys.size();
  190. for (size_t rand_key = 0; rand_key < num_keys; ++rand_key) {
  191. std::vector<Slice> key_slices;
  192. std::vector<PinnableSlice> values(num_prefixes);
  193. std::vector<Status> statuses(num_prefixes);
  194. ReadOptions readoptionscopy = readoptions;
  195. readoptionscopy.snapshot = db_->GetSnapshot();
  196. readoptionscopy.rate_limiter_priority =
  197. FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  198. std::vector<std::string> key_str;
  199. key_str.reserve(num_prefixes);
  200. key_slices.reserve(num_prefixes);
  201. std::string from_db;
  202. ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
  203. for (size_t key = 0; key < num_prefixes; ++key) {
  204. key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
  205. key_slices.emplace_back(key_str.back());
  206. }
  207. db_->MultiGet(readoptionscopy, cfh, num_prefixes, key_slices.data(),
  208. values.data(), statuses.data());
  209. for (size_t i = 0; i < num_prefixes; i++) {
  210. Status s = statuses[i];
  211. if (!s.ok() && !s.IsNotFound()) {
  212. fprintf(stderr, "multiget error: %s\n", s.ToString().c_str());
  213. thread->stats.AddErrors(1);
  214. ret_status[rand_key] = s;
  215. // we continue after error rather than exiting so that we can
  216. // find more errors if any
  217. } else if (s.IsNotFound()) {
  218. thread->stats.AddGets(1, 0);
  219. ret_status[rand_key] = s;
  220. } else {
  221. assert(!keys[i].empty());
  222. assert(!values[i].empty());
  223. const char expected = keys[i][0];
  224. const char actual = values[i][values[i].size() - 1];
  225. if (expected != actual) {
  226. fprintf(stderr, "multiget error expected = %c actual = %c\n",
  227. expected, actual);
  228. }
  229. values[i].remove_suffix(1); // get rid of the differing character
  230. thread->stats.AddGets(1, 1);
  231. }
  232. }
  233. db_->ReleaseSnapshot(readoptionscopy.snapshot);
  234. // Now that we retrieved all values, check that they all match
  235. for (size_t i = 1; i < num_prefixes; i++) {
  236. if (values[i] != values[0]) {
  237. fprintf(stderr,
  238. "multiget error: inconsistent values for key %s: %s, %s\n",
  239. StringToHex(key_str[i]).c_str(),
  240. StringToHex(values[0].ToString()).c_str(),
  241. StringToHex(values[i].ToString()).c_str());
  242. // we continue after error rather than exiting so that we can
  243. // find more errors if any
  244. }
  245. }
  246. }
  247. return ret_status;
  248. }
  249. void TestGetEntity(ThreadState* thread, const ReadOptions& read_opts,
  250. const std::vector<int>& rand_column_families,
  251. const std::vector<int64_t>& rand_keys) override {
  252. assert(thread);
  253. ManagedSnapshot snapshot_guard(db_);
  254. ReadOptions read_opts_copy(read_opts);
  255. read_opts_copy.snapshot = snapshot_guard.snapshot();
  256. assert(!rand_keys.empty());
  257. const std::string key_suffix = Key(rand_keys[0]);
  258. assert(!rand_column_families.empty());
  259. assert(rand_column_families[0] >= 0);
  260. assert(rand_column_families[0] < static_cast<int>(column_families_.size()));
  261. ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
  262. assert(cfh);
  263. constexpr size_t num_keys = 10;
  264. std::array<PinnableWideColumns, num_keys> column_results;
  265. std::array<PinnableAttributeGroups, num_keys> attribute_group_results;
  266. std::string error_msg_header = FLAGS_use_attribute_group
  267. ? "GetEntity (AttributeGroup) error"
  268. : "GetEntity error";
  269. for (size_t i = 0; i < num_keys; ++i) {
  270. const std::string key = std::to_string(i) + key_suffix;
  271. Status s;
  272. if (FLAGS_use_attribute_group) {
  273. attribute_group_results[i].emplace_back(cfh);
  274. s = db_->GetEntity(read_opts_copy, key, &attribute_group_results[i]);
  275. if (s.ok()) {
  276. s = attribute_group_results[i].back().status();
  277. }
  278. } else {
  279. s = db_->GetEntity(read_opts_copy, cfh, key, &column_results[i]);
  280. }
  281. if (!s.ok() && !s.IsNotFound()) {
  282. fprintf(stderr, "%s: %s\n", error_msg_header.c_str(),
  283. s.ToString().c_str());
  284. thread->stats.AddErrors(1);
  285. } else if (s.IsNotFound()) {
  286. thread->stats.AddGets(1, 0);
  287. } else {
  288. thread->stats.AddGets(1, 1);
  289. }
  290. }
  291. const WideColumns& columns_to_compare =
  292. FLAGS_use_attribute_group ? attribute_group_results[0].front().columns()
  293. : column_results[0].columns();
  294. for (size_t i = 1; i < num_keys; ++i) {
  295. const WideColumns& columns =
  296. FLAGS_use_attribute_group
  297. ? attribute_group_results[i].front().columns()
  298. : column_results[i].columns();
  299. if (!CompareColumns(columns_to_compare, columns)) {
  300. fprintf(stderr, "%s: inconsistent entities for key %s: %s, %s\n",
  301. error_msg_header.c_str(), StringToHex(key_suffix).c_str(),
  302. WideColumnsToHex(columns_to_compare).c_str(),
  303. WideColumnsToHex(columns).c_str());
  304. }
  305. if (!columns.empty()) {
  306. // The last character of each column value should be 'i' as a decimal
  307. // digit
  308. const char expected = static_cast<char>('0' + i);
  309. for (const auto& column : columns) {
  310. const Slice& value = column.value();
  311. if (value.empty() || value[value.size() - 1] != expected) {
  312. fprintf(stderr,
  313. "%s: incorrect column value for key "
  314. "%s, entity %s, column value %s, expected %c\n",
  315. error_msg_header.c_str(), StringToHex(key_suffix).c_str(),
  316. WideColumnsToHex(columns).c_str(),
  317. value.ToString(/* hex */ true).c_str(), expected);
  318. }
  319. }
  320. if (!VerifyWideColumns(columns)) {
  321. fprintf(stderr, "%s: inconsistent columns for key %s, entity %s\n",
  322. error_msg_header.c_str(), StringToHex(key_suffix).c_str(),
  323. WideColumnsToHex(columns).c_str());
  324. }
  325. }
  326. }
  327. }
  328. void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
  329. const std::vector<int>& rand_column_families,
  330. const std::vector<int64_t>& rand_keys) override {
  331. assert(thread);
  332. assert(!rand_column_families.empty());
  333. assert(rand_column_families[0] >= 0);
  334. assert(rand_column_families[0] < static_cast<int>(column_families_.size()));
  335. ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
  336. assert(cfh);
  337. assert(!rand_keys.empty());
  338. ManagedSnapshot snapshot_guard(db_);
  339. ReadOptions read_opts_copy(read_opts);
  340. read_opts_copy.snapshot = snapshot_guard.snapshot();
  341. const size_t num_keys = rand_keys.size();
  342. for (size_t i = 0; i < num_keys; ++i) {
  343. const std::string key_suffix = Key(rand_keys[i]);
  344. constexpr size_t num_prefixes = 10;
  345. std::array<std::string, num_prefixes> keys;
  346. std::array<Slice, num_prefixes> key_slices;
  347. for (size_t j = 0; j < num_prefixes; ++j) {
  348. keys[j] = std::to_string(j) + key_suffix;
  349. key_slices[j] = keys[j];
  350. }
  351. if (FLAGS_use_attribute_group) {
  352. // AttributeGroup MultiGetEntity verification
  353. std::vector<PinnableAttributeGroups> results;
  354. results.reserve(num_prefixes);
  355. for (size_t j = 0; j < num_prefixes; ++j) {
  356. PinnableAttributeGroups attribute_groups;
  357. attribute_groups.emplace_back(cfh);
  358. results.emplace_back(std::move(attribute_groups));
  359. }
  360. db_->MultiGetEntity(read_opts_copy, num_prefixes, key_slices.data(),
  361. results.data());
  362. const WideColumns& cmp_columns = results[0][0].columns();
  363. for (size_t j = 0; j < num_prefixes; ++j) {
  364. const auto& attribute_groups = results[j];
  365. assert(attribute_groups.size() == 1);
  366. const Status& s = attribute_groups[0].status();
  367. if (!s.ok() && !s.IsNotFound()) {
  368. fprintf(stderr, "MultiGetEntity (AttributeGroup) error: %s\n",
  369. s.ToString().c_str());
  370. thread->stats.AddErrors(1);
  371. } else if (s.IsNotFound()) {
  372. thread->stats.AddGets(1, 0);
  373. } else {
  374. thread->stats.AddGets(1, 1);
  375. }
  376. const WideColumns& columns = results[j][0].columns();
  377. if (!CompareColumns(cmp_columns, columns)) {
  378. fprintf(stderr,
  379. "MultiGetEntity (AttributeGroup) error: inconsistent "
  380. "entities for key %s: %s, "
  381. "%s\n",
  382. StringToHex(key_suffix).c_str(),
  383. WideColumnsToHex(cmp_columns).c_str(),
  384. WideColumnsToHex(columns).c_str());
  385. }
  386. if (!columns.empty()) {
  387. // The last character of each column value should be 'j' as a
  388. // decimal digit
  389. const char expected = static_cast<char>('0' + j);
  390. for (const auto& column : columns) {
  391. const Slice& value = column.value();
  392. if (value.empty() || value[value.size() - 1] != expected) {
  393. fprintf(stderr,
  394. "MultiGetEntity (AttributeGroup) error: incorrect "
  395. "column value for key "
  396. "%s, entity %s, column value %s, expected %c\n",
  397. StringToHex(key_suffix).c_str(),
  398. WideColumnsToHex(columns).c_str(),
  399. value.ToString(/* hex */ true).c_str(), expected);
  400. }
  401. }
  402. if (!VerifyWideColumns(columns)) {
  403. fprintf(stderr,
  404. "MultiGetEntity (AttributeGroup) error: inconsistent "
  405. "columns for key %s, "
  406. "entity %s\n",
  407. StringToHex(key_suffix).c_str(),
  408. WideColumnsToHex(columns).c_str());
  409. }
  410. }
  411. }
  412. } else {
  413. // Non-AttributeGroup MultiGetEntity verification
  414. std::array<PinnableWideColumns, num_prefixes> results;
  415. std::array<Status, num_prefixes> statuses;
  416. db_->MultiGetEntity(read_opts_copy, cfh, num_prefixes,
  417. key_slices.data(), results.data(), statuses.data());
  418. const WideColumns& cmp_columns = results[0].columns();
  419. for (size_t j = 0; j < num_prefixes; ++j) {
  420. const Status& s = statuses[j];
  421. if (!s.ok() && !s.IsNotFound()) {
  422. fprintf(stderr, "MultiGetEntity error: %s\n", s.ToString().c_str());
  423. thread->stats.AddErrors(1);
  424. } else if (s.IsNotFound()) {
  425. thread->stats.AddGets(1, 0);
  426. } else {
  427. thread->stats.AddGets(1, 1);
  428. }
  429. const WideColumns& columns = results[j].columns();
  430. if (!CompareColumns(cmp_columns, columns)) {
  431. fprintf(
  432. stderr,
  433. "MultiGetEntity error: inconsistent entities for key %s: %s, "
  434. "%s\n",
  435. StringToHex(key_suffix).c_str(),
  436. WideColumnsToHex(cmp_columns).c_str(),
  437. WideColumnsToHex(columns).c_str());
  438. }
  439. if (!columns.empty()) {
  440. // The last character of each column value should be 'j' as a
  441. // decimal digit
  442. const char expected = static_cast<char>('0' + j);
  443. for (const auto& column : columns) {
  444. const Slice& value = column.value();
  445. if (value.empty() || value[value.size() - 1] != expected) {
  446. fprintf(stderr,
  447. "MultiGetEntity error: incorrect column value for key "
  448. "%s, entity %s, column value %s, expected %c\n",
  449. StringToHex(key_suffix).c_str(),
  450. WideColumnsToHex(columns).c_str(),
  451. value.ToString(/* hex */ true).c_str(), expected);
  452. }
  453. }
  454. if (!VerifyWideColumns(columns)) {
  455. fprintf(stderr,
  456. "MultiGetEntity error: inconsistent columns for key %s, "
  457. "entity %s\n",
  458. StringToHex(key_suffix).c_str(),
  459. WideColumnsToHex(columns).c_str());
  460. }
  461. }
  462. }
  463. }
  464. }
  465. }
  466. // Given a key, this does prefix scans for "0"+P, "1"+P, ..., "9"+P
  467. // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
  468. // of the key. Each of these 10 scans returns a series of values;
  469. // each series should be the same length, and it is verified for each
  470. // index i that all the i'th values are of the form V+"0", V+"1", ..., V+"9".
  471. // ASSUMES that MultiPut was used to put (K, V)
  472. Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
  473. const std::vector<int>& rand_column_families,
  474. const std::vector<int64_t>& rand_keys) override {
  475. assert(!rand_column_families.empty());
  476. assert(!rand_keys.empty());
  477. const std::string key = Key(rand_keys[0]);
  478. assert(FLAGS_prefix_size > 0);
  479. const size_t prefix_to_use = static_cast<size_t>(FLAGS_prefix_size);
  480. constexpr size_t num_prefixes = 10;
  481. std::array<std::string, num_prefixes> prefixes;
  482. std::array<Slice, num_prefixes> prefix_slices;
  483. std::array<ReadOptions, num_prefixes> ro_copies;
  484. std::array<std::string, num_prefixes> upper_bounds;
  485. std::array<Slice, num_prefixes> ub_slices;
  486. std::array<std::unique_ptr<Iterator>, num_prefixes> iters;
  487. const Snapshot* const snapshot = db_->GetSnapshot();
  488. ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
  489. assert(cfh);
  490. for (size_t i = 0; i < num_prefixes; ++i) {
  491. prefixes[i] = std::to_string(i) + key;
  492. prefix_slices[i] = Slice(prefixes[i].data(), prefix_to_use);
  493. ro_copies[i] = readoptions;
  494. ro_copies[i].snapshot = snapshot;
  495. if (thread->rand.OneIn(2) &&
  496. GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) {
  497. // For half of the time, set the upper bound to the next prefix
  498. ub_slices[i] = upper_bounds[i];
  499. ro_copies[i].iterate_upper_bound = &(ub_slices[i]);
  500. if (FLAGS_use_sqfc_for_range_queries) {
  501. ro_copies[i].table_filter =
  502. sqfc_factory_->GetTableFilterForRangeQuery(prefix_slices[i],
  503. ub_slices[i]);
  504. }
  505. }
  506. iters[i].reset(db_->NewIterator(ro_copies[i], cfh));
  507. iters[i]->Seek(prefix_slices[i]);
  508. }
  509. uint64_t count = 0;
  510. while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
  511. ++count;
  512. std::array<std::string, num_prefixes> values;
  513. // get list of all values for this iteration
  514. for (size_t i = 0; i < num_prefixes; ++i) {
  515. // no iterator should finish before the first one
  516. assert(iters[i]->Valid() &&
  517. iters[i]->key().starts_with(prefix_slices[i]));
  518. if (ro_copies[i].allow_unprepared_value) {
  519. // Save key in case PrepareValue fails and invalidates the iterator
  520. const std::string prepare_value_key =
  521. iters[i]->key().ToString(/* hex */ true);
  522. if (!iters[i]->PrepareValue()) {
  523. fprintf(stderr,
  524. "prefix scan error: PrepareValue failed for key %s: %s\n",
  525. prepare_value_key.c_str(),
  526. iters[i]->status().ToString().c_str());
  527. continue;
  528. }
  529. }
  530. values[i] = iters[i]->value().ToString();
  531. // make sure the last character of the value is the expected digit
  532. assert(!prefixes[i].empty());
  533. assert(!values[i].empty());
  534. const char expected = prefixes[i].front();
  535. const char actual = values[i].back();
  536. if (expected != actual) {
  537. fprintf(stderr, "prefix scan error expected = %c actual = %c\n",
  538. expected, actual);
  539. }
  540. values[i].pop_back(); // get rid of the differing character
  541. // make sure all values are equivalent
  542. if (values[i] != values[0]) {
  543. fprintf(stderr,
  544. "prefix scan error : %" ROCKSDB_PRIszt
  545. ", inconsistent values for prefix %s: %s, %s\n",
  546. i, prefix_slices[i].ToString(/* hex */ true).c_str(),
  547. StringToHex(values[0]).c_str(),
  548. StringToHex(values[i]).c_str());
  549. // we continue after error rather than exiting so that we can
  550. // find more errors if any
  551. }
  552. // make sure value() and columns() are consistent
  553. if (!VerifyWideColumns(iters[i]->value(), iters[i]->columns())) {
  554. fprintf(stderr,
  555. "prefix scan error : %" ROCKSDB_PRIszt
  556. ", value and columns inconsistent for prefix %s: value: %s, "
  557. "columns: %s\n",
  558. i, prefix_slices[i].ToString(/* hex */ true).c_str(),
  559. iters[i]->value().ToString(/* hex */ true).c_str(),
  560. WideColumnsToHex(iters[i]->columns()).c_str());
  561. }
  562. iters[i]->Next();
  563. }
  564. }
  565. // cleanup iterators and snapshot
  566. for (size_t i = 0; i < num_prefixes; ++i) {
  567. // if the first iterator finished, they should have all finished
  568. assert(!iters[i]->Valid() ||
  569. !iters[i]->key().starts_with(prefix_slices[i]));
  570. assert(iters[i]->status().ok());
  571. }
  572. db_->ReleaseSnapshot(snapshot);
  573. thread->stats.AddPrefixes(1, count);
  574. return Status::OK();
  575. }
  576. void VerifyDb(ThreadState* /* thread */) const override {}
  577. void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {}
  578. // Compare columns ignoring the last character of column values
  579. bool CompareColumns(const WideColumns& lhs, const WideColumns& rhs) {
  580. if (lhs.size() != rhs.size()) {
  581. return false;
  582. }
  583. for (size_t i = 0; i < lhs.size(); ++i) {
  584. if (lhs[i].name() != rhs[i].name()) {
  585. return false;
  586. }
  587. if (lhs[i].value().size() != rhs[i].value().size()) {
  588. return false;
  589. }
  590. if (lhs[i].value().difference_offset(rhs[i].value()) <
  591. lhs[i].value().size() - 1) {
  592. return false;
  593. }
  594. }
  595. return true;
  596. }
  597. };
  598. StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); }
  599. } // namespace ROCKSDB_NAMESPACE
  600. #endif // GFLAGS