batched_ops_stress.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #ifdef GFLAGS
  10. #include "db_stress_tool/db_stress_common.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. class BatchedOpsStressTest : public StressTest {
  13. public:
  14. BatchedOpsStressTest() {}
  15. virtual ~BatchedOpsStressTest() {}
  16. // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
  17. // ("9"+K, "9"+V) in DB atomically i.e in a single batch.
  18. // Also refer BatchedOpsStressTest::TestGet
  19. Status TestPut(ThreadState* thread, WriteOptions& write_opts,
  20. const ReadOptions& /* read_opts */,
  21. const std::vector<int>& rand_column_families,
  22. const std::vector<int64_t>& rand_keys, char (&value)[100],
  23. std::unique_ptr<MutexLock>& /* lock */) override {
  24. uint32_t value_base =
  25. thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
  26. size_t sz = GenerateValue(value_base, value, sizeof(value));
  27. Slice v(value, sz);
  28. std::string keys[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
  29. std::string values[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
  30. Slice value_slices[10];
  31. WriteBatch batch;
  32. Status s;
  33. auto cfh = column_families_[rand_column_families[0]];
  34. std::string key_str = Key(rand_keys[0]);
  35. for (int i = 0; i < 10; i++) {
  36. keys[i] += key_str;
  37. values[i] += v.ToString();
  38. value_slices[i] = values[i];
  39. if (FLAGS_use_merge) {
  40. batch.Merge(cfh, keys[i], value_slices[i]);
  41. } else {
  42. batch.Put(cfh, keys[i], value_slices[i]);
  43. }
  44. }
  45. s = db_->Write(write_opts, &batch);
  46. if (!s.ok()) {
  47. fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
  48. thread->stats.AddErrors(1);
  49. } else {
  50. // we did 10 writes each of size sz + 1
  51. thread->stats.AddBytesForWrites(10, (sz + 1) * 10);
  52. }
  53. return s;
  54. }
  55. // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
  56. // in DB atomically i.e in a single batch. Also refer MultiGet.
  57. Status TestDelete(ThreadState* thread, WriteOptions& writeoptions,
  58. const std::vector<int>& rand_column_families,
  59. const std::vector<int64_t>& rand_keys,
  60. std::unique_ptr<MutexLock>& /* lock */) override {
  61. std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"};
  62. WriteBatch batch;
  63. Status s;
  64. auto cfh = column_families_[rand_column_families[0]];
  65. std::string key_str = Key(rand_keys[0]);
  66. for (int i = 0; i < 10; i++) {
  67. keys[i] += key_str;
  68. batch.Delete(cfh, keys[i]);
  69. }
  70. s = db_->Write(writeoptions, &batch);
  71. if (!s.ok()) {
  72. fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str());
  73. thread->stats.AddErrors(1);
  74. } else {
  75. thread->stats.AddDeletes(10);
  76. }
  77. return s;
  78. }
  79. Status TestDeleteRange(ThreadState* /* thread */,
  80. WriteOptions& /* write_opts */,
  81. const std::vector<int>& /* rand_column_families */,
  82. const std::vector<int64_t>& /* rand_keys */,
  83. std::unique_ptr<MutexLock>& /* lock */) override {
  84. assert(false);
  85. return Status::NotSupported(
  86. "BatchedOpsStressTest does not support "
  87. "TestDeleteRange");
  88. }
  89. void TestIngestExternalFile(
  90. ThreadState* /* thread */,
  91. const std::vector<int>& /* rand_column_families */,
  92. const std::vector<int64_t>& /* rand_keys */,
  93. std::unique_ptr<MutexLock>& /* lock */) override {
  94. assert(false);
  95. fprintf(stderr,
  96. "BatchedOpsStressTest does not support "
  97. "TestIngestExternalFile\n");
  98. std::terminate();
  99. }
  100. // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
  101. // in the same snapshot, and verifies that all the values are of the form
  102. // "0"+V, "1"+V,..."9"+V.
  103. // ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into
  104. // the DB.
  105. Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
  106. const std::vector<int>& rand_column_families,
  107. const std::vector<int64_t>& rand_keys) override {
  108. std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
  109. Slice key_slices[10];
  110. std::string values[10];
  111. ReadOptions readoptionscopy = readoptions;
  112. readoptionscopy.snapshot = db_->GetSnapshot();
  113. std::string key_str = Key(rand_keys[0]);
  114. Slice key = key_str;
  115. auto cfh = column_families_[rand_column_families[0]];
  116. std::string from_db;
  117. Status s;
  118. for (int i = 0; i < 10; i++) {
  119. keys[i] += key.ToString();
  120. key_slices[i] = keys[i];
  121. s = db_->Get(readoptionscopy, cfh, key_slices[i], &from_db);
  122. if (!s.ok() && !s.IsNotFound()) {
  123. fprintf(stderr, "get error: %s\n", s.ToString().c_str());
  124. values[i] = "";
  125. thread->stats.AddErrors(1);
  126. // we continue after error rather than exiting so that we can
  127. // find more errors if any
  128. } else if (s.IsNotFound()) {
  129. values[i] = "";
  130. thread->stats.AddGets(1, 0);
  131. } else {
  132. values[i] = from_db;
  133. char expected_prefix = (keys[i])[0];
  134. char actual_prefix = (values[i])[0];
  135. if (actual_prefix != expected_prefix) {
  136. fprintf(stderr, "error expected prefix = %c actual = %c\n",
  137. expected_prefix, actual_prefix);
  138. }
  139. (values[i])[0] = ' '; // blank out the differing character
  140. thread->stats.AddGets(1, 1);
  141. }
  142. }
  143. db_->ReleaseSnapshot(readoptionscopy.snapshot);
  144. // Now that we retrieved all values, check that they all match
  145. for (int i = 1; i < 10; i++) {
  146. if (values[i] != values[0]) {
  147. fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
  148. key.ToString(true).c_str(), StringToHex(values[0]).c_str(),
  149. StringToHex(values[i]).c_str());
  150. // we continue after error rather than exiting so that we can
  151. // find more errors if any
  152. }
  153. }
  154. return s;
  155. }
  156. std::vector<Status> TestMultiGet(
  157. ThreadState* thread, const ReadOptions& readoptions,
  158. const std::vector<int>& rand_column_families,
  159. const std::vector<int64_t>& rand_keys) override {
  160. size_t num_keys = rand_keys.size();
  161. std::vector<Status> ret_status(num_keys);
  162. std::array<std::string, 10> keys = {{"0", "1", "2", "3", "4",
  163. "5", "6", "7", "8", "9"}};
  164. size_t num_prefixes = keys.size();
  165. for (size_t rand_key = 0; rand_key < num_keys; ++rand_key) {
  166. std::vector<Slice> key_slices;
  167. std::vector<PinnableSlice> values(num_prefixes);
  168. std::vector<Status> statuses(num_prefixes);
  169. ReadOptions readoptionscopy = readoptions;
  170. readoptionscopy.snapshot = db_->GetSnapshot();
  171. std::vector<std::string> key_str;
  172. key_str.reserve(num_prefixes);
  173. key_slices.reserve(num_prefixes);
  174. std::string from_db;
  175. ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
  176. for (size_t key = 0; key < num_prefixes; ++key) {
  177. key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
  178. key_slices.emplace_back(key_str.back());
  179. }
  180. db_->MultiGet(readoptionscopy, cfh, num_prefixes, key_slices.data(),
  181. values.data(), statuses.data());
  182. for (size_t i = 0; i < num_prefixes; i++) {
  183. Status s = statuses[i];
  184. if (!s.ok() && !s.IsNotFound()) {
  185. fprintf(stderr, "get error: %s\n", s.ToString().c_str());
  186. thread->stats.AddErrors(1);
  187. ret_status[rand_key] = s;
  188. // we continue after error rather than exiting so that we can
  189. // find more errors if any
  190. } else if (s.IsNotFound()) {
  191. thread->stats.AddGets(1, 0);
  192. ret_status[rand_key] = s;
  193. } else {
  194. char expected_prefix = (keys[i])[0];
  195. char actual_prefix = (values[i])[0];
  196. if (actual_prefix != expected_prefix) {
  197. fprintf(stderr, "error expected prefix = %c actual = %c\n",
  198. expected_prefix, actual_prefix);
  199. }
  200. std::string str;
  201. str.assign(values[i].data(), values[i].size());
  202. values[i].Reset();
  203. str[0] = ' '; // blank out the differing character
  204. values[i].PinSelf(str);
  205. thread->stats.AddGets(1, 1);
  206. }
  207. }
  208. db_->ReleaseSnapshot(readoptionscopy.snapshot);
  209. // Now that we retrieved all values, check that they all match
  210. for (size_t i = 1; i < num_prefixes; i++) {
  211. if (values[i] != values[0]) {
  212. fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
  213. key_str[i].c_str(), StringToHex(values[0].ToString()).c_str(),
  214. StringToHex(values[i].ToString()).c_str());
  215. // we continue after error rather than exiting so that we can
  216. // find more errors if any
  217. }
  218. }
  219. }
  220. return ret_status;
  221. }
  222. // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
  223. // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
  224. // of the key. Each of these 10 scans returns a series of values;
  225. // each series should be the same length, and it is verified for each
  226. // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
  227. // ASSUMES that MultiPut was used to put (K, V)
  228. Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
  229. const std::vector<int>& rand_column_families,
  230. const std::vector<int64_t>& rand_keys) override {
  231. size_t prefix_to_use =
  232. (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
  233. std::string key_str = Key(rand_keys[0]);
  234. Slice key = key_str;
  235. auto cfh = column_families_[rand_column_families[0]];
  236. std::string prefixes[10] = {"0", "1", "2", "3", "4",
  237. "5", "6", "7", "8", "9"};
  238. Slice prefix_slices[10];
  239. ReadOptions readoptionscopy[10];
  240. const Snapshot* snapshot = db_->GetSnapshot();
  241. Iterator* iters[10];
  242. std::string upper_bounds[10];
  243. Slice ub_slices[10];
  244. Status s = Status::OK();
  245. for (int i = 0; i < 10; i++) {
  246. prefixes[i] += key.ToString();
  247. prefixes[i].resize(prefix_to_use);
  248. prefix_slices[i] = Slice(prefixes[i]);
  249. readoptionscopy[i] = readoptions;
  250. readoptionscopy[i].snapshot = snapshot;
  251. if (thread->rand.OneIn(2) &&
  252. GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) {
  253. // For half of the time, set the upper bound to the next prefix
  254. ub_slices[i] = Slice(upper_bounds[i]);
  255. readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]);
  256. }
  257. iters[i] = db_->NewIterator(readoptionscopy[i], cfh);
  258. iters[i]->Seek(prefix_slices[i]);
  259. }
  260. long count = 0;
  261. while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
  262. count++;
  263. std::string values[10];
  264. // get list of all values for this iteration
  265. for (int i = 0; i < 10; i++) {
  266. // no iterator should finish before the first one
  267. assert(iters[i]->Valid() &&
  268. iters[i]->key().starts_with(prefix_slices[i]));
  269. values[i] = iters[i]->value().ToString();
  270. char expected_first = (prefixes[i])[0];
  271. char actual_first = (values[i])[0];
  272. if (actual_first != expected_first) {
  273. fprintf(stderr, "error expected first = %c actual = %c\n",
  274. expected_first, actual_first);
  275. }
  276. (values[i])[0] = ' '; // blank out the differing character
  277. }
  278. // make sure all values are equivalent
  279. for (int i = 0; i < 10; i++) {
  280. if (values[i] != values[0]) {
  281. fprintf(stderr,
  282. "error : %d, inconsistent values for prefix %s: %s, %s\n", i,
  283. prefixes[i].c_str(), StringToHex(values[0]).c_str(),
  284. StringToHex(values[i]).c_str());
  285. // we continue after error rather than exiting so that we can
  286. // find more errors if any
  287. }
  288. iters[i]->Next();
  289. }
  290. }
  291. // cleanup iterators and snapshot
  292. for (int i = 0; i < 10; i++) {
  293. // if the first iterator finished, they should have all finished
  294. assert(!iters[i]->Valid() ||
  295. !iters[i]->key().starts_with(prefix_slices[i]));
  296. assert(iters[i]->status().ok());
  297. delete iters[i];
  298. }
  299. db_->ReleaseSnapshot(snapshot);
  300. if (s.ok()) {
  301. thread->stats.AddPrefixes(1, count);
  302. } else {
  303. fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
  304. thread->stats.AddErrors(1);
  305. }
  306. return s;
  307. }
  308. void VerifyDb(ThreadState* /* thread */) const override {}
  309. };
  310. StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); }
  311. } // namespace ROCKSDB_NAMESPACE
  312. #endif // GFLAGS