cf_consistency_stress.cc 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #ifdef GFLAGS
  10. #include "db_stress_tool/db_stress_common.h"
  11. #include "file/file_util.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. class CfConsistencyStressTest : public StressTest {
  14. public:
  // Starts the batch counter at 0. TestPut() takes one value from it per
  // write batch and uses that value as the seed for the generated data.
  CfConsistencyStressTest() : batch_id_(0) {}
  ~CfConsistencyStressTest() override = default;
  // Always reports false: this test checks correctness by comparing column
  // families against each other (see TestGet/TestGetEntity) rather than
  // against separately tracked expected state.
  bool IsStateTracked() const override { return false; }
  18. Status TestPut(ThreadState* thread, WriteOptions& write_opts,
  19. const ReadOptions& /* read_opts */,
  20. const std::vector<int>& rand_column_families,
  21. const std::vector<int64_t>& rand_keys,
  22. char (&value)[100]) override {
  23. assert(!rand_column_families.empty());
  24. assert(!rand_keys.empty());
  25. const std::string k = Key(rand_keys[0]);
  26. const uint32_t value_base = batch_id_.fetch_add(1);
  27. const size_t sz = GenerateValue(value_base, value, sizeof(value));
  28. const Slice v(value, sz);
  29. WriteBatch batch;
  30. Status status;
  31. if (FLAGS_use_attribute_group && FLAGS_use_put_entity_one_in > 0 &&
  32. (value_base % FLAGS_use_put_entity_one_in) == 0) {
  33. std::vector<ColumnFamilyHandle*> cfhs;
  34. cfhs.reserve(rand_column_families.size());
  35. for (auto cf : rand_column_families) {
  36. cfhs.push_back(column_families_[cf]);
  37. }
  38. status = batch.PutEntity(k, GenerateAttributeGroups(cfhs, value_base, v));
  39. } else {
  40. for (auto cf : rand_column_families) {
  41. ColumnFamilyHandle* const cfh = column_families_[cf];
  42. assert(cfh);
  43. if (FLAGS_use_put_entity_one_in > 0 &&
  44. (value_base % FLAGS_use_put_entity_one_in) == 0) {
  45. status = batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
  46. } else if (FLAGS_use_timed_put_one_in > 0 &&
  47. ((value_base + kLargePrimeForCommonFactorSkew) %
  48. FLAGS_use_timed_put_one_in) == 0) {
  49. uint64_t write_unix_time = GetWriteUnixTime(thread);
  50. status = batch.TimedPut(cfh, k, v, write_unix_time);
  51. } else if (FLAGS_use_merge) {
  52. status = batch.Merge(cfh, k, v);
  53. } else {
  54. status = batch.Put(cfh, k, v);
  55. }
  56. if (!status.ok()) {
  57. break;
  58. }
  59. }
  60. }
  61. if (status.ok()) {
  62. status = db_->Write(write_opts, &batch);
  63. }
  64. if (status.ok()) {
  65. auto num = static_cast<long>(rand_column_families.size());
  66. thread->stats.AddBytesForWrites(num, (sz + 1) * num);
  67. } else if (!IsErrorInjectedAndRetryable(status)) {
  68. fprintf(stderr, "multi put or merge error: %s\n",
  69. status.ToString().c_str());
  70. thread->stats.AddErrors(1);
  71. }
  72. return status;
  73. }
  74. Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
  75. const std::vector<int>& rand_column_families,
  76. const std::vector<int64_t>& rand_keys) override {
  77. std::string key_str = Key(rand_keys[0]);
  78. Slice key = key_str;
  79. WriteBatch batch;
  80. for (auto cf : rand_column_families) {
  81. ColumnFamilyHandle* cfh = column_families_[cf];
  82. batch.Delete(cfh, key);
  83. }
  84. Status s = db_->Write(write_opts, &batch);
  85. if (s.ok()) {
  86. thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
  87. } else if (!IsErrorInjectedAndRetryable(s)) {
  88. fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
  89. thread->stats.AddErrors(1);
  90. }
  91. return s;
  92. }
  93. Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
  94. const std::vector<int>& rand_column_families,
  95. const std::vector<int64_t>& rand_keys) override {
  96. int64_t rand_key = rand_keys[0];
  97. auto shared = thread->shared;
  98. int64_t max_key = shared->GetMaxKey();
  99. if (rand_key > max_key - FLAGS_range_deletion_width) {
  100. rand_key =
  101. thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
  102. }
  103. std::string key_str = Key(rand_key);
  104. Slice key = key_str;
  105. std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
  106. Slice end_key = end_key_str;
  107. WriteBatch batch;
  108. for (auto cf : rand_column_families) {
  109. ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
  110. batch.DeleteRange(cfh, key, end_key);
  111. }
  112. Status s = db_->Write(write_opts, &batch);
  113. if (s.ok()) {
  114. thread->stats.AddRangeDeletions(
  115. static_cast<long>(rand_column_families.size()));
  116. } else if (!IsErrorInjectedAndRetryable(s)) {
  117. fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
  118. thread->stats.AddErrors(1);
  119. }
  120. return s;
  121. }
  // Not supported by this test; it must never be called (see the message
  // below for the reason). A call fails the assert in debug builds and always
  // ends with std::terminate(), so release builds abort as well.
  void TestIngestExternalFile(
      ThreadState* /* thread */,
      const std::vector<int>& /* rand_column_families */,
      const std::vector<int64_t>& /* rand_keys */) override {
    assert(false);
    fprintf(stderr,
            "CfConsistencyStressTest does not support TestIngestExternalFile "
            "because it's not possible to verify the result\n");
    std::terminate();
  }
  132. Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
  133. const std::vector<int>& rand_column_families,
  134. const std::vector<int64_t>& rand_keys) override {
  135. std::string key_str = Key(rand_keys[0]);
  136. Slice key = key_str;
  137. Status s;
  138. bool is_consistent = true;
  139. if (thread->rand.OneIn(2)) {
  140. // 1/2 chance, does a random read from random CF
  141. auto cfh =
  142. column_families_[rand_column_families[thread->rand.Next() %
  143. rand_column_families.size()]];
  144. std::string from_db;
  145. s = db_->Get(readoptions, cfh, key, &from_db);
  146. } else {
  147. // 1/2 chance, comparing one key is the same across all CFs
  148. const Snapshot* snapshot = db_->GetSnapshot();
  149. ReadOptions readoptionscopy = readoptions;
  150. readoptionscopy.snapshot = snapshot;
  151. std::string value0;
  152. s = db_->Get(readoptionscopy, column_families_[rand_column_families[0]],
  153. key, &value0);
  154. // Temporarily disable error injection for verification
  155. if (fault_fs_guard) {
  156. fault_fs_guard->DisableThreadLocalErrorInjection(
  157. FaultInjectionIOType::kRead);
  158. fault_fs_guard->DisableThreadLocalErrorInjection(
  159. FaultInjectionIOType::kMetadataRead);
  160. }
  161. if (s.ok() || s.IsNotFound()) {
  162. bool found = s.ok();
  163. for (size_t i = 1; i < rand_column_families.size(); i++) {
  164. std::string value1;
  165. s = db_->Get(readoptionscopy,
  166. column_families_[rand_column_families[i]], key, &value1);
  167. if (!s.ok() && !s.IsNotFound()) {
  168. break;
  169. }
  170. if (!found && s.ok()) {
  171. fprintf(stderr, "Get() return different results with key %s\n",
  172. Slice(key_str).ToString(true).c_str());
  173. fprintf(stderr, "CF %s is not found\n",
  174. column_family_names_[0].c_str());
  175. fprintf(stderr, "CF %s returns value %s\n",
  176. column_family_names_[i].c_str(),
  177. Slice(value1).ToString(true).c_str());
  178. is_consistent = false;
  179. } else if (found && s.IsNotFound()) {
  180. fprintf(stderr, "Get() return different results with key %s\n",
  181. Slice(key_str).ToString(true).c_str());
  182. fprintf(stderr, "CF %s returns value %s\n",
  183. column_family_names_[0].c_str(),
  184. Slice(value0).ToString(true).c_str());
  185. fprintf(stderr, "CF %s is not found\n",
  186. column_family_names_[i].c_str());
  187. is_consistent = false;
  188. } else if (s.ok() && value0 != value1) {
  189. fprintf(stderr, "Get() return different results with key %s\n",
  190. Slice(key_str).ToString(true).c_str());
  191. fprintf(stderr, "CF %s returns value %s\n",
  192. column_family_names_[0].c_str(),
  193. Slice(value0).ToString(true).c_str());
  194. fprintf(stderr, "CF %s returns value %s\n",
  195. column_family_names_[i].c_str(),
  196. Slice(value1).ToString(true).c_str());
  197. is_consistent = false;
  198. }
  199. if (!is_consistent) {
  200. break;
  201. }
  202. }
  203. }
  204. // Enable back error injection disabled for verification
  205. if (fault_fs_guard) {
  206. fault_fs_guard->EnableThreadLocalErrorInjection(
  207. FaultInjectionIOType::kRead);
  208. fault_fs_guard->EnableThreadLocalErrorInjection(
  209. FaultInjectionIOType::kMetadataRead);
  210. }
  211. db_->ReleaseSnapshot(snapshot);
  212. }
  213. if (!is_consistent) {
  214. fprintf(stderr, "TestGet error: is_consistent is false\n");
  215. thread->stats.AddErrors(1);
  216. // Fail fast to preserve the DB state.
  217. thread->shared->SetVerificationFailure();
  218. } else if (s.ok()) {
  219. thread->stats.AddGets(1, 1);
  220. } else if (s.IsNotFound()) {
  221. thread->stats.AddGets(1, 0);
  222. } else if (!IsErrorInjectedAndRetryable(s)) {
  223. fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
  224. thread->stats.AddErrors(1);
  225. }
  226. return s;
  227. }
  228. std::vector<Status> TestMultiGet(
  229. ThreadState* thread, const ReadOptions& read_opts,
  230. const std::vector<int>& rand_column_families,
  231. const std::vector<int64_t>& rand_keys) override {
  232. size_t num_keys = rand_keys.size();
  233. std::vector<std::string> key_str;
  234. std::vector<Slice> keys;
  235. keys.reserve(num_keys);
  236. key_str.reserve(num_keys);
  237. std::vector<PinnableSlice> values(num_keys);
  238. std::vector<Status> statuses(num_keys);
  239. ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
  240. ReadOptions readoptionscopy = read_opts;
  241. readoptionscopy.rate_limiter_priority =
  242. FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  243. for (size_t i = 0; i < num_keys; ++i) {
  244. key_str.emplace_back(Key(rand_keys[i]));
  245. keys.emplace_back(key_str.back());
  246. }
  247. db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
  248. statuses.data());
  249. for (const auto& s : statuses) {
  250. if (s.ok()) {
  251. // found case
  252. thread->stats.AddGets(1, 1);
  253. } else if (s.IsNotFound()) {
  254. // not found case
  255. thread->stats.AddGets(1, 0);
  256. } else if (!IsErrorInjectedAndRetryable(s)) {
  257. // errors case
  258. fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
  259. thread->stats.AddErrors(1);
  260. }
  261. }
  262. return statuses;
  263. }
  // Reads one key with GetEntity() and verifies the returned wide columns.
  //
  // With 1/2 probability, a single GetEntity() is issued against a random
  // column family, and its columns are checked with VerifyWideColumns().
  // Otherwise, under one snapshot:
  //   - the entity from rand_column_families[0] is read and verified;
  //   - it is then compared against every other column family in
  //     rand_column_families, using the AttributeGroup GetEntity() overload
  //     when FLAGS_use_attribute_group is set, or one GetEntity() per column
  //     family otherwise.
  // An inconsistency is logged, counted as an error and raises a verification
  // failure. Otherwise the stats are updated from the last read's status.
  //
  // NOTE(review): the mismatch messages name column_family_names_[0] / [i],
  // while the reads use rand_column_families[0] / [i]. These agree only if
  // rand_column_families maps i -> i -- confirm.
  void TestGetEntity(ThreadState* thread, const ReadOptions& read_opts,
                     const std::vector<int>& rand_column_families,
                     const std::vector<int64_t>& rand_keys) override {
    assert(thread);
    assert(!rand_column_families.empty());
    assert(!rand_keys.empty());
    const std::string key = Key(rand_keys[0]);
    Status s;
    bool is_consistent = true;
    if (thread->rand.OneIn(2)) {
      // With a 1/2 chance, do a random read from a random CF
      const size_t cf_id = thread->rand.Next() % rand_column_families.size();
      assert(rand_column_families[cf_id] >= 0);
      assert(rand_column_families[cf_id] <
             static_cast<int>(column_families_.size()));
      ColumnFamilyHandle* const cfh =
          column_families_[rand_column_families[cf_id]];
      assert(cfh);
      PinnableWideColumns result;
      s = db_->GetEntity(read_opts, cfh, key, &result);
      if (s.ok()) {
        if (!VerifyWideColumns(result.columns())) {
          fprintf(
              stderr,
              "GetEntity error: inconsistent columns for key %s, entity %s\n",
              StringToHex(key).c_str(),
              WideColumnsToHex(result.columns()).c_str());
          is_consistent = false;
        }
      }
    } else {
      // With a 1/2 chance, compare one key across all CFs
      ManagedSnapshot snapshot_guard(db_);
      ReadOptions read_opts_copy = read_opts;
      read_opts_copy.snapshot = snapshot_guard.snapshot();
      assert(rand_column_families[0] >= 0);
      assert(rand_column_families[0] <
             static_cast<int>(column_families_.size()));
      // Baseline entity from the first selected column family.
      PinnableWideColumns cmp_result;
      s = db_->GetEntity(read_opts_copy,
                         column_families_[rand_column_families[0]], key,
                         &cmp_result);
      // Temporarily disable error injection for verification
      if (fault_fs_guard) {
        fault_fs_guard->DisableThreadLocalErrorInjection(
            FaultInjectionIOType::kRead);
        fault_fs_guard->DisableThreadLocalErrorInjection(
            FaultInjectionIOType::kMetadataRead);
      }
      // Compare only when the baseline read itself succeeded (found or not).
      if (s.ok() || s.IsNotFound()) {
        const bool cmp_found = s.ok();
        if (cmp_found) {
          if (!VerifyWideColumns(cmp_result.columns())) {
            fprintf(stderr,
                    "GetEntity error: inconsistent columns for key %s, "
                    "entity %s\n",
                    StringToHex(key).c_str(),
                    WideColumnsToHex(cmp_result.columns()).c_str());
            is_consistent = false;
          }
        }
        if (is_consistent) {
          if (FLAGS_use_attribute_group) {
            // One attribute group per remaining CF (indices 1..n-1); index 0
            // is the baseline read above.
            PinnableAttributeGroups result;
            result.reserve(rand_column_families.size());
            for (size_t i = 1; i < rand_column_families.size(); ++i) {
              assert(rand_column_families[i] >= 0);
              assert(rand_column_families[i] <
                     static_cast<int>(column_families_.size()));
              result.emplace_back(column_families_[rand_column_families[i]]);
            }
            s = db_->GetEntity(read_opts_copy, key, &result);
            if (s.ok()) {
              // Each attribute group carries its own per-CF status.
              for (auto& attribute_group : result) {
                s = attribute_group.status();
                if (!s.ok() && !s.IsNotFound()) {
                  break;
                }
                const bool found = s.ok();
                if (!cmp_found && found) {
                  fprintf(
                      stderr,
                      "Non-AttributeGroup GetEntity returns different results "
                      "than AttributeGroup GetEntity for key %s: CF %s "
                      "returns not found, CF %s returns entity %s \n",
                      StringToHex(key).c_str(), column_family_names_[0].c_str(),
                      attribute_group.column_family()->GetName().c_str(),
                      WideColumnsToHex(attribute_group.columns()).c_str());
                  is_consistent = false;
                  break;
                }
                if (cmp_found && !found) {
                  fprintf(
                      stderr,
                      "Non-AttributeGroup GetEntity returns different results "
                      "than AttributeGroup GetEntity for key %s: CF %s "
                      "returns entity %s, CF %s returns not found \n",
                      StringToHex(key).c_str(), column_family_names_[0].c_str(),
                      WideColumnsToHex(cmp_result.columns()).c_str(),
                      attribute_group.column_family()->GetName().c_str());
                  is_consistent = false;
                  break;
                }
                if (found &&
                    attribute_group.columns() != cmp_result.columns()) {
                  fprintf(
                      stderr,
                      "Non-AttributeGroup GetEntity returns different results "
                      "than AttributeGroup GetEntity for key %s: CF %s "
                      "returns entity %s, CF %s returns entity %s\n",
                      StringToHex(key).c_str(), column_family_names_[0].c_str(),
                      WideColumnsToHex(cmp_result.columns()).c_str(),
                      attribute_group.column_family()->GetName().c_str(),
                      WideColumnsToHex(attribute_group.columns()).c_str());
                  is_consistent = false;
                  break;
                }
              }
            }
          } else {
            // One GetEntity() per remaining CF, each compared to the baseline.
            for (size_t i = 1; i < rand_column_families.size(); ++i) {
              assert(rand_column_families[i] >= 0);
              assert(rand_column_families[i] <
                     static_cast<int>(column_families_.size()));
              PinnableWideColumns result;
              s = db_->GetEntity(read_opts_copy,
                                 column_families_[rand_column_families[i]], key,
                                 &result);
              if (!s.ok() && !s.IsNotFound()) {
                break;
              }
              const bool found = s.ok();
              assert(!column_family_names_.empty());
              assert(i < column_family_names_.size());
              if (!cmp_found && found) {
                fprintf(stderr,
                        "GetEntity returns different results for key %s: CF %s "
                        "returns not found, CF %s returns entity %s\n",
                        StringToHex(key).c_str(),
                        column_family_names_[0].c_str(),
                        column_family_names_[i].c_str(),
                        WideColumnsToHex(result.columns()).c_str());
                is_consistent = false;
                break;
              }
              if (cmp_found && !found) {
                fprintf(stderr,
                        "GetEntity returns different results for key %s: CF %s "
                        "returns entity %s, CF %s returns not found\n",
                        StringToHex(key).c_str(),
                        column_family_names_[0].c_str(),
                        WideColumnsToHex(cmp_result.columns()).c_str(),
                        column_family_names_[i].c_str());
                is_consistent = false;
                break;
              }
              if (found && result != cmp_result) {
                fprintf(stderr,
                        "GetEntity returns different results for key %s: CF %s "
                        "returns entity %s, CF %s returns entity %s\n",
                        StringToHex(key).c_str(),
                        column_family_names_[0].c_str(),
                        WideColumnsToHex(cmp_result.columns()).c_str(),
                        column_family_names_[i].c_str(),
                        WideColumnsToHex(result.columns()).c_str());
                is_consistent = false;
                break;
              }
            }
          }
        }
      }
      // Enable back error injection disabled for verification
      if (fault_fs_guard) {
        fault_fs_guard->EnableThreadLocalErrorInjection(
            FaultInjectionIOType::kRead);
        fault_fs_guard->EnableThreadLocalErrorInjection(
            FaultInjectionIOType::kMetadataRead);
      }
    }
    // `s` holds the status of the last read (or last attribute group) checked.
    if (!is_consistent) {
      fprintf(stderr, "TestGetEntity error: results are not consistent\n");
      thread->stats.AddErrors(1);
      // Fail fast to preserve the DB state.
      thread->shared->SetVerificationFailure();
    } else if (s.ok()) {
      thread->stats.AddGets(1, 1);
    } else if (s.IsNotFound()) {
      thread->stats.AddGets(1, 0);
    } else if (!IsErrorInjectedAndRetryable(s)) {
      fprintf(stderr, "TestGetEntity error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    }
  }
  // Reads every key in `rand_keys` from all column families in
  // `rand_column_families` with MultiGetEntity(), under one snapshot, and
  // checks that the column families agree.
  //
  // For each key, the result from rand_column_families[0] is the baseline.
  // Every other column family must match its found/not-found status and its
  // wide columns, and each found entity must pass VerifyWideColumns().
  // - With FLAGS_use_attribute_group, the AttributeGroup overload reads all
  //   keys in one call.
  // - Otherwise, one MultiGetEntity() per key reads that key from all column
  //   families.
  // An inconsistency is logged, counted as an error, raises a verification
  // failure and stops checking further keys.
  void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
                          const std::vector<int>& rand_column_families,
                          const std::vector<int64_t>& rand_keys) override {
    assert(thread);
    assert(thread->shared);
    assert(!rand_column_families.empty());
    assert(!rand_keys.empty());
    ManagedSnapshot snapshot_guard(db_);
    ReadOptions read_opts_copy = read_opts;
    read_opts_copy.snapshot = snapshot_guard.snapshot();
    const size_t num_cfs = rand_column_families.size();
    std::vector<ColumnFamilyHandle*> cfhs;
    cfhs.reserve(num_cfs);
    for (size_t j = 0; j < num_cfs; ++j) {
      assert(rand_column_families[j] >= 0);
      assert(rand_column_families[j] <
             static_cast<int>(column_families_.size()));
      ColumnFamilyHandle* const cfh = column_families_[rand_column_families[j]];
      assert(cfh);
      cfhs.emplace_back(cfh);
    }
    const size_t num_keys = rand_keys.size();
    if (FLAGS_use_attribute_group) {
      // AttributeGroup MultiGetEntity verification
      // One PinnableAttributeGroups per key, each requesting every CF in cfhs.
      std::vector<PinnableAttributeGroups> results;
      std::vector<Slice> key_slices;
      std::vector<std::string> key_strs;
      results.reserve(num_keys);
      key_slices.reserve(num_keys);
      // Reserving up front means key_strs never reallocates, so the Slices in
      // key_slices keep pointing at valid strings.
      key_strs.reserve(num_keys);
      for (size_t i = 0; i < num_keys; ++i) {
        key_strs.emplace_back(Key(rand_keys[i]));
        key_slices.emplace_back(key_strs.back());
        PinnableAttributeGroups attribute_groups;
        for (auto* cfh : cfhs) {
          attribute_groups.emplace_back(cfh);
        }
        results.emplace_back(std::move(attribute_groups));
      }
      db_->MultiGetEntity(read_opts_copy, num_keys, key_slices.data(),
                          results.data());
      bool is_consistent = true;
      for (size_t i = 0; i < num_keys; ++i) {
        const auto& result = results[i];
        // The first CF's result is the baseline for this key.
        const Status& cmp_s = result[0].status();
        const WideColumns& cmp_columns = result[0].columns();
        bool has_error = false;
        for (size_t j = 0; j < num_cfs; ++j) {
          const Status& s = result[j].status();
          const WideColumns& columns = result[j].columns();
          if (!s.ok() && IsErrorInjectedAndRetryable(s)) {
            // Injected retryable error: skip the remaining CFs for this key.
            break;
          } else if (!s.ok() && !s.IsNotFound()) {
            fprintf(stderr, "TestMultiGetEntity (AttributeGroup) error: %s\n",
                    s.ToString().c_str());
            thread->stats.AddErrors(1);
            has_error = true;
            break;
          }
          // The baseline is visited at j == 0; any error there exits above.
          assert(cmp_s.ok() || cmp_s.IsNotFound());
          if (s.IsNotFound()) {
            if (cmp_s.ok()) {
              fprintf(stderr,
                      "MultiGetEntity (AttributeGroup) returns different "
                      "results for key %s: CF %s "
                      "returns entity %s, CF %s returns not found\n",
                      key_slices[i].ToString(true).c_str(),
                      column_family_names_[0].c_str(),
                      WideColumnsToHex(cmp_columns).c_str(),
                      column_family_names_[j].c_str());
              is_consistent = false;
              break;
            }
            continue;
          }
          assert(s.ok());
          if (cmp_s.IsNotFound()) {
            fprintf(stderr,
                    "MultiGetEntity (AttributeGroup) returns different results "
                    "for key %s: CF %s "
                    "returns not found, CF %s returns entity %s\n",
                    key_slices[i].ToString(true).c_str(),
                    column_family_names_[0].c_str(),
                    column_family_names_[j].c_str(),
                    WideColumnsToHex(columns).c_str());
            is_consistent = false;
            break;
          }
          if (columns != cmp_columns) {
            fprintf(stderr,
                    "MultiGetEntity (AttributeGroup) returns different results "
                    "for key %s: CF %s "
                    "returns entity %s, CF %s returns entity %s\n",
                    key_slices[i].ToString(true).c_str(),
                    column_family_names_[0].c_str(),
                    WideColumnsToHex(cmp_columns).c_str(),
                    column_family_names_[j].c_str(),
                    WideColumnsToHex(columns).c_str());
            is_consistent = false;
            break;
          }
          if (!VerifyWideColumns(columns)) {
            fprintf(stderr,
                    "MultiGetEntity (AttributeGroup) error: inconsistent "
                    "columns for key %s, "
                    "entity %s\n",
                    key_slices[i].ToString(true).c_str(),
                    WideColumnsToHex(columns).c_str());
            is_consistent = false;
            break;
          }
        }
        if (has_error) {
          // Non-retryable read error: stop checking the remaining keys.
          break;
        } else if (!is_consistent) {
          fprintf(stderr,
                  "TestMultiGetEntity (AttributeGroup) error: results are not "
                  "consistent\n");
          thread->stats.AddErrors(1);
          // Fail fast to preserve the DB state.
          thread->shared->SetVerificationFailure();
          break;
        } else if (cmp_s.ok()) {
          thread->stats.AddGets(1, 1);
        } else if (cmp_s.IsNotFound()) {
          thread->stats.AddGets(1, 0);
        }
      }
    } else {
      // Non-AttributeGroup MultiGetEntity verification
      // Each key gets its own call that reads the same key from every CF.
      for (size_t i = 0; i < num_keys; ++i) {
        const std::string key = Key(rand_keys[i]);
        std::vector<Slice> key_slices(num_cfs, key);
        std::vector<PinnableWideColumns> results(num_cfs);
        std::vector<Status> statuses(num_cfs);
        db_->MultiGetEntity(read_opts_copy, num_cfs, cfhs.data(),
                            key_slices.data(), results.data(), statuses.data());
        bool is_consistent = true;
        const Status& cmp_s = statuses[0];
        const WideColumns& cmp_columns = results[0].columns();
        for (size_t j = 0; j < num_cfs; ++j) {
          const Status& s = statuses[j];
          const WideColumns& columns = results[j].columns();
          if (!s.ok() && IsErrorInjectedAndRetryable(s)) {
            break;
          } else if (!s.ok() && !s.IsNotFound()) {
            fprintf(stderr, "TestMultiGetEntity error: %s\n",
                    s.ToString().c_str());
            thread->stats.AddErrors(1);
            // NOTE(review): unlike the AttributeGroup branch, this error does
            // not stop the outer key loop, and a Get is still counted from
            // statuses[0] below -- confirm the difference is intended.
            break;
          }
          assert(cmp_s.ok() || cmp_s.IsNotFound());
          if (s.IsNotFound()) {
            if (cmp_s.ok()) {
              fprintf(
                  stderr,
                  "MultiGetEntity returns different results for key %s: CF %s "
                  "returns entity %s, CF %s returns not found\n",
                  StringToHex(key).c_str(), column_family_names_[0].c_str(),
                  WideColumnsToHex(cmp_columns).c_str(),
                  column_family_names_[j].c_str());
              is_consistent = false;
              break;
            }
            continue;
          }
          assert(s.ok());
          if (cmp_s.IsNotFound()) {
            fprintf(
                stderr,
                "MultiGetEntity returns different results for key %s: CF %s "
                "returns not found, CF %s returns entity %s\n",
                StringToHex(key).c_str(), column_family_names_[0].c_str(),
                column_family_names_[j].c_str(),
                WideColumnsToHex(columns).c_str());
            is_consistent = false;
            break;
          }
          if (columns != cmp_columns) {
            fprintf(
                stderr,
                "MultiGetEntity returns different results for key %s: CF %s "
                "returns entity %s, CF %s returns entity %s\n",
                StringToHex(key).c_str(), column_family_names_[0].c_str(),
                WideColumnsToHex(cmp_columns).c_str(),
                column_family_names_[j].c_str(),
                WideColumnsToHex(columns).c_str());
            is_consistent = false;
            break;
          }
          if (!VerifyWideColumns(columns)) {
            fprintf(stderr,
                    "MultiGetEntity error: inconsistent columns for key %s, "
                    "entity %s\n",
                    StringToHex(key).c_str(),
                    WideColumnsToHex(columns).c_str());
            is_consistent = false;
            break;
          }
        }
        if (!is_consistent) {
          fprintf(stderr,
                  "TestMultiGetEntity error: results are not consistent\n");
          thread->stats.AddErrors(1);
          // Fail fast to preserve the DB state.
          thread->shared->SetVerificationFailure();
          break;
        } else if (statuses[0].ok()) {
          thread->stats.AddGets(1, 1);
        } else if (statuses[0].IsNotFound()) {
          thread->stats.AddGets(1, 0);
        }
      }
    }
  }
  673. Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
  674. const std::vector<int>& rand_column_families,
  675. const std::vector<int64_t>& rand_keys) override {
  676. assert(!rand_column_families.empty());
  677. assert(!rand_keys.empty());
  678. const std::string key = Key(rand_keys[0]);
  679. const size_t prefix_to_use =
  680. (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
  681. const Slice prefix(key.data(), prefix_to_use);
  682. std::string upper_bound;
  683. Slice ub_slice;
  684. ReadOptions ro_copy = readoptions;
  685. std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
  686. if (ro_copy.auto_refresh_iterator_with_snapshot) {
  687. snapshot = std::make_unique<ManagedSnapshot>(db_);
  688. ro_copy.snapshot = snapshot->snapshot();
  689. }
  690. // Get the next prefix first and then see if we want to set upper bound.
  691. // We'll use the next prefix in an assertion later on
  692. if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
  693. ub_slice = Slice(upper_bound);
  694. ro_copy.iterate_upper_bound = &ub_slice;
  695. if (FLAGS_use_sqfc_for_range_queries) {
  696. ro_copy.table_filter =
  697. sqfc_factory_->GetTableFilterForRangeQuery(prefix, ub_slice);
  698. }
  699. }
  700. ColumnFamilyHandle* const cfh =
  701. column_families_[rand_column_families[thread->rand.Uniform(
  702. static_cast<int>(rand_column_families.size()))]];
  703. assert(cfh);
  704. std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
  705. uint64_t count = 0;
  706. Status s;
  707. for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
  708. iter->Next()) {
  709. ++count;
  710. if (ro_copy.allow_unprepared_value) {
  711. if (!iter->PrepareValue()) {
  712. s = iter->status();
  713. break;
  714. }
  715. }
  716. if (!VerifyWideColumns(iter->value(), iter->columns())) {
  717. s = Status::Corruption("Value and columns inconsistent",
  718. DebugString(iter->value(), iter->columns()));
  719. break;
  720. }
  721. }
  722. assert(prefix_to_use == 0 ||
  723. count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
  724. if (s.ok()) {
  725. s = iter->status();
  726. }
  727. if (!s.ok() && !IsErrorInjectedAndRetryable(s)) {
  728. fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
  729. thread->stats.AddErrors(1);
  730. return s;
  731. }
  732. thread->stats.AddPrefixes(1, count);
  733. return Status::OK();
  734. }
  735. ColumnFamilyHandle* GetControlCfh(ThreadState* thread,
  736. int /*column_family_id*/
  737. ) override {
  738. // All column families should contain the same data. Randomly pick one.
  739. return column_families_[thread->rand.Next() % column_families_.size()];
  740. }
  741. void VerifyDb(ThreadState* thread) const override {
  742. // This `ReadOptions` is for validation purposes. Ignore
  743. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  744. ReadOptions options(FLAGS_verify_checksum, true);
  745. // We must set total_order_seek to true because we are doing a SeekToFirst
  746. // on a column family whose memtables may support (by default) prefix-based
  747. // iterator. In this case, NewIterator with options.total_order_seek being
  748. // false returns a prefix-based iterator. Calling SeekToFirst using this
  749. // iterator causes the iterator to become invalid. That means we cannot
  750. // iterate the memtable using this iterator any more, although the memtable
  751. // contains the most up-to-date key-values.
  752. options.total_order_seek = true;
  753. ManagedSnapshot snapshot_guard(db_);
  754. options.snapshot = snapshot_guard.snapshot();
  755. options.auto_refresh_iterator_with_snapshot =
  756. FLAGS_auto_refresh_iterator_with_snapshot;
  757. const size_t num = column_families_.size();
  758. std::vector<std::unique_ptr<Iterator>> iters;
  759. iters.reserve(num);
  760. for (size_t i = 0; i < num; ++i) {
  761. iters.emplace_back(db_->NewIterator(options, column_families_[i]));
  762. iters.back()->SeekToFirst();
  763. }
  764. std::vector<Status> statuses(num, Status::OK());
  765. assert(thread);
  766. auto shared = thread->shared;
  767. assert(shared);
  768. do {
  769. if (shared->HasVerificationFailedYet()) {
  770. break;
  771. }
  772. size_t valid_cnt = 0;
  773. for (size_t i = 0; i < num; ++i) {
  774. const auto& iter = iters[i];
  775. assert(iter);
  776. if (iter->Valid()) {
  777. if (!VerifyWideColumns(iter->value(), iter->columns())) {
  778. statuses[i] =
  779. Status::Corruption("Value and columns inconsistent",
  780. DebugString(iter->value(), iter->columns()));
  781. } else {
  782. ++valid_cnt;
  783. }
  784. } else {
  785. statuses[i] = iter->status();
  786. }
  787. }
  788. if (valid_cnt == 0) {
  789. for (size_t i = 0; i < num; ++i) {
  790. const auto& s = statuses[i];
  791. if (!s.ok()) {
  792. fprintf(stderr, "Iterator on cf %s has error: %s\n",
  793. column_families_[i]->GetName().c_str(),
  794. s.ToString().c_str());
  795. shared->SetVerificationFailure();
  796. }
  797. }
  798. break;
  799. }
  800. if (valid_cnt < num) {
  801. shared->SetVerificationFailure();
  802. for (size_t i = 0; i < num; ++i) {
  803. assert(iters[i]);
  804. if (!iters[i]->Valid()) {
  805. if (statuses[i].ok()) {
  806. fprintf(stderr, "Finished scanning cf %s\n",
  807. column_families_[i]->GetName().c_str());
  808. } else {
  809. fprintf(stderr, "Iterator on cf %s has error: %s\n",
  810. column_families_[i]->GetName().c_str(),
  811. statuses[i].ToString().c_str());
  812. }
  813. } else {
  814. fprintf(stderr, "cf %s has remaining data to scan\n",
  815. column_families_[i]->GetName().c_str());
  816. }
  817. }
  818. break;
  819. }
  820. if (shared->HasVerificationFailedYet()) {
  821. break;
  822. }
  823. // If the program reaches here, then all column families' iterators are
  824. // still valid.
  825. assert(valid_cnt == num);
  826. if (shared->PrintingVerificationResults()) {
  827. continue;
  828. }
  829. assert(iters[0]);
  830. const Slice key = iters[0]->key();
  831. const Slice value = iters[0]->value();
  832. int num_mismatched_cfs = 0;
  833. for (size_t i = 1; i < num; ++i) {
  834. assert(iters[i]);
  835. const int cmp = key.compare(iters[i]->key());
  836. if (cmp != 0) {
  837. ++num_mismatched_cfs;
  838. if (1 == num_mismatched_cfs) {
  839. fprintf(stderr, "Verification failed\n");
  840. fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
  841. db_->GetLatestSequenceNumber());
  842. fprintf(stderr, "[%s] %s => %s\n",
  843. column_families_[0]->GetName().c_str(),
  844. key.ToString(true /* hex */).c_str(),
  845. value.ToString(true /* hex */).c_str());
  846. }
  847. fprintf(stderr, "[%s] %s => %s\n",
  848. column_families_[i]->GetName().c_str(),
  849. iters[i]->key().ToString(true /* hex */).c_str(),
  850. iters[i]->value().ToString(true /* hex */).c_str());
  851. Slice begin_key;
  852. Slice end_key;
  853. if (cmp < 0) {
  854. begin_key = key;
  855. end_key = iters[i]->key();
  856. } else {
  857. begin_key = iters[i]->key();
  858. end_key = key;
  859. }
  860. const auto print_key_versions = [&](ColumnFamilyHandle* cfh) {
  861. constexpr size_t kMaxNumIKeys = 8;
  862. std::vector<KeyVersion> versions;
  863. const Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
  864. kMaxNumIKeys, &versions);
  865. if (!s.ok()) {
  866. fprintf(stderr, "%s\n", s.ToString().c_str());
  867. return;
  868. }
  869. assert(cfh);
  870. fprintf(stderr,
  871. "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
  872. ")\n",
  873. cfh->GetName().c_str(),
  874. begin_key.ToString(true /* hex */).c_str(),
  875. end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);
  876. for (const KeyVersion& kv : versions) {
  877. fprintf(stderr, " key %s seq %" PRIu64 " type %d\n",
  878. Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
  879. kv.type);
  880. }
  881. };
  882. if (1 == num_mismatched_cfs) {
  883. print_key_versions(column_families_[0]);
  884. }
  885. print_key_versions(column_families_[i]);
  886. shared->SetVerificationFailure();
  887. }
  888. }
  889. shared->FinishPrintingVerificationResults();
  890. for (auto& iter : iters) {
  891. assert(iter);
  892. iter->Next();
  893. }
  894. } while (true);
  895. }
  896. void ContinuouslyVerifyDb(ThreadState* thread) const override {
  897. assert(thread);
  898. Status status;
  899. DB* db_ptr = secondary_db_ ? secondary_db_ : db_;
  900. const auto& cfhs = secondary_db_ ? secondary_cfhs_ : column_families_;
  901. // Take a snapshot to preserve the state of primary db.
  902. ManagedSnapshot snapshot_guard(db_);
  903. SharedState* shared = thread->shared;
  904. assert(shared);
  905. if (secondary_db_) {
  906. status = secondary_db_->TryCatchUpWithPrimary();
  907. if (!status.ok()) {
  908. fprintf(stderr, "TryCatchUpWithPrimary: %s\n",
  909. status.ToString().c_str());
  910. shared->SetShouldStopTest();
  911. assert(false);
  912. return;
  913. }
  914. }
  915. const auto checksum_column_family = [](Iterator* iter,
  916. uint32_t* checksum) -> Status {
  917. assert(nullptr != checksum);
  918. uint32_t ret = 0;
  919. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  920. ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
  921. ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
  922. for (const auto& column : iter->columns()) {
  923. ret = crc32c::Extend(ret, column.name().data(), column.name().size());
  924. ret =
  925. crc32c::Extend(ret, column.value().data(), column.value().size());
  926. }
  927. }
  928. *checksum = ret;
  929. return iter->status();
  930. };
  931. // This `ReadOptions` is for validation purposes. Ignore
  932. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  933. ReadOptions ropts(FLAGS_verify_checksum, true);
  934. ropts.total_order_seek = true;
  935. if (nullptr == secondary_db_ || FLAGS_auto_refresh_iterator_with_snapshot) {
  936. ropts.snapshot = snapshot_guard.snapshot();
  937. ropts.auto_refresh_iterator_with_snapshot = true;
  938. }
  939. uint32_t crc = 0;
  940. {
  941. // Compute crc for all key-values of default column family.
  942. std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
  943. status = checksum_column_family(it.get(), &crc);
  944. if (!status.ok()) {
  945. fprintf(stderr, "Computing checksum of default cf: %s\n",
  946. status.ToString().c_str());
  947. assert(false);
  948. }
  949. }
  950. // Since we currently intentionally disallow reading from the secondary
  951. // instance with snapshot, we cannot achieve cross-cf consistency if WAL is
  952. // enabled because there is no guarantee that secondary instance replays
  953. // the primary's WAL to a consistent point where all cfs have the same
  954. // data.
  955. if (status.ok() && FLAGS_disable_wal) {
  956. uint32_t tmp_crc = 0;
  957. for (ColumnFamilyHandle* cfh : cfhs) {
  958. if (cfh == db_ptr->DefaultColumnFamily()) {
  959. continue;
  960. }
  961. std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
  962. status = checksum_column_family(it.get(), &tmp_crc);
  963. if (!status.ok() || tmp_crc != crc) {
  964. break;
  965. }
  966. }
  967. if (!status.ok()) {
  968. fprintf(stderr, "status: %s\n", status.ToString().c_str());
  969. shared->SetShouldStopTest();
  970. assert(false);
  971. } else if (tmp_crc != crc) {
  972. fprintf(stderr, "tmp_crc=%" PRIu32 " crc=%" PRIu32 "\n", tmp_crc, crc);
  973. shared->SetShouldStopTest();
  974. assert(false);
  975. }
  976. }
  977. }
  978. std::vector<int> GenerateColumnFamilies(
  979. const int /* num_column_families */,
  980. int /* rand_column_family */) const override {
  981. std::vector<int> ret;
  982. int num = static_cast<int>(column_families_.size());
  983. int k = 0;
  984. std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
  985. return ret;
  986. }
  987. private:
  988. std::atomic<uint32_t> batch_id_;
  989. };
  990. StressTest* CreateCfConsistencyStressTest() {
  991. return new CfConsistencyStressTest();
  992. }
  993. } // namespace ROCKSDB_NAMESPACE
  994. #endif // GFLAGS