no_batched_ops_stress.cc

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/dbformat.h"
  10. #include "db_stress_tool/db_stress_listener.h"
  11. #include "db_stress_tool/db_stress_shared_state.h"
  12. #include "db_stress_tool/expected_state.h"
  13. #include "rocksdb/status.h"
  14. #ifdef GFLAGS
  15. #include "db/wide/wide_columns_helper.h"
  16. #include "db_stress_tool/db_stress_common.h"
  17. #include "rocksdb/utilities/transaction_db.h"
  18. #include "utilities/fault_injection_fs.h"
  19. namespace ROCKSDB_NAMESPACE {
  20. class NonBatchedOpsStressTest : public StressTest {
  21. public:
  22. NonBatchedOpsStressTest() = default;
  23. virtual ~NonBatchedOpsStressTest() = default;
  24. void VerifyDb(ThreadState* thread) const override {
  25. // This `ReadOptions` is for validation purposes. Ignore
  26. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  27. ReadOptions options(FLAGS_verify_checksum, true);
  28. std::string ts_str;
  29. Slice ts;
  30. if (FLAGS_user_timestamp_size > 0) {
  31. ts_str = GetNowNanos();
  32. ts = ts_str;
  33. options.timestamp = &ts;
  34. }
  35. auto shared = thread->shared;
  36. const int64_t max_key = shared->GetMaxKey();
  37. const int64_t keys_per_thread = max_key / shared->GetNumThreads();
  38. int64_t start = keys_per_thread * thread->tid;
  39. int64_t end = start + keys_per_thread;
  40. uint64_t prefix_to_use =
  41. (FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
  42. if (thread->tid == shared->GetNumThreads() - 1) {
  43. end = max_key;
  44. }
  45. if (FLAGS_auto_refresh_iterator_with_snapshot) {
  46. options.auto_refresh_iterator_with_snapshot = true;
  47. }
  48. for (size_t cf = 0; cf < column_families_.size(); ++cf) {
  49. if (thread->shared->HasVerificationFailedYet()) {
  50. break;
  51. }
  52. enum class VerificationMethod {
  53. kIterator,
  54. kGet,
  55. kGetEntity,
  56. kMultiGet,
  57. kMultiGetEntity,
  58. kGetMergeOperands,
  59. // Add any new items above kNumberOfMethods
  60. kNumberOfMethods
  61. };
  62. constexpr int num_methods =
  63. static_cast<int>(VerificationMethod::kNumberOfMethods);
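  // Pick a verification method at random. When user-defined timestamps are in
  // use, the last method (kGetMergeOperands) is excluded from the draw.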
  64. VerificationMethod method =
  65. static_cast<VerificationMethod>(thread->rand.Uniform(
  66. (FLAGS_user_timestamp_size > 0) ? num_methods - 1 : num_methods));
  67. if (method == VerificationMethod::kGetEntity && !FLAGS_use_get_entity) {
  68. method = VerificationMethod::kGet;
  69. }
  70. if (method == VerificationMethod::kMultiGetEntity &&
  71. !FLAGS_use_multi_get_entity) {
  72. method = VerificationMethod::kMultiGet;
  73. }
  74. if (method == VerificationMethod::kMultiGet && !FLAGS_use_multiget) {
  75. method = VerificationMethod::kGet;
  76. }
  77. if (method == VerificationMethod::kIterator) {
  78. std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
  79. if (options.auto_refresh_iterator_with_snapshot) {
  80. snapshot = std::make_unique<ManagedSnapshot>(db_);
  81. options.snapshot = snapshot->snapshot();
  82. }
  83. std::unique_ptr<Iterator> iter(
  84. db_->NewIterator(options, column_families_[cf]));
  85. std::string seek_key = Key(start);
  86. iter->Seek(seek_key);
  87. Slice prefix(seek_key.data(), prefix_to_use);
  88. for (int64_t i = start; i < end; ++i) {
  89. if (thread->shared->HasVerificationFailedYet()) {
  90. break;
  91. }
  92. const std::string key = Key(i);
  93. const Slice k(key);
  94. const Slice pfx(key.data(), prefix_to_use);
  95. // Reseek when the prefix changes
  96. if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
  97. iter->Seek(k);
  98. seek_key = key;
  99. prefix = Slice(seek_key.data(), prefix_to_use);
  100. }
  101. Status s = iter->status();
  102. std::string from_db;
  103. if (iter->Valid()) {
  104. const int diff = iter->key().compare(k);
  105. if (diff > 0) {
  106. s = Status::NotFound();
  107. } else if (diff == 0) {
  108. if (!VerifyWideColumns(iter->value(), iter->columns())) {
  109. VerificationAbort(shared, static_cast<int>(cf), i,
  110. iter->value(), iter->columns());
  111. }
  112. from_db = iter->value().ToString();
  113. iter->Next();
  114. } else {
  115. assert(diff < 0);
  116. VerificationAbort(shared, "An out of range key was found",
  117. static_cast<int>(cf), i);
  118. }
  119. } else {
  120. // The iterator found no value for the key in question, so do not
  121. // move to the next item in the iterator
  122. s = Status::NotFound();
  123. }
  124. VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
  125. /* msg_prefix */ "Iterator verification", s);
  126. if (!from_db.empty()) {
  127. PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
  128. from_db.data(), from_db.size());
  129. }
  130. }
  131. if (options.auto_refresh_iterator_with_snapshot) {
  132. options.snapshot = nullptr;
  133. }
  134. } else if (method == VerificationMethod::kGet) {
  135. for (int64_t i = start; i < end; ++i) {
  136. if (thread->shared->HasVerificationFailedYet()) {
  137. break;
  138. }
  139. const std::string key = Key(i);
  140. std::string from_db;
  141. Status s = db_->Get(options, column_families_[cf], key, &from_db);
  142. VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
  143. /* msg_prefix */ "Get verification", s);
  144. if (!from_db.empty()) {
  145. PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
  146. from_db.data(), from_db.size());
  147. }
  148. }
  149. if (secondary_db_) {
  150. assert(secondary_cfhs_.size() == column_families_.size());
  151. // We are going to read in the expected values before catching the
  152. // secondary up to the primary. This sets the lower bound of the
  153. // acceptable values that can be returned from the secondary. After
  154. // each Get() to the secondary, we are going to read in the expected
  155. // value again to determine the upper bound. As long as the returned
  156. // value from Get() is within these bounds, we consider that okay. The
  157. // lower bound will always be moving forward anyway as
  158. // TryCatchUpWithPrimary() gets called.
  159. std::vector<ExpectedValue> pre_read_expected_values;
  160. for (int64_t i = start; i < end; ++i) {
  161. pre_read_expected_values.push_back(
  162. shared->Get(static_cast<int>(cf), i));
  163. }
  164. if (FLAGS_disable_wal) {
  165. // The secondary relies on the WAL to be able to catch up with the
  166. // primary's memtable changes. If there is no WAL, before
  167. // verification we should make sure the changes are reflected in the
  168. // SST files
  169. Status memtable_flush_status =
  170. db_->Flush(FlushOptions(), column_families_[cf]);
  171. if (!memtable_flush_status.ok()) {
  172. if (IsErrorInjectedAndRetryable(memtable_flush_status)) {
  173. fprintf(stdout,
  174. "Skipping secondary verification because error was "
  175. "injected into memtable flush\n");
  176. continue;
  177. }
  178. VerificationAbort(shared,
  179. "Failed to flush primary's memtables before "
  180. "secondary verification");
  181. }
  182. } else if (FLAGS_manual_wal_flush_one_in > 0) {
  183. // RocksDB maintains internal buffers of WAL data when
  184. // manual_wal_flush is used. The secondary can read the WAL to catch
  185. // up with the primary's memtable changes, but these changes need to
  186. // be flushed first.
  187. Status flush_wal_status = db_->FlushWAL(/*sync=*/true);
  188. if (!flush_wal_status.ok()) {
  189. if (IsErrorInjectedAndRetryable(flush_wal_status)) {
  190. fprintf(stdout,
  191. "Skipping secondary verification because error was "
  192. "injected into WAL flush\n");
  193. continue;
  194. }
  195. VerificationAbort(shared,
  196. "Failed to flush primary's WAL before "
  197. "secondary verification");
  198. }
  199. }
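  // Now catch the secondary up to the primary. The expected values captured
  // above act as the lower bound for what the secondary may legitimately return.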
  200. Status s = secondary_db_->TryCatchUpWithPrimary();
  201. #ifndef NDEBUG
  202. uint64_t manifest_num = static_cast_with_check<DBImpl>(secondary_db_)
  203. ->TEST_Current_Manifest_FileNo();
  204. #else
  205. uint64_t manifest_num = 0;
  206. #endif
  207. if (!s.ok()) {
  208. VerificationAbort(shared,
  209. "Secondary failed to catch up to the primary");
  210. }
  211. for (int64_t i = start; i < end; ++i) {
  212. if (thread->shared->HasVerificationFailedYet()) {
  213. break;
  214. }
  215. const std::string key = Key(i);
  216. std::string from_db;
  217. // Temporarily disable error injection to verify the secondary
  218. if (fault_fs_guard) {
  219. fault_fs_guard->DisableThreadLocalErrorInjection(
  220. FaultInjectionIOType::kRead);
  221. fault_fs_guard->DisableThreadLocalErrorInjection(
  222. FaultInjectionIOType::kMetadataRead);
  223. }
  224. s = secondary_db_->Get(options, secondary_cfhs_[cf], key, &from_db);
  225. // Re-enable error injection after verifying the secondary
  226. if (fault_fs_guard) {
  227. fault_fs_guard->EnableThreadLocalErrorInjection(
  228. FaultInjectionIOType::kRead);
  229. fault_fs_guard->EnableThreadLocalErrorInjection(
  230. FaultInjectionIOType::kMetadataRead);
  231. }
  232. assert(!pre_read_expected_values.empty() &&
  233. static_cast<size_t>(i - start) <
  234. pre_read_expected_values.size());
  235. VerifyValueRange(
  236. static_cast<int>(cf), i, options, shared, from_db,
  237. /* msg_prefix */ "Secondary get verification, manifest: " +
  238. std::to_string(manifest_num),
  239. s, pre_read_expected_values[i - start]);
  240. }
  241. }
  242. } else if (method == VerificationMethod::kGetEntity) {
  243. for (int64_t i = start; i < end; ++i) {
  244. if (thread->shared->HasVerificationFailedYet()) {
  245. break;
  246. }
  247. const std::string key = Key(i);
  248. PinnableWideColumns result;
  249. Status s =
  250. db_->GetEntity(options, column_families_[cf], key, &result);
  251. std::string from_db;
  252. if (s.ok()) {
  253. const WideColumns& columns = result.columns();
  254. if (WideColumnsHelper::HasDefaultColumn(columns)) {
  255. from_db = WideColumnsHelper::GetDefaultColumn(columns).ToString();
  256. }
  257. if (!VerifyWideColumns(columns)) {
  258. VerificationAbort(shared, static_cast<int>(cf), i, from_db,
  259. columns);
  260. }
  261. }
  262. VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
  263. /* msg_prefix */ "GetEntity verification", s);
  264. if (!from_db.empty()) {
  265. PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
  266. from_db.data(), from_db.size());
  267. }
  268. }
  269. } else if (method == VerificationMethod::kMultiGet) {
  270. for (int64_t i = start; i < end;) {
  271. if (thread->shared->HasVerificationFailedYet()) {
  272. break;
  273. }
  274. // Keep the batch size to some reasonable value
  275. size_t batch_size = thread->rand.Uniform(128) + 1;
  276. batch_size = std::min<size_t>(batch_size, end - i);
  277. std::vector<std::string> key_strs(batch_size);
  278. std::vector<Slice> keys(batch_size);
  279. std::vector<PinnableSlice> values(batch_size);
  280. std::vector<Status> statuses(batch_size);
  281. for (size_t j = 0; j < batch_size; ++j) {
  282. key_strs[j] = Key(i + j);
  283. keys[j] = Slice(key_strs[j]);
  284. }
  285. db_->MultiGet(options, column_families_[cf], batch_size, keys.data(),
  286. values.data(), statuses.data());
  287. for (size_t j = 0; j < batch_size; ++j) {
  288. const std::string from_db = values[j].ToString();
  289. VerifyOrSyncValue(static_cast<int>(cf), i + j, options, shared,
  290. from_db, /* msg_prefix */ "MultiGet verification",
  291. statuses[j]);
  292. if (!from_db.empty()) {
  293. PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
  294. from_db.data(), from_db.size());
  295. }
  296. }
  297. i += batch_size;
  298. }
  299. } else if (method == VerificationMethod::kMultiGetEntity) {
  300. for (int64_t i = start; i < end;) {
  301. if (thread->shared->HasVerificationFailedYet()) {
  302. break;
  303. }
  304. // Keep the batch size to some reasonable value
  305. size_t batch_size = thread->rand.Uniform(128) + 1;
  306. batch_size = std::min<size_t>(batch_size, end - i);
  307. std::vector<std::string> key_strs(batch_size);
  308. std::vector<Slice> keys(batch_size);
  309. std::vector<PinnableWideColumns> results(batch_size);
  310. std::vector<Status> statuses(batch_size);
  311. for (size_t j = 0; j < batch_size; ++j) {
  312. key_strs[j] = Key(i + j);
  313. keys[j] = Slice(key_strs[j]);
  314. }
  315. db_->MultiGetEntity(options, column_families_[cf], batch_size,
  316. keys.data(), results.data(), statuses.data());
  317. for (size_t j = 0; j < batch_size; ++j) {
  318. std::string from_db;
  319. if (statuses[j].ok()) {
  320. const WideColumns& columns = results[j].columns();
  321. if (WideColumnsHelper::HasDefaultColumn(columns)) {
  322. from_db =
  323. WideColumnsHelper::GetDefaultColumn(columns).ToString();
  324. }
  325. if (!VerifyWideColumns(columns)) {
  326. VerificationAbort(shared, static_cast<int>(cf), i, from_db,
  327. columns);
  328. }
  329. }
  330. VerifyOrSyncValue(
  331. static_cast<int>(cf), i + j, options, shared, from_db,
  332. /* msg_prefix */ "MultiGetEntity verification", statuses[j]);
  333. if (!from_db.empty()) {
  334. PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
  335. from_db.data(), from_db.size());
  336. }
  337. }
  338. i += batch_size;
  339. }
  340. } else {
  341. assert(method == VerificationMethod::kGetMergeOperands);
  342. // Start off with a small size that will be increased later if necessary
  343. std::vector<PinnableSlice> values(4);
  344. GetMergeOperandsOptions merge_operands_info;
  345. merge_operands_info.expected_max_number_of_operands =
  346. static_cast<int>(values.size());
  347. for (int64_t i = start; i < end; ++i) {
  348. if (thread->shared->HasVerificationFailedYet()) {
  349. break;
  350. }
  351. const std::string key = Key(i);
  352. const Slice k(key);
  353. std::string from_db;
  354. int number_of_operands = 0;
  355. Status s = db_->GetMergeOperands(options, column_families_[cf], k,
  356. values.data(), &merge_operands_info,
  357. &number_of_operands);
  358. if (s.IsIncomplete()) {
  359. // Need to resize values as there are more than values.size() merge
  360. // operands on this key. Should only happen a few times when we
  361. // encounter a key that had more merge operands than any key seen so
  362. // far
  363. values.resize(number_of_operands);
  364. merge_operands_info.expected_max_number_of_operands =
  365. static_cast<int>(number_of_operands);
  366. s = db_->GetMergeOperands(options, column_families_[cf], k,
  367. values.data(), &merge_operands_info,
  368. &number_of_operands);
  369. }
  370. // It is assumed here that GetMergeOperands always sets number_of_operands
  371. if (number_of_operands) {
  372. from_db = values[number_of_operands - 1].ToString();
  373. }
  374. VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
  375. /* msg_prefix */ "GetMergeOperands verification",
  376. s);
  377. if (!from_db.empty()) {
  378. PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
  379. from_db.data(), from_db.size());
  380. }
  381. }
  382. }
  383. }
  384. }
  385. void ContinuouslyVerifyDb(ThreadState* thread) const override {
  386. // For automated crash tests, we only want to run this continuous
  387. // verification when continuous_verification_interval > 0 and there is
  388. // a secondary db. This continuous verification currently fails when there is
  389. // a secondary db during the iterator scan. The stack trace mentions
  390. // BlobReader/BlobSource but it may not necessarily be related to BlobDB.
  391. // Regardless, we only want to run this function if we are experimenting and
  392. // explicitly setting continuous_verification_interval.
  393. if (!secondary_db_ || !FLAGS_continuous_verification_interval) {
  394. return;
  395. }
  396. assert(secondary_db_);
  397. assert(!secondary_cfhs_.empty());
  398. Status s = secondary_db_->TryCatchUpWithPrimary();
  399. if (!s.ok()) {
  400. assert(false);
  401. exit(1);
  402. }
  403. const auto checksum_column_family = [](Iterator* iter,
  404. uint32_t* checksum) -> Status {
  405. assert(nullptr != checksum);
  406. uint32_t ret = 0;
  407. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  408. ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
  409. ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
  410. }
  411. *checksum = ret;
  412. return iter->status();
  413. };
  414. auto* shared = thread->shared;
  415. assert(shared);
  416. const int64_t max_key = shared->GetMaxKey();
  417. ReadOptions read_opts(FLAGS_verify_checksum, true);
  418. std::string ts_str;
  419. Slice ts;
  420. if (FLAGS_user_timestamp_size > 0) {
  421. ts_str = GetNowNanos();
  422. ts = ts_str;
  423. read_opts.timestamp = &ts;
  424. }
  425. std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
  426. if (FLAGS_auto_refresh_iterator_with_snapshot) {
  427. snapshot = std::make_unique<ManagedSnapshot>(db_);
  428. read_opts.snapshot = snapshot->snapshot();
  429. read_opts.auto_refresh_iterator_with_snapshot = true;
  430. }
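  // Function-local static RNG: seeded once from the shared seed and reused
  // across calls to pick random keys for the point lookups and seeks below.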
  431. static Random64 rand64(shared->GetSeed());
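  // Checksum the secondary's default column family with a full scan; a non-OK
  // iterator status here is reported to stderr and triggers an assertion.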
  432. {
  433. uint32_t crc = 0;
  434. std::unique_ptr<Iterator> it(secondary_db_->NewIterator(read_opts));
  435. s = checksum_column_family(it.get(), &crc);
  436. if (!s.ok()) {
  437. fprintf(stderr, "Computing checksum of default cf: %s\n",
  438. s.ToString().c_str());
  439. assert(false);
  440. }
  441. }
  442. for (auto* handle : secondary_cfhs_) {
  443. if (thread->rand.OneInOpt(3)) {
  444. // Use Get()
  445. uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
  446. std::string key_str = Key(key);
  447. std::string value;
  448. std::string key_ts;
  449. s = secondary_db_->Get(
  450. read_opts, handle, key_str, &value,
  451. FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
  452. s.PermitUncheckedError();
  453. } else {
  454. // Use range scan
  455. if (read_opts.auto_refresh_iterator_with_snapshot) {
  456. snapshot = std::make_unique<ManagedSnapshot>(db_);
  457. read_opts.snapshot = snapshot->snapshot();
  458. }
  459. std::unique_ptr<Iterator> iter(
  460. secondary_db_->NewIterator(read_opts, handle));
  461. uint32_t rnd = (thread->rand.Next()) % 4;
  462. if (0 == rnd) {
  463. // SeekToFirst() + Next()*5
  464. read_opts.total_order_seek = true;
  465. iter->SeekToFirst();
  466. for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
  467. }
  468. } else if (1 == rnd) {
  469. // SeekToLast() + Prev()*5
  470. read_opts.total_order_seek = true;
  471. iter->SeekToLast();
  472. for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
  473. }
  474. } else if (2 == rnd) {
  475. // Seek() + Next()*5
  476. uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
  477. std::string key_str = Key(key);
  478. iter->Seek(key_str);
  479. for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
  480. }
  481. } else {
  482. // SeekForPrev() + Prev()*5
  483. uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
  484. std::string key_str = Key(key);
  485. iter->SeekForPrev(key_str);
  486. for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
  487. }
  488. }
  489. if (read_opts.auto_refresh_iterator_with_snapshot) {
  490. read_opts.snapshot = nullptr;
  491. }
  492. }
  493. }
  494. }
  495. void MaybeClearOneColumnFamily(ThreadState* thread) override {
  496. if (FLAGS_column_families > 1) {
  497. if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
  498. // drop column family and then create it again (can't drop default)
  499. int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
  500. std::string new_name =
  501. std::to_string(new_column_family_name_.fetch_add(1));
  502. {
  503. MutexLock l(thread->shared->GetMutex());
  504. fprintf(
  505. stdout,
  506. "[CF %d] Dropping and recreating column family. new name: %s\n",
  507. cf, new_name.c_str());
  508. }
  509. thread->shared->LockColumnFamily(cf);
  510. Status s = db_->DropColumnFamily(column_families_[cf]);
  511. delete column_families_[cf];
  512. if (!s.ok()) {
  513. fprintf(stderr, "dropping column family error: %s\n",
  514. s.ToString().c_str());
  515. thread->shared->SafeTerminate();
  516. }
  517. s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
  518. &column_families_[cf]);
  519. column_family_names_[cf] = new_name;
  520. thread->shared->ClearColumnFamily(cf);
  521. if (!s.ok()) {
  522. fprintf(stderr, "creating column family error: %s\n",
  523. s.ToString().c_str());
  524. thread->shared->SafeTerminate();
  525. }
  526. thread->shared->UnlockColumnFamily(cf);
  527. }
  528. }
  529. }
  530. bool ShouldAcquireMutexOnKey() const override { return true; }
  531. bool IsStateTracked() const override { return true; }
  532. void TestKeyMayExist(ThreadState* thread, const ReadOptions& read_opts,
  533. const std::vector<int>& rand_column_families,
  534. const std::vector<int64_t>& rand_keys) override {
  535. auto cfh = column_families_[rand_column_families[0]];
  536. std::string key_str = Key(rand_keys[0]);
  537. Slice key = key_str;
  538. std::string ignore;
  539. ReadOptions read_opts_copy = read_opts;
  540. std::string read_ts_str;
  541. Slice read_ts_slice;
  542. if (FLAGS_user_timestamp_size > 0) {
  543. read_ts_str = GetNowNanos();
  544. read_ts_slice = read_ts_str;
  545. read_opts_copy.timestamp = &read_ts_slice;
  546. }
  547. bool read_older_ts = MaybeUseOlderTimestampForPointLookup(
  548. thread, read_ts_str, read_ts_slice, read_opts_copy);
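  // Capture the expected state both before and after KeyMayExist() so the
  // check below only fires when the key must have existed for the whole window.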
  549. const ExpectedValue pre_read_expected_value =
  550. thread->shared->Get(rand_column_families[0], rand_keys[0]);
  551. bool key_may_exist = db_->KeyMayExist(read_opts_copy, cfh, key, &ignore);
  552. const ExpectedValue post_read_expected_value =
  553. thread->shared->Get(rand_column_families[0], rand_keys[0]);
  554. if (!key_may_exist && !FLAGS_skip_verifydb && !read_older_ts) {
  555. if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
  556. post_read_expected_value)) {
  557. thread->shared->SetVerificationFailure();
  558. fprintf(stderr,
  559. "error : inconsistent values for key %s: expected state has "
  560. "the key, TestKeyMayExist() returns false indicating the key "
  561. "must not exist.\n",
  562. key.ToString(true).c_str());
  563. }
  564. }
  565. }
  566. Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
  567. const std::vector<int>& rand_column_families,
  568. const std::vector<int64_t>& rand_keys) override {
  569. auto cfh = column_families_[rand_column_families[0]];
  570. std::string key_str = Key(rand_keys[0]);
  571. Slice key = key_str;
  572. std::string from_db;
  573. ReadOptions read_opts_copy = read_opts;
  574. std::string read_ts_str;
  575. Slice read_ts_slice;
  576. if (FLAGS_user_timestamp_size > 0) {
  577. read_ts_str = GetNowNanos();
  578. read_ts_slice = read_ts_str;
  579. read_opts_copy.timestamp = &read_ts_slice;
  580. }
  581. bool read_older_ts = MaybeUseOlderTimestampForPointLookup(
  582. thread, read_ts_str, read_ts_slice, read_opts_copy);
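  // Reset the per-thread injected error counters so the post-read check below
  // only accounts for faults injected during this Get().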
  583. if (fault_fs_guard) {
  584. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  585. FaultInjectionIOType::kRead);
  586. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  587. FaultInjectionIOType::kMetadataRead);
  588. SharedState::ignore_read_error = false;
  589. }
  590. const ExpectedValue pre_read_expected_value =
  591. thread->shared->Get(rand_column_families[0], rand_keys[0]);
  592. Status s = db_->Get(read_opts_copy, cfh, key, &from_db);
  593. const ExpectedValue post_read_expected_value =
  594. thread->shared->Get(rand_column_families[0], rand_keys[0]);
  595. int injected_error_count = 0;
  596. if (fault_fs_guard) {
  597. injected_error_count = GetMinInjectedErrorCount(
  598. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  599. FaultInjectionIOType::kRead),
  600. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  601. FaultInjectionIOType::kMetadataRead));
  602. if (!SharedState::ignore_read_error && injected_error_count > 0 &&
  603. (s.ok() || s.IsNotFound())) {
  604. // Grab mutex so multiple threads don't try to print the
  605. // stack trace at the same time
  606. MutexLock l(thread->shared->GetMutex());
  607. fprintf(stderr, "Didn't get expected error from Get\n");
  608. fprintf(stderr, "Callstack that injected the fault\n");
  609. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  610. FaultInjectionIOType::kRead);
  611. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  612. FaultInjectionIOType::kMetadataRead);
  613. std::terminate();
  614. }
  615. }
  616. if (s.ok()) {
  617. // found case
  618. thread->stats.AddGets(1, 1);
  619. // we only have the latest expected state
  620. if (!FLAGS_skip_verifydb && !read_older_ts) {
  621. if (ExpectedValueHelper::MustHaveNotExisted(pre_read_expected_value,
  622. post_read_expected_value)) {
  623. thread->shared->SetVerificationFailure();
  624. fprintf(stderr,
  625. "error : inconsistent values for key %s (%" PRIi64
  626. "): Get returns %s, "
  627. "but expected state is \"deleted\".\n",
  628. key.ToString(true).c_str(), rand_keys[0],
  629. StringToHex(from_db).c_str());
  630. }
  631. Slice from_db_slice(from_db);
  632. uint32_t value_base_from_db = GetValueBase(from_db_slice);
  633. if (!ExpectedValueHelper::InExpectedValueBaseRange(
  634. value_base_from_db, pre_read_expected_value,
  635. post_read_expected_value)) {
  636. thread->shared->SetVerificationFailure();
  637. fprintf(stderr,
  638. "error : inconsistent values for key %s (%" PRIi64
  639. "): Get returns %s with "
  640. "value base %d that falls out of expected state's value base "
  641. "range.\n",
  642. key.ToString(true).c_str(), rand_keys[0],
  643. StringToHex(from_db).c_str(), value_base_from_db);
  644. }
  645. }
  646. } else if (s.IsNotFound()) {
  647. // not found case
  648. thread->stats.AddGets(1, 0);
  649. if (!FLAGS_skip_verifydb && !read_older_ts) {
  650. if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
  651. post_read_expected_value)) {
  652. thread->shared->SetVerificationFailure();
  653. fprintf(stderr,
  654. "error : inconsistent values for key %s (%" PRIi64
  655. "): expected state has "
  656. "the key, Get() returns NotFound.\n",
  657. key.ToString(true).c_str(), rand_keys[0]);
  658. }
  659. }
  660. } else if (injected_error_count == 0 || !IsErrorInjectedAndRetryable(s)) {
  661. thread->shared->SetVerificationFailure();
  662. fprintf(stderr, "error : Get() returns %s for key: %s (%" PRIi64 ").\n",
  663. s.ToString().c_str(), key.ToString(true).c_str(), rand_keys[0]);
  664. }
  665. return s;
  666. }
  667. std::vector<Status> TestMultiGet(
  668. ThreadState* thread, const ReadOptions& read_opts,
  669. const std::vector<int>& rand_column_families,
  670. const std::vector<int64_t>& rand_keys) override {
  671. size_t num_keys = rand_keys.size();
  672. std::vector<std::string> key_str;
  673. std::vector<Slice> keys;
  674. key_str.reserve(num_keys);
  675. keys.reserve(num_keys);
  676. std::vector<PinnableSlice> values(num_keys);
  677. std::vector<Status> statuses(num_keys);
  678. // When FLAGS_use_txn is enabled, we also do a read-your-own-write check.
  679. std::unordered_map<std::string, ExpectedValue> ryw_expected_values;
  680. SharedState* shared = thread->shared;
  681. assert(shared);
  682. int column_family = rand_column_families[0];
  683. ColumnFamilyHandle* cfh = column_families_[column_family];
  684. bool do_consistency_check = FLAGS_check_multiget_consistency;
  685. ReadOptions readoptionscopy = read_opts;
  686. if (do_consistency_check) {
  687. readoptionscopy.snapshot = db_->GetSnapshot();
  688. }
  689. std::string read_ts_str;
  690. Slice read_ts_slice;
  691. MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice,
  692. readoptionscopy);
  693. readoptionscopy.rate_limiter_priority =
  694. FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  695. // To appease clang analyzer
  696. const bool use_txn = FLAGS_use_txn;
  697. // Create a transaction in order to write some data. The purpose is to
  698. // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
  699. // will be rolled back once MultiGet returns.
  700. std::unique_ptr<Transaction> txn;
  701. if (use_txn) {
  702. // TODO(hx235): test fault injection with MultiGet() with transactions
  703. if (fault_fs_guard) {
  704. fault_fs_guard->DisableThreadLocalErrorInjection(
  705. FaultInjectionIOType::kRead);
  706. fault_fs_guard->DisableThreadLocalErrorInjection(
  707. FaultInjectionIOType::kMetadataRead);
  708. }
  709. WriteOptions wo;
  710. if (FLAGS_rate_limit_auto_wal_flush) {
  711. wo.rate_limiter_priority = Env::IO_USER;
  712. }
  713. Status s = NewTxn(wo, thread, &txn);
  714. if (!s.ok()) {
  715. fprintf(stderr, "NewTxn error: %s\n", s.ToString().c_str());
  716. shared->SafeTerminate();
  717. }
  718. }
  719. for (size_t i = 0; i < num_keys; ++i) {
  720. uint64_t rand_key = rand_keys[i];
  721. key_str.emplace_back(Key(rand_key));
  722. keys.emplace_back(key_str.back());
  723. if (use_txn) {
  724. MaybeAddKeyToTxnForRYW(thread, column_family, rand_key, txn.get(),
  725. ryw_expected_values);
  726. }
  727. }
  728. int injected_error_count = 0;
  729. if (!use_txn) {
  730. if (fault_fs_guard) {
  731. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  732. FaultInjectionIOType::kRead);
  733. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  734. FaultInjectionIOType::kMetadataRead);
  735. SharedState::ignore_read_error = false;
  736. }
  737. db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
  738. statuses.data());
  739. if (fault_fs_guard) {
  740. injected_error_count = GetMinInjectedErrorCount(
  741. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  742. FaultInjectionIOType::kRead),
  743. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  744. FaultInjectionIOType::kMetadataRead));
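  // If faults were injected, at least that many of the returned statuses
  // should be neither OK nor NotFound; seeing fewer means MultiGet silently
  // swallowed an injected error.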
  745. if (injected_error_count > 0) {
  746. int stat_nok_nfound = 0;
  747. for (const auto& s : statuses) {
  748. if (!s.ok() && !s.IsNotFound()) {
  749. stat_nok_nfound++;
  750. }
  751. }
  752. if (!SharedState::ignore_read_error &&
  753. stat_nok_nfound < injected_error_count) {
  754. // Grab mutex so multiple threads don't try to print the
  755. // stack trace at the same time
  756. MutexLock l(shared->GetMutex());
  757. fprintf(stderr, "Didn't get expected error from MultiGet. \n");
  758. fprintf(stderr,
  759. "num_keys %zu Expected %d errors, seen at least %d\n",
  760. num_keys, injected_error_count, stat_nok_nfound);
  761. fprintf(stderr, "Callstack that injected the fault\n");
  762. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  763. FaultInjectionIOType::kRead);
  764. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  765. FaultInjectionIOType::kMetadataRead);
  766. std::terminate();
  767. }
  768. }
  769. }
  770. } else {
  771. assert(txn);
  772. txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
  773. statuses.data());
  774. }
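  // ryw_check: read-your-own-write validation for keys this (uncommitted)
  // transaction touched. A pending delete must surface as NotFound and a
  // pending put must return exactly the value generated from its value base.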
  775. auto ryw_check =
  776. [](const Slice& key, const PinnableSlice& value, const Status& s,
  777. const std::optional<ExpectedValue>& ryw_expected_value) -> bool {
  778. if (!ryw_expected_value.has_value()) {
  779. return true;
  780. }
  781. const ExpectedValue& expected = ryw_expected_value.value();
  782. char expected_value[100];
  783. if (s.ok() &&
  784. ExpectedValueHelper::MustHaveNotExisted(expected, expected)) {
  785. fprintf(stderr,
  786. "MultiGet returned value different from what was "
  787. "written for key %s\n",
  788. key.ToString(true).c_str());
  789. fprintf(stderr,
  790. "MultiGet returned ok, transaction has non-committed "
  791. "delete.\n");
  792. return false;
  793. } else if (s.IsNotFound() &&
  794. ExpectedValueHelper::MustHaveExisted(expected, expected)) {
  795. fprintf(stderr,
  796. "MultiGet returned value different from what was "
  797. "written for key %s\n",
  798. key.ToString(true).c_str());
  799. fprintf(stderr,
  800. "MultiGet returned not found, transaction has "
  801. "non-committed value.\n");
  802. return false;
  803. } else if (s.ok() &&
  804. ExpectedValueHelper::MustHaveExisted(expected, expected)) {
  805. Slice from_txn_slice(value);
  806. size_t sz = GenerateValue(expected.GetValueBase(), expected_value,
  807. sizeof(expected_value));
  808. Slice expected_value_slice(expected_value, sz);
  809. if (expected_value_slice.compare(from_txn_slice) == 0) {
  810. return true;
  811. }
  812. fprintf(stderr,
  813. "MultiGet returned value different from what was "
  814. "written for key %s\n",
  815. key.ToString(true /* hex */).c_str());
  816. fprintf(stderr, "MultiGet returned value %s\n",
  817. from_txn_slice.ToString(true /* hex */).c_str());
  818. fprintf(stderr, "Transaction has non-committed value %s\n",
  819. expected_value_slice.ToString(true /* hex */).c_str());
  820. return false;
  821. }
  822. return true;
  823. };
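  // check_multiget: cross-check each MultiGet result against a point Get on
  // the same snapshot and, when transactions are in use, against the
  // read-your-own-write expectations; any mismatch fails verification fast.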
  824. auto check_multiget =
  825. [&](const Slice& key, const PinnableSlice& expected_value,
  826. const Status& s,
  827. const std::optional<ExpectedValue>& ryw_expected_value) -> bool {
  828. // Temporarily disable error injection for verification
  829. if (fault_fs_guard) {
  830. fault_fs_guard->DisableThreadLocalErrorInjection(
  831. FaultInjectionIOType::kRead);
  832. fault_fs_guard->DisableThreadLocalErrorInjection(
  833. FaultInjectionIOType::kMetadataRead);
  834. }
  835. bool check_multiget_res = true;
  836. bool is_consistent = true;
  837. bool is_ryw_correct = true;
  838. // If the test does not use transactions, the consistency check for each
  839. // key verifies that the results from db `Get` and db `MultiGet` agree.
  840. // If the test uses transactions, after the consistency check, also do a
  841. // read-your-own-write check.
  842. Status tmp_s;
  843. std::string value;
  844. if (use_txn) {
  845. assert(txn);
  846. ThreadStatusUtil::SetThreadOperation(
  847. ThreadStatus::OperationType::OP_GET);
  848. tmp_s = txn->Get(readoptionscopy, cfh, key, &value);
  849. ThreadStatusUtil::SetThreadOperation(
  850. ThreadStatus::OperationType::OP_MULTIGET);
  851. } else {
  852. ThreadStatusUtil::SetThreadOperation(
  853. ThreadStatus::OperationType::OP_GET);
  854. tmp_s = db_->Get(readoptionscopy, cfh, key, &value);
  855. ThreadStatusUtil::SetThreadOperation(
  856. ThreadStatus::OperationType::OP_MULTIGET);
  857. }
  858. if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
  859. fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
  860. is_consistent = false;
  861. } else if (!s.ok() && tmp_s.ok()) {
  862. fprintf(stderr,
  863. "MultiGet(%d) returned different results with key %s. "
  864. "Snapshot Seq No: %" PRIu64 "\n",
  865. column_family, key.ToString(true).c_str(),
  866. readoptionscopy.snapshot->GetSequenceNumber());
  867. fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
  868. is_consistent = false;
  869. } else if (s.ok() && tmp_s.IsNotFound()) {
  870. fprintf(stderr,
  871. "MultiGet(%d) returned different results with key %s. "
  872. "Snapshot Seq No: %" PRIu64 "\n",
  873. column_family, key.ToString(true).c_str(),
  874. readoptionscopy.snapshot->GetSequenceNumber());
  875. fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
  876. is_consistent = false;
  877. } else if (s.ok() && value != expected_value.ToString()) {
  878. fprintf(stderr,
  879. "MultiGet(%d) returned different results with key %s. "
  880. "Snapshot Seq No: %" PRIu64 "\n",
  881. column_family, key.ToString(true).c_str(),
  882. readoptionscopy.snapshot->GetSequenceNumber());
  883. fprintf(stderr, "MultiGet returned value %s\n",
  884. expected_value.ToString(true).c_str());
  885. fprintf(stderr, "Get returned value %s\n",
  886. Slice(value).ToString(true /* hex */).c_str());
  887. is_consistent = false;
  888. }
  889. // If the test uses transactions, continue with a read-your-own-write check.
  890. if (is_consistent && use_txn) {
  891. is_ryw_correct = ryw_check(key, expected_value, s, ryw_expected_value);
  892. }
  893. if (!is_consistent) {
  894. fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
  895. thread->stats.AddErrors(1);
  896. check_multiget_res = false;
  897. // Fail fast to preserve the DB state
  898. shared->SetVerificationFailure();
  899. } else if (!is_ryw_correct) {
  900. fprintf(stderr, "TestMultiGet error: is_ryw_correct is false\n");
  901. thread->stats.AddErrors(1);
  902. check_multiget_res = false;
  903. // Fail fast to preserve the DB state
  904. shared->SetVerificationFailure();
  905. } else if (s.ok()) {
  906. // found case
  907. thread->stats.AddGets(1, 1);
  908. } else if (s.IsNotFound()) {
  909. // not found case
  910. thread->stats.AddGets(1, 0);
  911. } else if (s.IsMergeInProgress() && use_txn) {
  912. // With txn this is sometimes expected.
  913. thread->stats.AddGets(1, 1);
  914. } else if (injected_error_count == 0 || !IsErrorInjectedAndRetryable(s)) {
  915. fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
  916. thread->stats.AddErrors(1);
  917. shared->SetVerificationFailure();
  918. }
  919. // Re-enable the error injection that was disabled for checking results
  920. if (fault_fs_guard) {
  921. fault_fs_guard->EnableThreadLocalErrorInjection(
  922. FaultInjectionIOType::kRead);
  923. fault_fs_guard->EnableThreadLocalErrorInjection(
  924. FaultInjectionIOType::kMetadataRead);
  925. }
  926. return check_multiget_res;
  927. };
  928. // Consistency check
  929. if (do_consistency_check && injected_error_count == 0) {
  930. size_t num_of_keys = keys.size();
  931. assert(values.size() == num_of_keys);
  932. assert(statuses.size() == num_of_keys);
  933. for (size_t i = 0; i < num_of_keys; ++i) {
  934. bool check_result = true;
  935. if (use_txn) {
  936. std::optional<ExpectedValue> ryw_expected_value;
  937. const auto it = ryw_expected_values.find(key_str[i]);
  938. if (it != ryw_expected_values.end()) {
  939. ryw_expected_value = it->second;
  940. }
  941. check_result = check_multiget(keys[i], values[i], statuses[i],
  942. ryw_expected_value);
  943. } else {
  944. check_result = check_multiget(keys[i], values[i], statuses[i],
  945. std::nullopt /* ryw_expected_value */);
  946. }
  947. if (!check_result) {
  948. break;
  949. }
  950. }
  951. }
  952. if (readoptionscopy.snapshot) {
  953. db_->ReleaseSnapshot(readoptionscopy.snapshot);
  954. }
  955. if (use_txn) {
  956. txn->Rollback().PermitUncheckedError();
  957. // Re-enable the error injection that was disabled for transactions
  958. if (fault_fs_guard) {
  959. fault_fs_guard->EnableThreadLocalErrorInjection(
  960. FaultInjectionIOType::kRead);
  961. fault_fs_guard->EnableThreadLocalErrorInjection(
  962. FaultInjectionIOType::kMetadataRead);
  963. }
  964. }
  965. return statuses;
  966. }
  967. void TestGetEntity(ThreadState* thread, const ReadOptions& read_opts,
  968. const std::vector<int>& rand_column_families,
  969. const std::vector<int64_t>& rand_keys) override {
  970. assert(thread);
  971. SharedState* const shared = thread->shared;
  972. assert(shared);
  973. assert(!rand_column_families.empty());
  974. const int column_family = rand_column_families[0];
  975. assert(column_family >= 0);
  976. assert(column_family < static_cast<int>(column_families_.size()));
  977. ColumnFamilyHandle* const cfh = column_families_[column_family];
  978. assert(cfh);
  979. assert(!rand_keys.empty());
  980. const int64_t key = rand_keys[0];
  981. const std::string key_str = Key(key);
  982. PinnableWideColumns columns_from_db;
  983. PinnableAttributeGroups attribute_groups_from_db;
  984. ReadOptions read_opts_copy = read_opts;
  985. std::string read_ts_str;
  986. Slice read_ts_slice;
  987. if (FLAGS_user_timestamp_size > 0) {
  988. read_ts_str = GetNowNanos();
  989. read_ts_slice = read_ts_str;
  990. read_opts_copy.timestamp = &read_ts_slice;
  991. }
  992. const bool read_older_ts = MaybeUseOlderTimestampForPointLookup(
  993. thread, read_ts_str, read_ts_slice, read_opts_copy);
  994. const ExpectedValue pre_read_expected_value =
  995. thread->shared->Get(column_family, key);
  996. if (fault_fs_guard) {
  997. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  998. FaultInjectionIOType::kRead);
  999. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1000. FaultInjectionIOType::kMetadataRead);
  1001. SharedState::ignore_read_error = false;
  1002. }
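  // Issue the read through either the attribute-group GetEntity overload or
  // the plain wide-column GetEntity, depending on FLAGS_use_attribute_group.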
  1003. Status s;
  1004. if (FLAGS_use_attribute_group) {
  1005. attribute_groups_from_db.emplace_back(cfh);
  1006. s = db_->GetEntity(read_opts_copy, key_str, &attribute_groups_from_db);
  1007. if (s.ok()) {
  1008. s = attribute_groups_from_db.back().status();
  1009. }
  1010. } else {
  1011. s = db_->GetEntity(read_opts_copy, cfh, key_str, &columns_from_db);
  1012. }
  1013. const ExpectedValue post_read_expected_value =
  1014. thread->shared->Get(column_family, key);
  1015. int injected_error_count = 0;
  1016. if (fault_fs_guard) {
  1017. injected_error_count = GetMinInjectedErrorCount(
  1018. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1019. FaultInjectionIOType::kRead),
  1020. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1021. FaultInjectionIOType::kMetadataRead));
  1022. if (!SharedState::ignore_read_error && injected_error_count > 0 &&
  1023. (s.ok() || s.IsNotFound())) {
  1024. // Grab mutex so multiple threads don't try to print the
  1025. // stack trace at the same time
  1026. MutexLock l(thread->shared->GetMutex());
  1027. fprintf(stderr, "Didn't get expected error from GetEntity\n");
  1028. fprintf(stderr, "Callstack that injected the fault\n");
  1029. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  1030. FaultInjectionIOType::kRead);
  1031. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  1032. FaultInjectionIOType::kMetadataRead);
  1033. std::terminate();
  1034. }
  1035. }
  1036. if (s.ok()) {
  1037. thread->stats.AddGets(1, 1);
  1038. if (!FLAGS_skip_verifydb && !read_older_ts) {
  1039. if (FLAGS_use_attribute_group) {
  1040. assert(!attribute_groups_from_db.empty());
  1041. }
  1042. const WideColumns& columns =
  1043. FLAGS_use_attribute_group
  1044. ? attribute_groups_from_db.back().columns()
  1045. : columns_from_db.columns();
  1046. if (!VerifyWideColumns(columns)) {
  1047. shared->SetVerificationFailure();
  1048. fprintf(stderr,
  1049. "error : inconsistent columns returned by GetEntity for key "
  1050. "%s (%" PRIi64 "): %s\n",
  1051. StringToHex(key_str).c_str(), rand_keys[0],
  1052. WideColumnsToHex(columns).c_str());
  1053. } else if (ExpectedValueHelper::MustHaveNotExisted(
  1054. pre_read_expected_value, post_read_expected_value)) {
  1055. shared->SetVerificationFailure();
  1056. fprintf(stderr,
  1057. "error : inconsistent values for key %s (%" PRIi64
  1058. "): GetEntity returns %s, "
  1059. "expected state does not have the key.\n",
  1060. StringToHex(key_str).c_str(), rand_keys[0],
  1061. WideColumnsToHex(columns).c_str());
  1062. } else {
  1063. const uint32_t value_base_from_db =
  1064. GetValueBase(WideColumnsHelper::GetDefaultColumn(columns));
  1065. if (!ExpectedValueHelper::InExpectedValueBaseRange(
  1066. value_base_from_db, pre_read_expected_value,
  1067. post_read_expected_value)) {
  1068. shared->SetVerificationFailure();
  1069. fprintf(
  1070. stderr,
  1071. "error : inconsistent values for key %s (%" PRIi64
  1072. "): GetEntity returns %s "
  1073. "with value base %d that falls out of expected state's value "
  1074. "base range.\n",
  1075. StringToHex(key_str).c_str(), rand_keys[0],
  1076. WideColumnsToHex(columns).c_str(), value_base_from_db);
  1077. }
  1078. }
  1079. }
  1080. } else if (s.IsNotFound()) {
  1081. thread->stats.AddGets(1, 0);
  1082. if (!FLAGS_skip_verifydb && !read_older_ts) {
  1083. if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
  1084. post_read_expected_value)) {
  1085. shared->SetVerificationFailure();
  1086. fprintf(stderr,
  1087. "error : inconsistent values for key %s (%" PRIi64
  1088. "): expected state has "
  1089. "the key, GetEntity returns NotFound.\n",
  1090. StringToHex(key_str).c_str(), rand_keys[0]);
  1091. }
  1092. }
  1093. } else if (injected_error_count == 0 || !IsErrorInjectedAndRetryable(s)) {
  1094. fprintf(stderr,
  1095. "error : GetEntity() returns %s for key: %s (%" PRIi64 ").\n",
  1096. s.ToString().c_str(), StringToHex(key_str).c_str(), rand_keys[0]);
  1097. thread->shared->SetVerificationFailure();
  1098. }
  1099. }
  1100. void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
  1101. const std::vector<int>& rand_column_families,
  1102. const std::vector<int64_t>& rand_keys) override {
  1103. assert(thread);
  1104. ManagedSnapshot snapshot_guard(db_);
  1105. ReadOptions read_opts_copy(read_opts);
  1106. read_opts_copy.snapshot = snapshot_guard.snapshot();
  1107. assert(!rand_column_families.empty());
  1108. const int column_family = rand_column_families[0];
  1109. assert(column_family >= 0);
  1110. assert(column_family < static_cast<int>(column_families_.size()));
  1111. ColumnFamilyHandle* const cfh = column_families_[column_family];
  1112. assert(cfh);
  1113. assert(!rand_keys.empty());
  1114. const size_t num_keys = rand_keys.size();
  1115. std::unique_ptr<Transaction> txn;
  1116. if (FLAGS_use_txn) {
1117. // TODO(hx235): test fault injection for MultiGetEntity() with
1118. // transactions
  1119. if (fault_fs_guard) {
  1120. fault_fs_guard->DisableThreadLocalErrorInjection(
  1121. FaultInjectionIOType::kRead);
  1122. fault_fs_guard->DisableThreadLocalErrorInjection(
  1123. FaultInjectionIOType::kMetadataRead);
  1124. }
  1125. WriteOptions write_options;
  1126. if (FLAGS_rate_limit_auto_wal_flush) {
  1127. write_options.rate_limiter_priority = Env::IO_USER;
  1128. }
  1129. const Status s = NewTxn(write_options, thread, &txn);
  1130. if (!s.ok()) {
  1131. fprintf(stderr, "NewTxn error: %s\n", s.ToString().c_str());
  1132. thread->shared->SafeTerminate();
  1133. }
  1134. }
  1135. std::vector<std::string> keys(num_keys);
  1136. std::vector<Slice> key_slices(num_keys);
  1137. std::unordered_map<std::string, ExpectedValue> ryw_expected_values;
  1138. for (size_t i = 0; i < num_keys; ++i) {
  1139. const int64_t key = rand_keys[i];
  1140. keys[i] = Key(key);
  1141. key_slices[i] = keys[i];
  1142. if (FLAGS_use_txn) {
  1143. MaybeAddKeyToTxnForRYW(thread, column_family, key, txn.get(),
  1144. ryw_expected_values);
  1145. }
  1146. }
  1147. int injected_error_count = 0;
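// verify_expected_errors checks that the number of injected read/metadata-read
// errors is matched by at least as many non-OK, non-NotFound per-key statuses;
// otherwise an injected error was silently swallowed and we terminate.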
  1148. auto verify_expected_errors = [&](auto get_status) {
  1149. assert(fault_fs_guard);
  1150. injected_error_count = GetMinInjectedErrorCount(
  1151. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1152. FaultInjectionIOType::kRead),
  1153. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1154. FaultInjectionIOType::kMetadataRead));
  1155. if (injected_error_count) {
  1156. int stat_nok_nfound = 0;
  1157. for (size_t i = 0; i < num_keys; ++i) {
  1158. const Status& s = get_status(i);
  1159. if (!s.ok() && !s.IsNotFound()) {
  1160. ++stat_nok_nfound;
  1161. }
  1162. }
  1163. if (!SharedState::ignore_read_error &&
  1164. stat_nok_nfound < injected_error_count) {
  1165. // Grab mutex so multiple threads don't try to print the
  1166. // stack trace at the same time
  1167. assert(thread->shared);
  1168. MutexLock l(thread->shared->GetMutex());
  1169. fprintf(stderr, "Didn't get expected error from MultiGetEntity\n");
  1170. fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n",
  1171. num_keys, injected_error_count, stat_nok_nfound);
  1172. fprintf(stderr, "Call stack that injected the fault\n");
  1173. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  1174. FaultInjectionIOType::kRead);
  1175. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  1176. FaultInjectionIOType::kMetadataRead);
  1177. std::terminate();
  1178. }
  1179. }
  1180. };
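// check_results verifies each MultiGetEntity result via the supplied callbacks:
// get_columns(i)/get_status(i) access the i-th result, do_extra_check performs
// caller-specific validation (e.g. the read-your-own-writes check), and
// call_get_entity issues a point GetEntity lookup to cross-check the result
// when FLAGS_check_multiget_entity_consistency is set.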
  1181. auto check_results = [&](auto get_columns, auto get_status,
  1182. auto do_extra_check, auto call_get_entity) {
  1183. // Temporarily disable error injection for checking results
  1184. if (fault_fs_guard) {
  1185. fault_fs_guard->DisableThreadLocalErrorInjection(
  1186. FaultInjectionIOType::kRead);
  1187. fault_fs_guard->DisableThreadLocalErrorInjection(
  1188. FaultInjectionIOType::kMetadataRead);
  1189. }
  1190. const bool check_get_entity =
  1191. !injected_error_count && FLAGS_check_multiget_entity_consistency;
  1192. for (size_t i = 0; i < num_keys; ++i) {
  1193. const WideColumns& columns = get_columns(i);
  1194. const Status& s = get_status(i);
  1195. bool is_consistent = true;
  1196. if (s.ok() && !VerifyWideColumns(columns)) {
  1197. fprintf(
  1198. stderr,
  1199. "error : inconsistent columns returned by MultiGetEntity for key "
  1200. "%s: %s\n",
  1201. StringToHex(keys[i]).c_str(), WideColumnsToHex(columns).c_str());
  1202. is_consistent = false;
  1203. } else if (s.ok() || s.IsNotFound()) {
  1204. if (!do_extra_check(keys[i], columns, s)) {
  1205. is_consistent = false;
  1206. } else if (check_get_entity) {
  1207. PinnableWideColumns cmp_result;
  1208. ThreadStatusUtil::SetThreadOperation(
  1209. ThreadStatus::OperationType::OP_GETENTITY);
  1210. const Status cmp_s = call_get_entity(key_slices[i], &cmp_result);
  1211. if (!cmp_s.ok() && !cmp_s.IsNotFound()) {
  1212. fprintf(stderr, "GetEntity error: %s\n",
  1213. cmp_s.ToString().c_str());
  1214. is_consistent = false;
  1215. } else if (cmp_s.IsNotFound()) {
  1216. if (s.ok()) {
  1217. fprintf(
  1218. stderr,
  1219. "Inconsistent results for key %s: MultiGetEntity returned "
  1220. "ok, GetEntity returned not found\n",
  1221. StringToHex(keys[i]).c_str());
  1222. is_consistent = false;
  1223. }
  1224. } else {
  1225. assert(cmp_s.ok());
  1226. if (s.IsNotFound()) {
  1227. fprintf(
  1228. stderr,
  1229. "Inconsistent results for key %s: MultiGetEntity returned "
  1230. "not found, GetEntity returned ok\n",
  1231. StringToHex(keys[i]).c_str());
  1232. is_consistent = false;
  1233. } else {
  1234. assert(s.ok());
  1235. const WideColumns& cmp_columns = cmp_result.columns();
  1236. if (columns != cmp_columns) {
  1237. fprintf(stderr,
  1238. "Inconsistent results for key %s: MultiGetEntity "
  1239. "returned "
  1240. "%s, GetEntity returned %s\n",
  1241. StringToHex(keys[i]).c_str(),
  1242. WideColumnsToHex(columns).c_str(),
  1243. WideColumnsToHex(cmp_columns).c_str());
  1244. is_consistent = false;
  1245. }
  1246. }
  1247. }
  1248. }
  1249. }
  1250. if (!is_consistent) {
  1251. fprintf(stderr,
  1252. "TestMultiGetEntity error: results are not consistent\n");
  1253. thread->stats.AddErrors(1);
  1254. // Fail fast to preserve the DB state
  1255. thread->shared->SetVerificationFailure();
  1256. break;
  1257. } else if (s.ok()) {
  1258. thread->stats.AddGets(1, 1);
  1259. } else if (s.IsNotFound()) {
  1260. thread->stats.AddGets(1, 0);
  1261. } else if (injected_error_count == 0 ||
  1262. !IsErrorInjectedAndRetryable(s)) {
  1263. fprintf(stderr, "MultiGetEntity error: %s\n", s.ToString().c_str());
  1264. thread->stats.AddErrors(1);
  1265. thread->shared->SetVerificationFailure();
  1266. }
  1267. }
1268. // Re-enable error injection that was disabled for checking results
  1269. if (fault_fs_guard) {
  1270. fault_fs_guard->EnableThreadLocalErrorInjection(
  1271. FaultInjectionIOType::kRead);
  1272. fault_fs_guard->EnableThreadLocalErrorInjection(
  1273. FaultInjectionIOType::kMetadataRead);
  1274. }
  1275. };
  1276. if (FLAGS_use_txn) {
  1277. // Transactional/read-your-own-writes MultiGetEntity verification
  1278. std::vector<PinnableWideColumns> results(num_keys);
  1279. std::vector<Status> statuses(num_keys);
  1280. assert(txn);
  1281. txn->MultiGetEntity(read_opts_copy, cfh, num_keys, key_slices.data(),
  1282. results.data(), statuses.data());
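// ryw_check validates read-your-own-writes semantics: for keys this
// transaction has written (tracked in ryw_expected_values), MultiGetEntity
// must reflect the uncommitted write or deletion rather than the DB state.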
  1283. auto ryw_check = [&](const std::string& key, const WideColumns& columns,
  1284. const Status& s) -> bool {
  1285. const auto it = ryw_expected_values.find(key);
  1286. if (it == ryw_expected_values.end()) {
  1287. return true;
  1288. }
  1289. const auto& ryw_expected_value = it->second;
  1290. if (s.ok()) {
  1291. if (ryw_expected_value.IsDeleted()) {
  1292. fprintf(
  1293. stderr,
  1294. "MultiGetEntity failed the read-your-own-write check for key "
  1295. "%s\n",
  1296. Slice(key).ToString(true).c_str());
  1297. fprintf(stderr,
  1298. "MultiGetEntity returned ok, transaction has non-committed "
  1299. "delete\n");
  1300. return false;
  1301. } else {
  1302. const uint32_t value_base = ryw_expected_value.GetValueBase();
  1303. char expected_value[100];
  1304. const size_t sz = GenerateValue(value_base, expected_value,
  1305. sizeof(expected_value));
  1306. const Slice expected_slice(expected_value, sz);
  1307. const WideColumns expected_columns =
  1308. GenerateExpectedWideColumns(value_base, expected_slice);
  1309. if (columns != expected_columns) {
  1310. fprintf(
  1311. stderr,
  1312. "MultiGetEntity failed the read-your-own-write check for key "
  1313. "%s\n",
  1314. Slice(key).ToString(true).c_str());
  1315. fprintf(stderr, "MultiGetEntity returned %s\n",
  1316. WideColumnsToHex(columns).c_str());
  1317. fprintf(stderr, "Transaction has non-committed write %s\n",
  1318. WideColumnsToHex(expected_columns).c_str());
  1319. return false;
  1320. }
  1321. return true;
  1322. }
  1323. }
  1324. assert(s.IsNotFound());
  1325. if (!ryw_expected_value.IsDeleted()) {
  1326. fprintf(stderr,
  1327. "MultiGetEntity failed the read-your-own-write check for key "
  1328. "%s\n",
  1329. Slice(key).ToString(true).c_str());
  1330. fprintf(stderr,
  1331. "MultiGetEntity returned not found, transaction has "
  1332. "non-committed write\n");
  1333. return false;
  1334. }
  1335. return true;
  1336. };
  1337. check_results([&](size_t i) { return results[i].columns(); },
  1338. [&](size_t i) { return statuses[i]; }, ryw_check,
  1339. [&](const Slice& key, PinnableWideColumns* result) {
  1340. return txn->GetEntity(read_opts_copy, cfh, key, result);
  1341. });
  1342. txn->Rollback().PermitUncheckedError();
1343. // Re-enable error injection that was disabled for transactions
  1344. if (fault_fs_guard) {
  1345. fault_fs_guard->EnableThreadLocalErrorInjection(
  1346. FaultInjectionIOType::kRead);
  1347. fault_fs_guard->EnableThreadLocalErrorInjection(
  1348. FaultInjectionIOType::kMetadataRead);
  1349. }
  1350. } else if (FLAGS_use_attribute_group) {
  1351. // AttributeGroup MultiGetEntity verification
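// Reset the per-thread injected error counters so that verify_expected_errors
// below only accounts for errors injected during this MultiGetEntity call.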
  1352. if (fault_fs_guard) {
  1353. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1354. FaultInjectionIOType::kRead);
  1355. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1356. FaultInjectionIOType::kMetadataRead);
  1357. SharedState::ignore_read_error = false;
  1358. }
  1359. std::vector<PinnableAttributeGroups> results;
  1360. results.reserve(num_keys);
  1361. for (size_t i = 0; i < num_keys; ++i) {
  1362. PinnableAttributeGroups attribute_groups;
  1363. attribute_groups.emplace_back(cfh);
  1364. results.emplace_back(std::move(attribute_groups));
  1365. }
  1366. db_->MultiGetEntity(read_opts_copy, num_keys, key_slices.data(),
  1367. results.data());
  1368. if (fault_fs_guard) {
  1369. verify_expected_errors(
  1370. [&](size_t i) { return results[i][0].status(); });
  1371. }
  1372. // Compare against non-attribute-group GetEntity result
  1373. check_results([&](size_t i) { return results[i][0].columns(); },
  1374. [&](size_t i) { return results[i][0].status(); },
  1375. [](const Slice& /* key */, const WideColumns& /* columns */,
  1376. const Status& /* s */) { return true; },
  1377. [&](const Slice& key, PinnableWideColumns* result) {
  1378. return db_->GetEntity(read_opts_copy, cfh, key, result);
  1379. });
  1380. } else {
  1381. // Non-AttributeGroup MultiGetEntity verification
  1382. if (fault_fs_guard) {
  1383. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1384. FaultInjectionIOType::kRead);
  1385. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1386. FaultInjectionIOType::kMetadataRead);
  1387. SharedState::ignore_read_error = false;
  1388. }
  1389. std::vector<PinnableWideColumns> results(num_keys);
  1390. std::vector<Status> statuses(num_keys);
  1391. db_->MultiGetEntity(read_opts_copy, cfh, num_keys, key_slices.data(),
  1392. results.data(), statuses.data());
  1393. if (fault_fs_guard) {
  1394. verify_expected_errors([&](size_t i) { return statuses[i]; });
  1395. }
  1396. check_results([&](size_t i) { return results[i].columns(); },
  1397. [&](size_t i) { return statuses[i]; },
  1398. [](const Slice& /* key */, const WideColumns& /* columns */,
  1399. const Status& /* s */) { return true; },
  1400. [&](const Slice& key, PinnableWideColumns* result) {
  1401. return db_->GetEntity(read_opts_copy, cfh, key, result);
  1402. });
  1403. }
  1404. }
  1405. Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
  1406. const std::vector<int>& rand_column_families,
  1407. const std::vector<int64_t>& rand_keys) override {
  1408. assert(!rand_column_families.empty());
  1409. assert(!rand_keys.empty());
  1410. ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
  1411. assert(cfh);
  1412. const std::string key = Key(rand_keys[0]);
  1413. const Slice prefix(key.data(), FLAGS_prefix_size);
  1414. std::string upper_bound;
  1415. Slice ub_slice;
  1416. ReadOptions ro_copy = read_opts;
  1417. // Randomly test with `iterate_upper_bound` and `prefix_same_as_start`
  1418. //
  1419. // Get the next prefix first and then see if we want to set it to be the
  1420. // upper bound. We'll use the next prefix in an assertion later on
  1421. if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
  1422. // For half of the time, set the upper bound to the next prefix
  1423. ub_slice = Slice(upper_bound);
  1424. ro_copy.iterate_upper_bound = &ub_slice;
  1425. if (FLAGS_use_sqfc_for_range_queries) {
  1426. ro_copy.table_filter =
  1427. sqfc_factory_->GetTableFilterForRangeQuery(prefix, ub_slice);
  1428. }
  1429. } else if (options_.prefix_extractor && thread->rand.OneIn(2)) {
  1430. ro_copy.prefix_same_as_start = true;
  1431. }
  1432. std::string read_ts_str;
  1433. Slice read_ts_slice;
  1434. MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice,
  1435. ro_copy);
  1436. if (fault_fs_guard) {
  1437. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1438. FaultInjectionIOType::kRead);
  1439. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1440. FaultInjectionIOType::kMetadataRead);
  1441. SharedState::ignore_read_error = false;
  1442. }
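// When auto_refresh_iterator_with_snapshot is enabled and no snapshot was
// supplied, pin one here so the iterator refreshes against an explicit,
// consistent snapshot.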
  1443. std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
  1444. if (ro_copy.snapshot == nullptr &&
  1445. ro_copy.auto_refresh_iterator_with_snapshot) {
  1446. snapshot = std::make_unique<ManagedSnapshot>(db_);
  1447. ro_copy.snapshot = snapshot->snapshot();
  1448. }
  1449. std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
  1450. uint64_t count = 0;
  1451. Status s;
  1452. for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
1453. // If an upper bound or prefix bound is specified, only keys of the target
1454. // prefix should show up. Otherwise, we need to manually exit the loop when
1455. // we see the first key that is not in the target prefix.
  1456. if (ro_copy.iterate_upper_bound != nullptr ||
  1457. ro_copy.prefix_same_as_start) {
  1458. assert(iter->key().starts_with(prefix));
  1459. } else if (!iter->key().starts_with(prefix)) {
  1460. break;
  1461. }
  1462. ++count;
  1463. // When iter_start_ts is set, iterator exposes internal keys, including
  1464. // tombstones; however, we want to perform column validation only for
  1465. // value-like types.
  1466. if (ro_copy.iter_start_ts) {
  1467. const ValueType value_type = ExtractValueType(iter->key());
  1468. if (value_type != kTypeValue && value_type != kTypeBlobIndex &&
  1469. value_type != kTypeWideColumnEntity) {
  1470. continue;
  1471. }
  1472. }
  1473. if (ro_copy.allow_unprepared_value) {
  1474. if (!iter->PrepareValue()) {
  1475. s = iter->status();
  1476. break;
  1477. }
  1478. }
  1479. if (!VerifyWideColumns(iter->value(), iter->columns())) {
  1480. s = Status::Corruption("Value and columns inconsistent",
  1481. DebugString(iter->value(), iter->columns()));
  1482. break;
  1483. }
  1484. }
  1485. if (ro_copy.iter_start_ts == nullptr) {
  1486. assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
  1487. }
  1488. if (s.ok()) {
  1489. s = iter->status();
  1490. }
  1491. int injected_error_count = 0;
  1492. if (fault_fs_guard) {
  1493. injected_error_count = GetMinInjectedErrorCount(
  1494. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1495. FaultInjectionIOType::kRead),
  1496. fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
  1497. FaultInjectionIOType::kMetadataRead));
  1498. if (!SharedState::ignore_read_error && injected_error_count > 0 &&
  1499. s.ok()) {
1500. // Grab the mutex so multiple threads don't try to print the
1501. // stack trace at the same time
  1502. MutexLock l(thread->shared->GetMutex());
  1503. fprintf(stderr, "Didn't get expected error from PrefixScan\n");
  1504. fprintf(stderr, "Callstack that injected the fault\n");
  1505. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  1506. FaultInjectionIOType::kRead);
  1507. fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
  1508. FaultInjectionIOType::kMetadataRead);
  1509. std::terminate();
  1510. }
  1511. }
  1512. if (s.ok()) {
  1513. thread->stats.AddPrefixes(1, count);
  1514. } else if (injected_error_count == 0 || !IsErrorInjectedAndRetryable(s)) {
  1515. fprintf(stderr,
  1516. "TestPrefixScan error: %s with ReadOptions::iterate_upper_bound: "
  1517. "%s, prefix_same_as_start: %s \n",
  1518. s.ToString().c_str(),
  1519. ro_copy.iterate_upper_bound
  1520. ? ro_copy.iterate_upper_bound->ToString(true).c_str()
  1521. : "nullptr",
  1522. ro_copy.prefix_same_as_start ? "true" : "false");
  1523. thread->shared->SetVerificationFailure();
  1524. }
  1525. return s;
  1526. }
  1527. Status TestPut(ThreadState* thread, WriteOptions& write_opts,
  1528. const ReadOptions& read_opts,
  1529. const std::vector<int>& rand_column_families,
  1530. const std::vector<int64_t>& rand_keys,
  1531. char (&value)[100]) override {
  1532. assert(!rand_column_families.empty());
  1533. assert(!rand_keys.empty());
  1534. auto shared = thread->shared;
  1535. assert(shared);
  1536. const int64_t max_key = shared->GetMaxKey();
  1537. int64_t rand_key = rand_keys[0];
  1538. int rand_column_family = rand_column_families[0];
  1539. std::string write_ts;
  1540. std::unique_ptr<MutexLock> lock(
  1541. new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
  1542. while (!shared->AllowsOverwrite(rand_key) &&
  1543. (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
  1544. lock.reset();
  1545. rand_key = thread->rand.Next() % max_key;
  1546. rand_column_family = thread->rand.Next() % FLAGS_column_families;
  1547. lock.reset(
  1548. new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
  1549. if (FLAGS_user_timestamp_size > 0) {
  1550. write_ts = GetNowNanos();
  1551. }
  1552. }
  1553. if (write_ts.empty() && FLAGS_user_timestamp_size) {
  1554. write_ts = GetNowNanos();
  1555. }
  1556. const std::string k = Key(rand_key);
  1557. ColumnFamilyHandle* const cfh = column_families_[rand_column_family];
  1558. assert(cfh);
  1559. if (FLAGS_verify_before_write) {
  1560. // Temporarily disable error injection for preparation
  1561. if (fault_fs_guard) {
  1562. fault_fs_guard->DisableThreadLocalErrorInjection(
  1563. FaultInjectionIOType::kRead);
  1564. fault_fs_guard->DisableThreadLocalErrorInjection(
  1565. FaultInjectionIOType::kMetadataRead);
  1566. }
  1567. std::string from_db;
  1568. Status s = db_->Get(read_opts, cfh, k, &from_db);
  1569. bool res = VerifyOrSyncValue(
  1570. rand_column_family, rand_key, read_opts, shared,
  1571. /* msg_prefix */ "Pre-Put Get verification", from_db, s);
1572. // Re-enable error injection that was disabled for preparation
  1573. if (fault_fs_guard) {
  1574. fault_fs_guard->EnableThreadLocalErrorInjection(
  1575. FaultInjectionIOType::kRead);
  1576. fault_fs_guard->EnableThreadLocalErrorInjection(
  1577. FaultInjectionIOType::kMetadataRead);
  1578. }
  1579. if (!res) {
  1580. return s;
  1581. }
  1582. }
  1583. // To track the final write status
  1584. Status s;
  1585. // To track the initial write status
  1586. Status initial_write_s;
  1587. // To track whether WAL write may have succeeded during the initial failed
  1588. // write
  1589. bool initial_wal_write_may_succeed = true;
  1590. bool commit_bypass_memtable = false;
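// PreparePut marks the key's expected value as pending so that concurrent
// verification tolerates either the old or the new value until the write
// outcome is known; the pending value is Commit()ed on success or
// Rollback()ed on failure below.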
  1591. PendingExpectedValue pending_expected_value =
  1592. shared->PreparePut(rand_column_family, rand_key);
  1593. const uint32_t value_base = pending_expected_value.GetFinalValueBase();
  1594. const size_t sz = GenerateValue(value_base, value, sizeof(value));
  1595. const Slice v(value, sz);
  1596. uint64_t wait_for_recover_start_time = 0;
  1597. do {
1598. // If the initial write failed with an injected retryable error but its WAL
1599. // write may have succeeded, retry the write until it succeeds after the
1600. // recovery finishes so that the expected state can be committed
  1601. if (!s.ok() && IsErrorInjectedAndRetryable(s) &&
  1602. initial_wal_write_may_succeed) {
  1603. std::this_thread::sleep_for(std::chrono::microseconds(1 * 1000 * 1000));
  1604. }
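// Dispatch to a write API based on the flags: PutEntity (wide columns or
// attribute groups), TimedPut via a WriteBatch, Merge, or plain Put, each
// optionally through a transaction or an ingested WriteBatchWithIndex.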
  1605. if (FLAGS_use_put_entity_one_in > 0 &&
  1606. (value_base % FLAGS_use_put_entity_one_in) == 0) {
  1607. if (!FLAGS_use_txn) {
  1608. if (FLAGS_use_attribute_group) {
  1609. s = db_->PutEntity(write_opts, k,
  1610. GenerateAttributeGroups({cfh}, value_base, v));
  1611. } else {
  1612. s = db_->PutEntity(write_opts, cfh, k,
  1613. GenerateWideColumns(value_base, v));
  1614. }
  1615. } else {
  1616. s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
  1617. return txn.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
  1618. });
  1619. }
  1620. } else if (FLAGS_use_timed_put_one_in > 0 &&
  1621. ((value_base + kLargePrimeForCommonFactorSkew) %
  1622. FLAGS_use_timed_put_one_in) == 0) {
  1623. WriteBatch wb;
  1624. uint64_t write_unix_time = GetWriteUnixTime(thread);
  1625. s = wb.TimedPut(cfh, k, v, write_unix_time);
  1626. if (s.ok()) {
  1627. s = db_->Write(write_opts, &wb);
  1628. }
  1629. } else if (FLAGS_use_merge) {
  1630. if (!FLAGS_use_txn) {
  1631. if (FLAGS_user_timestamp_size == 0) {
  1632. if (FLAGS_ingest_wbwi_one_in &&
  1633. thread->rand.OneIn(FLAGS_ingest_wbwi_one_in)) {
  1634. auto wbwi = std::make_shared<WriteBatchWithIndex>(
  1635. options_.comparator, 0, /*overwrite_key=*/true);
  1636. s = wbwi->Merge(cfh, k, v);
  1637. if (s.ok()) {
  1638. s = db_->IngestWriteBatchWithIndex(write_opts, wbwi);
  1639. }
  1640. } else {
  1641. s = db_->Merge(write_opts, cfh, k, v);
  1642. }
  1643. } else {
  1644. s = db_->Merge(write_opts, cfh, k, write_ts, v);
  1645. }
  1646. } else {
  1647. s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
  1648. return txn.Merge(cfh, k, v);
  1649. });
  1650. }
  1651. } else {
  1652. if (!FLAGS_use_txn) {
  1653. if (FLAGS_user_timestamp_size == 0) {
  1654. if (FLAGS_ingest_wbwi_one_in &&
  1655. thread->rand.OneIn(FLAGS_ingest_wbwi_one_in)) {
  1656. auto wbwi = std::make_shared<WriteBatchWithIndex>(
  1657. options_.comparator, 0, /*overwrite_key=*/true);
  1658. s = wbwi->Put(cfh, k, v);
  1659. if (s.ok()) {
  1660. s = db_->IngestWriteBatchWithIndex(write_opts, wbwi);
  1661. }
  1662. } else {
  1663. s = db_->Put(write_opts, cfh, k, v);
  1664. }
  1665. } else {
  1666. s = db_->Put(write_opts, cfh, k, write_ts, v);
  1667. }
  1668. } else {
  1669. s = ExecuteTransaction(
  1670. write_opts, thread,
  1671. [&](Transaction& txn) { return txn.Put(cfh, k, v); },
  1672. &commit_bypass_memtable);
  1673. }
  1674. }
  1675. UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
  1676. &initial_wal_write_may_succeed,
  1677. &wait_for_recover_start_time);
  1678. } while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
  1679. initial_wal_write_may_succeed);
  1680. if (!s.ok()) {
  1681. pending_expected_value.Rollback();
  1682. if (IsErrorInjectedAndRetryable(s)) {
  1683. assert(!initial_wal_write_may_succeed);
  1684. return s;
  1685. } else if (FLAGS_inject_error_severity == 2) {
  1686. if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
  1687. is_db_stopped_ = true;
  1688. } else if (!is_db_stopped_ ||
  1689. s.severity() < Status::Severity::kFatalError) {
  1690. fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
  1691. thread->shared->SafeTerminate();
  1692. }
  1693. } else {
  1694. fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
  1695. thread->shared->SafeTerminate();
  1696. }
  1697. } else {
  1698. PrintWriteRecoveryWaitTimeIfNeeded(
  1699. db_stress_env, initial_write_s, initial_wal_write_may_succeed,
  1700. wait_for_recover_start_time, "TestPut");
  1701. pending_expected_value.Commit();
  1702. thread->stats.AddBytesForWrites(1, sz);
  1703. PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
  1704. sz);
  1705. }
  1706. return s;
  1707. }
  1708. Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
  1709. const std::vector<int>& rand_column_families,
  1710. const std::vector<int64_t>& rand_keys) override {
  1711. int64_t rand_key = rand_keys[0];
  1712. int rand_column_family = rand_column_families[0];
  1713. auto shared = thread->shared;
  1714. std::unique_ptr<MutexLock> lock(
  1715. new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
  1716. // OPERATION delete
  1717. std::string write_ts_str = GetNowNanos();
  1718. Slice write_ts = write_ts_str;
  1719. std::string key_str = Key(rand_key);
  1720. Slice key = key_str;
  1721. auto cfh = column_families_[rand_column_family];
  1722. // To track the final write status
  1723. Status s;
  1724. // To track the initial write status
  1725. Status initial_write_s;
  1726. // To track whether WAL write may have succeeded during the initial failed
  1727. // write
  1728. bool initial_wal_write_may_succeed = true;
  1729. bool commit_bypass_memtable = false;
  1730. // Use delete if the key may be overwritten and a single deletion
  1731. // otherwise.
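// (SingleDelete is only valid for keys that are never overwritten, which is
// why it is reserved for the !AllowsOverwrite case below.)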
  1732. if (shared->AllowsOverwrite(rand_key)) {
  1733. PendingExpectedValue pending_expected_value =
  1734. shared->PrepareDelete(rand_column_family, rand_key);
  1735. uint64_t wait_for_recover_start_time = 0;
  1736. do {
1737. // If the initial write failed with an injected retryable error but its
1738. // WAL write may have succeeded, retry the write until it succeeds after
1739. // the recovery finishes so that the expected state can be committed
  1740. if (!s.ok() && IsErrorInjectedAndRetryable(s) &&
  1741. initial_wal_write_may_succeed) {
  1742. std::this_thread::sleep_for(
  1743. std::chrono::microseconds(1 * 1000 * 1000));
  1744. }
  1745. if (!FLAGS_use_txn) {
  1746. if (FLAGS_user_timestamp_size == 0) {
  1747. if (FLAGS_ingest_wbwi_one_in &&
  1748. thread->rand.OneIn(FLAGS_ingest_wbwi_one_in)) {
  1749. auto wbwi = std::make_shared<WriteBatchWithIndex>(
  1750. options_.comparator, 0, /*overwrite_key=*/true);
  1751. s = wbwi->Delete(cfh, key);
  1752. if (s.ok()) {
  1753. s = db_->IngestWriteBatchWithIndex(write_opts, wbwi);
  1754. }
  1755. } else {
  1756. s = db_->Delete(write_opts, cfh, key);
  1757. }
  1758. } else {
  1759. s = db_->Delete(write_opts, cfh, key, write_ts);
  1760. }
  1761. } else {
  1762. s = ExecuteTransaction(
  1763. write_opts, thread,
  1764. [&](Transaction& txn) { return txn.Delete(cfh, key); },
  1765. &commit_bypass_memtable);
  1766. }
  1767. UpdateIfInitialWriteFails(
  1768. db_stress_env, s, &initial_write_s, &initial_wal_write_may_succeed,
  1769. &wait_for_recover_start_time, commit_bypass_memtable);
  1770. } while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
  1771. initial_wal_write_may_succeed);
  1772. if (!s.ok()) {
  1773. pending_expected_value.Rollback();
  1774. if (IsErrorInjectedAndRetryable(s)) {
  1775. assert(!initial_wal_write_may_succeed);
  1776. return s;
  1777. } else if (FLAGS_inject_error_severity == 2) {
  1778. if (!is_db_stopped_ &&
  1779. s.severity() >= Status::Severity::kFatalError) {
  1780. is_db_stopped_ = true;
  1781. } else if (!is_db_stopped_ ||
  1782. s.severity() < Status::Severity::kFatalError) {
  1783. fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
  1784. thread->shared->SafeTerminate();
  1785. }
  1786. } else {
  1787. fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
  1788. thread->shared->SafeTerminate();
  1789. }
  1790. } else {
  1791. PrintWriteRecoveryWaitTimeIfNeeded(
  1792. db_stress_env, initial_write_s, initial_wal_write_may_succeed,
  1793. wait_for_recover_start_time, "TestDelete");
  1794. pending_expected_value.Commit();
  1795. thread->stats.AddDeletes(1);
  1796. }
  1797. } else {
  1798. PendingExpectedValue pending_expected_value =
  1799. shared->PrepareSingleDelete(rand_column_family, rand_key);
  1800. uint64_t wait_for_recover_start_time = 0;
  1801. do {
1802. // If the initial write failed with an injected retryable error but its
1803. // WAL write may have succeeded, retry the write until it succeeds after
1804. // the recovery finishes so that the expected state can be committed
  1805. if (!s.ok() && IsErrorInjectedAndRetryable(s) &&
  1806. initial_wal_write_may_succeed) {
  1807. std::this_thread::sleep_for(
  1808. std::chrono::microseconds(1 * 1000 * 1000));
  1809. }
  1810. if (!FLAGS_use_txn) {
  1811. if (FLAGS_user_timestamp_size == 0) {
  1812. if (FLAGS_ingest_wbwi_one_in &&
  1813. thread->rand.OneIn(FLAGS_ingest_wbwi_one_in)) {
  1814. auto wbwi = std::make_shared<WriteBatchWithIndex>(
  1815. options_.comparator, 0, /*overwrite_key=*/true);
  1816. s = wbwi->SingleDelete(cfh, key);
  1817. if (s.ok()) {
  1818. s = db_->IngestWriteBatchWithIndex(write_opts, wbwi);
  1819. }
  1820. } else {
  1821. s = db_->SingleDelete(write_opts, cfh, key);
  1822. }
  1823. } else {
  1824. s = db_->SingleDelete(write_opts, cfh, key, write_ts);
  1825. }
  1826. } else {
  1827. s = ExecuteTransaction(
  1828. write_opts, thread,
  1829. [&](Transaction& txn) { return txn.SingleDelete(cfh, key); },
  1830. &commit_bypass_memtable);
  1831. }
  1832. UpdateIfInitialWriteFails(
  1833. db_stress_env, s, &initial_write_s, &initial_wal_write_may_succeed,
  1834. &wait_for_recover_start_time, commit_bypass_memtable);
  1835. } while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
  1836. initial_wal_write_may_succeed);
  1837. if (!s.ok()) {
  1838. pending_expected_value.Rollback();
  1839. if (IsErrorInjectedAndRetryable(s)) {
  1840. assert(!initial_wal_write_may_succeed);
  1841. return s;
  1842. } else if (FLAGS_inject_error_severity == 2) {
  1843. if (!is_db_stopped_ &&
  1844. s.severity() >= Status::Severity::kFatalError) {
  1845. is_db_stopped_ = true;
  1846. } else if (!is_db_stopped_ ||
  1847. s.severity() < Status::Severity::kFatalError) {
  1848. fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
  1849. thread->shared->SafeTerminate();
  1850. }
  1851. } else {
  1852. fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
  1853. thread->shared->SafeTerminate();
  1854. }
  1855. } else {
  1856. PrintWriteRecoveryWaitTimeIfNeeded(
  1857. db_stress_env, initial_write_s, initial_wal_write_may_succeed,
  1858. wait_for_recover_start_time, "TestDelete");
  1859. pending_expected_value.Commit();
  1860. thread->stats.AddSingleDeletes(1);
  1861. }
  1862. }
  1863. return s;
  1864. }
  1865. Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
  1866. const std::vector<int>& rand_column_families,
  1867. const std::vector<int64_t>& rand_keys) override {
  1868. // OPERATION delete range
  1869. std::vector<std::unique_ptr<MutexLock>> range_locks;
1870. // DeleteRange does not respect disallowed overwrites. The keys for
1871. // which overwrites are disallowed are randomly distributed, so it
1872. // could be expensive to find a range where every key allows
1873. // overwrites.
  1874. int64_t rand_key = rand_keys[0];
  1875. int rand_column_family = rand_column_families[0];
  1876. auto shared = thread->shared;
  1877. int64_t max_key = shared->GetMaxKey();
  1878. if (rand_key > max_key - FLAGS_range_deletion_width) {
  1879. rand_key =
  1880. thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
  1881. }
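// Lock every key in [rand_key, rand_key + FLAGS_range_deletion_width) so the
// expected state for the whole range can be updated consistently with the
// DeleteRange below.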
  1882. GetDeleteRangeKeyLocks(thread, rand_column_family, rand_key, &range_locks);
  1883. // To track the final write status
  1884. Status s;
  1885. // To track the initial write status
  1886. Status initial_write_s;
  1887. // To track whether WAL write may have succeeded during the initial failed
  1888. // write
  1889. bool initial_wal_write_may_succeed = true;
  1890. std::vector<PendingExpectedValue> pending_expected_values =
  1891. shared->PrepareDeleteRange(rand_column_family, rand_key,
  1892. rand_key + FLAGS_range_deletion_width);
  1893. const int covered = static_cast<int>(pending_expected_values.size());
  1894. std::string keystr = Key(rand_key);
  1895. Slice key = keystr;
  1896. auto cfh = column_families_[rand_column_family];
  1897. std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
  1898. Slice end_key = end_keystr;
  1899. std::string write_ts_str;
  1900. Slice write_ts;
  1901. uint64_t wait_for_recover_start_time = 0;
  1902. do {
1903. // If the initial write failed with an injected retryable error but its WAL
1904. // write may have succeeded, retry the write until it succeeds after the
1905. // recovery finishes so that the expected state can be committed
  1906. if (!s.ok() && IsErrorInjectedAndRetryable(s) &&
  1907. initial_wal_write_may_succeed) {
  1908. std::this_thread::sleep_for(std::chrono::microseconds(1 * 1000 * 1000));
  1909. }
  1910. if (FLAGS_user_timestamp_size) {
  1911. write_ts_str = GetNowNanos();
  1912. write_ts = write_ts_str;
  1913. s = db_->DeleteRange(write_opts, cfh, key, end_key, write_ts);
  1914. } else {
  1915. s = db_->DeleteRange(write_opts, cfh, key, end_key);
  1916. }
  1917. UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
  1918. &initial_wal_write_may_succeed,
  1919. &wait_for_recover_start_time);
  1920. } while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
  1921. initial_wal_write_may_succeed);
  1922. if (!s.ok()) {
  1923. for (PendingExpectedValue& pending_expected_value :
  1924. pending_expected_values) {
  1925. pending_expected_value.Rollback();
  1926. }
  1927. if (IsErrorInjectedAndRetryable(s)) {
  1928. assert(!initial_wal_write_may_succeed);
  1929. return s;
  1930. } else if (FLAGS_inject_error_severity == 2) {
  1931. if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
  1932. is_db_stopped_ = true;
  1933. } else if (!is_db_stopped_ ||
  1934. s.severity() < Status::Severity::kFatalError) {
  1935. fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
  1936. thread->shared->SafeTerminate();
  1937. }
  1938. } else {
  1939. fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
  1940. thread->shared->SafeTerminate();
  1941. }
  1942. } else {
  1943. PrintWriteRecoveryWaitTimeIfNeeded(
  1944. db_stress_env, initial_write_s, initial_wal_write_may_succeed,
  1945. wait_for_recover_start_time, "TestDeleteRange");
  1946. for (PendingExpectedValue& pending_expected_value :
  1947. pending_expected_values) {
  1948. pending_expected_value.Commit();
  1949. }
  1950. thread->stats.AddRangeDeletions(1);
  1951. thread->stats.AddCoveredByRangeDeletions(covered);
  1952. }
  1953. return s;
  1954. }
  1955. void TestIngestExternalFile(ThreadState* thread,
  1956. const std::vector<int>& rand_column_families,
  1957. const std::vector<int64_t>& rand_keys) override {
1958. // When test_standalone_range_deletion is true, we create two SST files:
1959. // the first with regular puts for a contiguous range of keys, the second
1960. // with a standalone range deletion for all those keys. This exercises the
1961. // standalone range deletion file's compaction input optimization.
  1962. bool test_standalone_range_deletion = thread->rand.OneInOpt(
  1963. FLAGS_test_ingest_standalone_range_deletion_one_in);
  1964. std::vector<std::string> external_files;
  1965. const std::string sst_filename =
  1966. FLAGS_db + "/." + std::to_string(thread->tid) + ".sst";
  1967. external_files.push_back(sst_filename);
  1968. std::string standalone_rangedel_filename;
  1969. if (test_standalone_range_deletion) {
  1970. standalone_rangedel_filename = FLAGS_db + "/." +
  1971. std::to_string(thread->tid) +
  1972. "_standalone_rangedel.sst";
  1973. external_files.push_back(standalone_rangedel_filename);
  1974. }
  1975. Status s;
  1976. std::ostringstream ingest_options_oss;
  1977. // Temporarily disable error injection for preparation
  1978. if (fault_fs_guard) {
  1979. fault_fs_guard->DisableThreadLocalErrorInjection(
  1980. FaultInjectionIOType::kMetadataRead);
  1981. fault_fs_guard->DisableThreadLocalErrorInjection(
  1982. FaultInjectionIOType::kMetadataWrite);
  1983. }
  1984. for (const auto& filename : external_files) {
  1985. if (db_stress_env->FileExists(filename).ok()) {
  1986. // Maybe we terminated abnormally before, so cleanup to give this file
  1987. // ingestion a clean slate
  1988. s = db_stress_env->DeleteFile(filename);
  1989. }
  1990. if (!s.ok()) {
  1991. return;
  1992. }
  1993. }
  1994. if (fault_fs_guard) {
  1995. fault_fs_guard->EnableThreadLocalErrorInjection(
  1996. FaultInjectionIOType::kMetadataRead);
  1997. fault_fs_guard->EnableThreadLocalErrorInjection(
  1998. FaultInjectionIOType::kMetadataWrite);
  1999. }
  2000. SstFileWriter sst_file_writer(EnvOptions(options_), options_);
  2001. SstFileWriter standalone_rangedel_sst_file_writer(EnvOptions(options_),
  2002. options_);
  2003. if (s.ok()) {
  2004. s = sst_file_writer.Open(sst_filename);
  2005. }
  2006. if (s.ok() && test_standalone_range_deletion) {
  2007. s = standalone_rangedel_sst_file_writer.Open(
  2008. standalone_rangedel_filename);
  2009. }
  2010. if (!s.ok()) {
  2011. return;
  2012. }
  2013. int64_t key_base = rand_keys[0];
  2014. int column_family = rand_column_families[0];
  2015. std::vector<std::unique_ptr<MutexLock>> range_locks;
  2016. range_locks.reserve(FLAGS_ingest_external_file_width);
  2017. std::vector<int64_t> keys;
  2018. keys.reserve(FLAGS_ingest_external_file_width);
  2019. std::vector<uint32_t> values;
  2020. values.reserve(FLAGS_ingest_external_file_width);
  2021. std::vector<PendingExpectedValue> pending_expected_values;
  2022. pending_expected_values.reserve(FLAGS_ingest_external_file_width);
  2023. SharedState* shared = thread->shared;
  2024. // Grab locks, add keys
  2025. assert(FLAGS_nooverwritepercent < 100);
  2026. for (int64_t key = key_base;
  2027. key < shared->GetMaxKey() &&
  2028. key < key_base + FLAGS_ingest_external_file_width;
  2029. ++key) {
  2030. if (key == key_base ||
  2031. (key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
  2032. range_locks.emplace_back(
  2033. new MutexLock(shared->GetMutexForKey(column_family, key)));
  2034. }
  2035. if (test_standalone_range_deletion) {
2036. // Testing standalone range deletion needs a contiguous range of keys.
  2037. if (shared->AllowsOverwrite(key)) {
2038. if (keys.empty() || keys.back() == key - 1) {
  2039. keys.push_back(key);
  2040. } else {
  2041. keys.clear();
  2042. keys.push_back(key);
  2043. }
  2044. } else {
  2045. if (keys.size() > 0) {
  2046. break;
  2047. } else {
  2048. continue;
  2049. }
  2050. }
  2051. } else {
  2052. if (!shared->AllowsOverwrite(key)) {
  2053. // We could alternatively include `key` that is deleted.
  2054. continue;
  2055. }
  2056. keys.push_back(key);
  2057. }
  2058. }
  2059. if (s.ok() && keys.empty()) {
  2060. return;
  2061. }
2062. // Set pending state on the expected values, then create and ingest the files.
  2063. size_t total_keys = keys.size();
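// Write the selected keys into the SST file. For the standalone range
// deletion test, values are left empty since the second file's DeleteRange
// covers the whole key range anyway; otherwise each key gets a freshly
// generated value tracked through a PendingExpectedValue.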
  2064. for (size_t i = 0; s.ok() && i < total_keys; i++) {
  2065. int64_t key = keys.at(i);
  2066. char value[100];
  2067. auto key_str = Key(key);
  2068. const Slice k(key_str);
  2069. Slice v;
  2070. if (test_standalone_range_deletion) {
  2071. assert(i == 0 || keys.at(i - 1) == key - 1);
  2072. s = sst_file_writer.Put(k, v);
  2073. } else {
  2074. PendingExpectedValue pending_expected_value =
  2075. shared->PreparePut(column_family, key);
  2076. const uint32_t value_base = pending_expected_value.GetFinalValueBase();
  2077. const size_t value_len =
  2078. GenerateValue(value_base, value, sizeof(value));
  2079. v = Slice(value, value_len);
  2080. values.push_back(value_base);
  2081. pending_expected_values.push_back(pending_expected_value);
  2082. if (FLAGS_use_put_entity_one_in > 0 &&
  2083. (value_base % FLAGS_use_put_entity_one_in) == 0) {
  2084. WideColumns columns = GenerateWideColumns(values.back(), v);
  2085. s = sst_file_writer.PutEntity(k, columns);
  2086. } else {
  2087. s = sst_file_writer.Put(k, v);
  2088. }
  2089. }
  2090. }
  2091. if (s.ok() && !keys.empty()) {
  2092. s = sst_file_writer.Finish();
  2093. }
  2094. if (s.ok() && total_keys != 0 && test_standalone_range_deletion) {
  2095. int64_t start_key = keys.at(0);
  2096. int64_t end_key = keys.back() + 1;
  2097. pending_expected_values =
  2098. shared->PrepareDeleteRange(column_family, start_key, end_key);
  2099. auto start_key_str = Key(start_key);
  2100. const Slice start_key_slice(start_key_str);
  2101. auto end_key_str = Key(end_key);
  2102. const Slice end_key_slice(end_key_str);
  2103. s = standalone_rangedel_sst_file_writer.DeleteRange(start_key_slice,
  2104. end_key_slice);
  2105. if (s.ok()) {
  2106. s = standalone_rangedel_sst_file_writer.Finish();
  2107. }
  2108. }
  2109. if (s.ok()) {
  2110. IngestExternalFileOptions ingest_options;
  2111. ingest_options.move_files = thread->rand.OneInOpt(2);
  2112. ingest_options.verify_checksums_before_ingest = thread->rand.OneInOpt(2);
  2113. ingest_options.verify_checksums_readahead_size =
  2114. thread->rand.OneInOpt(2) ? 1024 * 1024 : 0;
  2115. ingest_options.fill_cache = thread->rand.OneInOpt(4);
  2116. ingest_options_oss << "move_files: " << ingest_options.move_files
  2117. << ", verify_checksums_before_ingest: "
  2118. << ingest_options.verify_checksums_before_ingest
  2119. << ", verify_checksums_readahead_size: "
  2120. << ingest_options.verify_checksums_readahead_size
  2121. << ", fill_cache: " << ingest_options.fill_cache
  2122. << ", test_standalone_range_deletion: "
  2123. << test_standalone_range_deletion;
  2124. s = db_->IngestExternalFile(column_families_[column_family],
  2125. external_files, ingest_options);
  2126. }
  2127. if (!s.ok()) {
  2128. for (PendingExpectedValue& pending_expected_value :
  2129. pending_expected_values) {
  2130. pending_expected_value.Rollback();
  2131. }
  2132. if (!IsErrorInjectedAndRetryable(s)) {
  2133. fprintf(stderr,
  2134. "file ingestion error: %s under specified "
  2135. "IngestExternalFileOptions: %s (Empty string or "
  2136. "missing field indicates default option or value is used)\n",
  2137. s.ToString().c_str(), ingest_options_oss.str().c_str());
  2138. thread->shared->SafeTerminate();
  2139. }
  2140. } else {
  2141. for (PendingExpectedValue& pending_expected_value :
  2142. pending_expected_values) {
  2143. pending_expected_value.Commit();
  2144. }
  2145. }
  2146. }
  2147. // Given a key K, this creates an iterator which scans the range
  2148. // [K, K + FLAGS_num_iterations) forward and backward.
2149. // It then does a random sequence of Next/Prev operations.
  2150. Status TestIterateAgainstExpected(
  2151. ThreadState* thread, const ReadOptions& read_opts,
  2152. const std::vector<int>& rand_column_families,
  2153. const std::vector<int64_t>& rand_keys) override {
  2154. assert(thread);
  2155. assert(!rand_column_families.empty());
  2156. assert(!rand_keys.empty());
  2157. auto shared = thread->shared;
  2158. assert(shared);
  2159. int64_t max_key = shared->GetMaxKey();
  2160. const int64_t num_iter = static_cast<int64_t>(FLAGS_num_iterations);
  2161. int64_t lb = rand_keys[0];
  2162. if (lb > max_key - num_iter) {
  2163. lb = thread->rand.Next() % (max_key - num_iter + 1);
  2164. }
  2165. const int64_t ub = lb + num_iter;
  2166. const int rand_column_family = rand_column_families[0];
  2167. // Testing parallel read and write to the same key with user timestamp
  2168. // is not currently supported
  2169. std::vector<std::unique_ptr<MutexLock>> range_locks;
  2170. if (FLAGS_user_timestamp_size > 0) {
  2171. range_locks = shared->GetLocksForKeyRange(rand_column_family, lb, ub);
  2172. }
  2173. ReadOptions ro(read_opts);
  2174. if (FLAGS_prefix_size > 0) {
  2175. ro.total_order_seek = true;
  2176. }
  2177. std::string read_ts_str;
  2178. Slice read_ts;
  2179. if (FLAGS_user_timestamp_size > 0) {
  2180. read_ts_str = GetNowNanos();
  2181. read_ts = read_ts_str;
  2182. ro.timestamp = &read_ts;
  2183. }
  2184. std::string max_key_str;
  2185. Slice max_key_slice;
  2186. if (!FLAGS_destroy_db_initially) {
  2187. max_key_str = Key(max_key);
  2188. max_key_slice = max_key_str;
2189. // Restrict the iterator from reading keys written in batched_op_stress
2190. // that do not have their expected state updated and may not be parseable
2191. // by GetIntVal().
  2192. ro.iterate_upper_bound = &max_key_slice;
  2193. }
  2194. std::string ub_str, lb_str;
  2195. if (FLAGS_use_sqfc_for_range_queries) {
  2196. ub_str = Key(ub);
  2197. lb_str = Key(lb);
  2198. ro.table_filter =
  2199. sqfc_factory_->GetTableFilterForRangeQuery(lb_str, ub_str);
  2200. }
  2201. ColumnFamilyHandle* const cfh = column_families_[rand_column_family];
  2202. assert(cfh);
  2203. const std::size_t expected_values_size = static_cast<std::size_t>(ub - lb);
  2204. std::vector<ExpectedValue> pre_read_expected_values;
  2205. std::vector<ExpectedValue> post_read_expected_values;
  2206. for (int64_t i = 0; i < static_cast<int64_t>(expected_values_size); ++i) {
  2207. pre_read_expected_values.push_back(
  2208. shared->Get(rand_column_family, i + lb));
  2209. }
  2210. // Snapshot initialization timing plays a crucial role here.
  2211. // We want the iterator to reflect the state of the DB between
  2212. // reading `pre_read_expected_values` and `post_read_expected_values`.
  2213. std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
  2214. if (ro.auto_refresh_iterator_with_snapshot) {
  2215. snapshot = std::make_unique<ManagedSnapshot>(db_);
  2216. ro.snapshot = snapshot->snapshot();
  2217. }
  2218. std::unique_ptr<Iterator> iter;
  2219. if (FLAGS_use_multi_cf_iterator) {
  2220. std::vector<ColumnFamilyHandle*> cfhs;
  2221. cfhs.reserve(rand_column_families.size());
  2222. for (auto cf_index : rand_column_families) {
  2223. cfhs.emplace_back(column_families_[cf_index]);
  2224. }
  2225. assert(!cfhs.empty());
  2226. iter = db_->NewCoalescingIterator(ro, cfhs);
  2227. } else {
  2228. iter = std::unique_ptr<Iterator>(db_->NewIterator(ro, cfh));
  2229. }
  2230. for (int64_t i = 0; i < static_cast<int64_t>(expected_values_size); ++i) {
  2231. post_read_expected_values.push_back(
  2232. shared->Get(rand_column_family, i + lb));
  2233. }
  2234. assert(pre_read_expected_values.size() == expected_values_size &&
  2235. pre_read_expected_values.size() == post_read_expected_values.size());
  2236. std::string op_logs;
  2237. auto check_columns = [&]() {
  2238. assert(iter);
  2239. assert(iter->Valid());
  2240. if (!VerifyWideColumns(iter->value(), iter->columns())) {
  2241. shared->SetVerificationFailure();
  2242. fprintf(stderr,
  2243. "Verification failed for key %s: "
  2244. "Value and columns inconsistent: value: %s, columns: %s\n",
  2245. Slice(iter->key()).ToString(/* hex */ true).c_str(),
  2246. iter->value().ToString(/* hex */ true).c_str(),
  2247. WideColumnsToHex(iter->columns()).c_str());
  2248. fprintf(stderr, "Column family: %s, op_logs: %s\n",
  2249. cfh->GetName().c_str(), op_logs.c_str());
  2250. thread->stats.AddErrors(1);
  2251. return false;
  2252. }
  2253. return true;
  2254. };
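// check_no_key_in_range asserts that no key in [start, end), clamped to
// [lb, ub), must have existed throughout the read window; if one must have
// existed, the iterator skipped a key it should have returned.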
  2255. auto check_no_key_in_range = [&](int64_t start, int64_t end) {
  2256. assert(start <= end);
  2257. for (auto j = std::max(start, lb); j < std::min(end, ub); ++j) {
  2258. std::size_t index = static_cast<std::size_t>(j - lb);
  2259. assert(index < pre_read_expected_values.size() &&
  2260. index < post_read_expected_values.size());
  2261. const ExpectedValue pre_read_expected_value =
  2262. pre_read_expected_values[index];
  2263. const ExpectedValue post_read_expected_value =
  2264. post_read_expected_values[index];
  2265. if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
  2266. post_read_expected_value)) {
  2267. // Fail fast to preserve the DB state.
  2268. thread->shared->SetVerificationFailure();
  2269. if (iter->Valid()) {
  2270. fprintf(stderr,
  2271. "Verification failed. Expected state has key %s, iterator "
  2272. "is at key %s\n",
  2273. Slice(Key(j)).ToString(true).c_str(),
  2274. iter->key().ToString(true).c_str());
  2275. } else {
  2276. fprintf(stderr,
  2277. "Verification failed. Expected state has key %s, iterator "
  2278. "is invalid\n",
  2279. Slice(Key(j)).ToString(true).c_str());
  2280. }
  2281. fprintf(stderr, "Column family: %s, op_logs: %s\n",
  2282. cfh->GetName().c_str(), op_logs.c_str());
  2283. thread->stats.AddErrors(1);
  2284. return false;
  2285. }
  2286. }
  2287. return true;
  2288. };
2289. // Forward and backward scan to ensure we cover the entire range [lb, ub).
2290. // The random Next/Prev sequence tested below tends to cover only a very
2291. // short range.
  2292. int64_t last_key = lb - 1;
  2293. std::string key_str = Key(lb);
  2294. iter->Seek(key_str);
  2295. op_logs += "S " + Slice(key_str).ToString(true) + " ";
  2296. uint64_t curr = 0;
  2297. while (true) {
  2298. assert(last_key < ub);
  2299. if (iter->Valid() && ro.allow_unprepared_value) {
  2300. op_logs += "*";
  2301. if (!iter->PrepareValue()) {
  2302. assert(!iter->Valid());
  2303. assert(!iter->status().ok());
  2304. }
  2305. }
  2306. if (!iter->Valid()) {
  2307. if (!iter->status().ok()) {
  2308. if (IsErrorInjectedAndRetryable(iter->status())) {
  2309. return iter->status();
  2310. } else {
  2311. thread->shared->SetVerificationFailure();
  2312. fprintf(stderr, "TestIterate against expected state error: %s\n",
  2313. iter->status().ToString().c_str());
  2314. fprintf(stderr, "Column family: %s, op_logs: %s\n",
  2315. cfh->GetName().c_str(), op_logs.c_str());
  2316. thread->stats.AddErrors(1);
  2317. return iter->status();
  2318. }
  2319. }
  2320. if (!check_no_key_in_range(last_key + 1, ub)) {
  2321. return Status::OK();
  2322. }
  2323. break;
  2324. }
  2325. if (!check_columns()) {
  2326. return Status::OK();
  2327. }
  2328. // iter is valid, the range (last_key, current key) was skipped
  2329. GetIntVal(iter->key().ToString(), &curr);
  2330. if (static_cast<int64_t>(curr) <= last_key) {
  2331. thread->shared->SetVerificationFailure();
  2332. fprintf(stderr,
  2333. "TestIterateAgainstExpected failed: found unexpectedly small "
  2334. "key\n");
  2335. fprintf(stderr, "Column family: %s, op_logs: %s\n",
  2336. cfh->GetName().c_str(), op_logs.c_str());
  2337. fprintf(stderr, "Last op found key: %s, expected at least: %s\n",
  2338. Slice(Key(curr)).ToString(true).c_str(),
  2339. Slice(Key(last_key + 1)).ToString(true).c_str());
  2340. thread->stats.AddErrors(1);
  2341. return Status::OK();
  2342. }
  2343. if (!check_no_key_in_range(last_key + 1, static_cast<int64_t>(curr))) {
  2344. return Status::OK();
  2345. }
  2346. last_key = static_cast<int64_t>(curr);
  2347. if (last_key >= ub - 1) {
  2348. break;
  2349. }
  2350. iter->Next();
  2351. op_logs += "N";
  2352. }
  2353. // backward scan
  2354. key_str = Key(ub - 1);
  2355. iter->SeekForPrev(key_str);
  2356. op_logs += " SFP " + Slice(key_str).ToString(true) + " ";
  2357. last_key = ub;
  2358. while (true) {
  2359. assert(lb < last_key);
  2360. if (iter->Valid() && ro.allow_unprepared_value) {
  2361. op_logs += "*";
  2362. if (!iter->PrepareValue()) {
  2363. assert(!iter->Valid());
  2364. assert(!iter->status().ok());
  2365. }
  2366. }
  2367. if (!iter->Valid()) {
  2368. if (!iter->status().ok()) {
  2369. if (IsErrorInjectedAndRetryable(iter->status())) {
  2370. return iter->status();
  2371. } else {
  2372. thread->shared->SetVerificationFailure();
  2373. fprintf(stderr, "TestIterate against expected state error: %s\n",
  2374. iter->status().ToString().c_str());
  2375. fprintf(stderr, "Column family: %s, op_logs: %s\n",
  2376. cfh->GetName().c_str(), op_logs.c_str());
  2377. thread->stats.AddErrors(1);
  2378. return iter->status();
  2379. }
  2380. }
  2381. if (!check_no_key_in_range(lb, last_key)) {
  2382. return Status::OK();
  2383. }
  2384. break;
  2385. }
  2386. if (!check_columns()) {
  2387. return Status::OK();
  2388. }
  2389. // the range (current key, last key) was skipped
  2390. GetIntVal(iter->key().ToString(), &curr);
  2391. if (last_key <= static_cast<int64_t>(curr)) {
  2392. thread->shared->SetVerificationFailure();
  2393. fprintf(stderr,
  2394. "TestIterateAgainstExpected failed: found unexpectedly large "
  2395. "key\n");
  2396. fprintf(stderr, "Column family: %s, op_logs: %s\n",
  2397. cfh->GetName().c_str(), op_logs.c_str());
  2398. fprintf(stderr, "Last op found key: %s, expected at most: %s\n",
  2399. Slice(Key(curr)).ToString(true).c_str(),
  2400. Slice(Key(last_key - 1)).ToString(true).c_str());
  2401. thread->stats.AddErrors(1);
  2402. return Status::OK();
  2403. }
  2404. if (!check_no_key_in_range(static_cast<int64_t>(curr + 1), last_key)) {
  2405. return Status::OK();
  2406. }
  2407. last_key = static_cast<int64_t>(curr);
  2408. if (last_key <= lb) {
  2409. break;
  2410. }
  2411. iter->Prev();
  2412. op_logs += "P";
  2413. }
  2414. // Write-prepared/write-unprepared transactions and multi-CF iterator do not
  2415. // support Refresh() yet.
  2416. if (!(FLAGS_use_txn && FLAGS_txn_write_policy != 0) &&
  2417. !FLAGS_use_multi_cf_iterator && thread->rand.OneIn(2)) {
  2418. pre_read_expected_values.clear();
  2419. post_read_expected_values.clear();
2420. // Refresh after the forward/backward scan to allow a higher chance of a
2421. // superversion (SV) change.
  2422. for (int64_t i = 0; i < static_cast<int64_t>(expected_values_size); ++i) {
  2423. pre_read_expected_values.push_back(
  2424. shared->Get(rand_column_family, i + lb));
  2425. }
  2426. if (ro.auto_refresh_iterator_with_snapshot) {
  2427. snapshot = std::make_unique<ManagedSnapshot>(db_);
  2428. ro.snapshot = snapshot->snapshot();
  2429. }
  2430. Status rs = iter->Refresh(ro.snapshot);
  2431. if (!rs.ok() && IsErrorInjectedAndRetryable(rs)) {
  2432. return rs;
  2433. }
  2434. assert(rs.ok());
  2435. op_logs += "Refresh ";
  2436. for (int64_t i = 0; i < static_cast<int64_t>(expected_values_size); ++i) {
  2437. post_read_expected_values.push_back(
  2438. shared->Get(rand_column_family, i + lb));
  2439. }
  2440. assert(pre_read_expected_values.size() == expected_values_size &&
  2441. pre_read_expected_values.size() ==
  2442. post_read_expected_values.size());
  2443. }
2444. // Start from the middle of [lb, ub); otherwise it is easy to iterate out
2445. // of the locked range
  2446. const int64_t mid = lb + num_iter / 2;
  2447. key_str = Key(mid);
  2448. const Slice key(key_str);
  2449. if (thread->rand.OneIn(2)) {
  2450. iter->Seek(key);
  2451. op_logs += " S " + key.ToString(true) + " ";
  2452. if (!iter->Valid() && iter->status().ok()) {
  2453. if (!check_no_key_in_range(mid, ub)) {
  2454. return Status::OK();
  2455. }
  2456. } else if (iter->Valid()) {
  2457. GetIntVal(iter->key().ToString(), &curr);
  2458. if (static_cast<int64_t>(curr) < mid) {
  2459. thread->shared->SetVerificationFailure();
  2460. fprintf(stderr,
  2461. "TestIterateAgainstExpected failed: found unexpectedly small "
  2462. "key\n");
  2463. fprintf(stderr, "Column family: %s, op_logs: %s\n",
  2464. cfh->GetName().c_str(), op_logs.c_str());
  2465. fprintf(stderr, "Last op found key: %s, expected at least: %s\n",
  2466. Slice(Key(curr)).ToString(true).c_str(),
  2467. Slice(Key(mid)).ToString(true).c_str());
  2468. thread->stats.AddErrors(1);
  2469. return Status::OK();
  2470. }
  2471. }
  2472. } else {
  2473. iter->SeekForPrev(key);
  2474. op_logs += " SFP " + key.ToString(true) + " ";
  2475. if (!iter->Valid() && iter->status().ok()) {
  2476. // iterator says nothing <= mid
  2477. if (!check_no_key_in_range(lb, mid + 1)) {
  2478. return Status::OK();
  2479. }
  2480. } else if (iter->Valid()) {
  2481. GetIntVal(iter->key().ToString(), &curr);
  2482. if (mid < static_cast<int64_t>(curr)) {
  2483. thread->shared->SetVerificationFailure();
  2484. fprintf(stderr,
  2485. "TestIterateAgainstExpected failed: found unexpectedly large "
  2486. "key\n");
  2487. fprintf(stderr, "Column family: %s, op_logs: %s\n",
  2488. cfh->GetName().c_str(), op_logs.c_str());
  2489. fprintf(stderr, "Last op found key: %s, expected at most: %s\n",
  2490. Slice(Key(curr)).ToString(true).c_str(),
  2491. Slice(Key(mid)).ToString(true).c_str());
  2492. thread->stats.AddErrors(1);
  2493. return Status::OK();
  2494. }
  2495. }
  2496. }
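    // Random walk: take up to num_iter random Next()/Prev() steps from the
    // seek position. Every key visited inside [lb, ub) must be plausible given
    // the pre-/post-read expected values, consecutive keys must be strictly
    // ordered, and check_no_key_in_range() confirms no live key was skipped
    // between consecutive positions. With allow_unprepared_value set, the
    // value must be materialized via PrepareValue() before it is inspected.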
    for (int64_t i = 0; i < num_iter && iter->Valid(); ++i) {
      if (ro.allow_unprepared_value) {
        op_logs += "*";
        if (!iter->PrepareValue()) {
          assert(!iter->Valid());
          assert(!iter->status().ok());
          break;
        }
      }
      if (!check_columns()) {
        return Status::OK();
      }
      GetIntVal(iter->key().ToString(), &curr);
      if (static_cast<int64_t>(curr) < lb) {
        iter->Next();
        op_logs += "N";
      } else if (static_cast<int64_t>(curr) >= ub) {
        iter->Prev();
        op_logs += "P";
      } else {
        const uint32_t value_base_from_db = GetValueBase(iter->value());
        std::size_t index = static_cast<std::size_t>(curr - lb);
        assert(index < pre_read_expected_values.size() &&
               index < post_read_expected_values.size());
        const ExpectedValue pre_read_expected_value =
            pre_read_expected_values[index];
        const ExpectedValue post_read_expected_value =
            post_read_expected_values[index];
        if (ExpectedValueHelper::MustHaveNotExisted(pre_read_expected_value,
                                                    post_read_expected_value) ||
            !ExpectedValueHelper::InExpectedValueBaseRange(
                value_base_from_db, pre_read_expected_value,
                post_read_expected_value)) {
          // Fail fast to preserve the DB state.
          thread->shared->SetVerificationFailure();
          fprintf(stderr,
                  "Verification failed: iterator has key %s, but expected "
                  "state does not.\n",
                  iter->key().ToString(true).c_str());
          fprintf(stderr, "Column family: %s, op_logs: %s\n",
                  cfh->GetName().c_str(), op_logs.c_str());
          thread->stats.AddErrors(1);
          break;
        }
        if (thread->rand.OneIn(2)) {
          iter->Next();
          op_logs += "N";
          if (!iter->Valid()) {
            break;
          }
          uint64_t next = 0;
          GetIntVal(iter->key().ToString(), &next);
          if (next <= curr) {
            thread->shared->SetVerificationFailure();
            fprintf(stderr,
                    "TestIterateAgainstExpected failed: found unexpectedly "
                    "small key\n");
            fprintf(stderr, "Column family: %s, op_logs: %s\n",
                    cfh->GetName().c_str(), op_logs.c_str());
            fprintf(stderr, "Last op found key: %s, expected at least: %s\n",
                    Slice(Key(next)).ToString(true).c_str(),
                    Slice(Key(curr + 1)).ToString(true).c_str());
            thread->stats.AddErrors(1);
            return Status::OK();
          }
          if (!check_no_key_in_range(static_cast<int64_t>(curr + 1),
                                     static_cast<int64_t>(next))) {
            return Status::OK();
          }
        } else {
          iter->Prev();
          op_logs += "P";
          if (!iter->Valid()) {
            break;
          }
          uint64_t prev = 0;
          GetIntVal(iter->key().ToString(), &prev);
          if (curr <= prev) {
            thread->shared->SetVerificationFailure();
            fprintf(stderr,
                    "TestIterateAgainstExpected failed: found unexpectedly "
                    "large key\n");
            fprintf(stderr, "Column family: %s, op_logs: %s\n",
                    cfh->GetName().c_str(), op_logs.c_str());
            fprintf(stderr, "Last op found key: %s, expected at most: %s\n",
                    Slice(Key(prev)).ToString(true).c_str(),
                    Slice(Key(curr - 1)).ToString(true).c_str());
            thread->stats.AddErrors(1);
            return Status::OK();
          }
          if (!check_no_key_in_range(static_cast<int64_t>(prev + 1),
                                     static_cast<int64_t>(curr))) {
            return Status::OK();
          }
        }
      }
    }
    if (!iter->status().ok()) {
      if (IsErrorInjectedAndRetryable(iter->status())) {
        return iter->status();
      } else {
        thread->shared->SetVerificationFailure();
        fprintf(stderr, "TestIterate against expected state error: %s\n",
                iter->status().ToString().c_str());
        fprintf(stderr, "Column family: %s, op_logs: %s\n",
                cfh->GetName().c_str(), op_logs.c_str());
        thread->stats.AddErrors(1);
        return iter->status();
      }
    }
    thread->stats.AddIterations(1);
    return Status::OK();
  }
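  // Verifies value_from_db for (cf, key) against the expected state. If the
  // expected state still has a pending write or delete for this key, it is
  // synced to what the DB returned (SyncPut()/SyncDelete()) rather than being
  // treated as a mismatch. Returns false and reports a verification failure on
  // any inconsistency, true otherwise. Illustrative call (surrounding names
  // are hypothetical):
  //   VerifyOrSyncValue(cf, key, read_opts, shared, value_from_db, "TestGet",
  //                     status);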
  bool VerifyOrSyncValue(int cf, int64_t key, const ReadOptions& opts,
                         SharedState* shared, const std::string& value_from_db,
                         std::string msg_prefix, const Status& s) const {
    if (shared->HasVerificationFailedYet()) {
      return false;
    }
    const ExpectedValue expected_value = shared->Get(cf, key);
    if (expected_value.PendingWrite() || expected_value.PendingDelete()) {
      if (s.ok()) {
        // Value exists in db, update state to reflect that
        Slice slice(value_from_db);
        uint32_t value_base = GetValueBase(slice);
        shared->SyncPut(cf, key, value_base);
        return true;
      } else if (s.IsNotFound()) {
        // Value doesn't exist in db, update state to reflect that
        shared->SyncDelete(cf, key);
        return true;
      } else {
        assert(false);
      }
    }
    char expected_value_data[kValueMaxLen];
    size_t expected_value_data_size =
        GenerateValue(expected_value.GetValueBase(), expected_value_data,
                      sizeof(expected_value_data));
    std::ostringstream read_u64ts;
    if (opts.timestamp) {
      read_u64ts << " while read with timestamp: ";
      uint64_t read_ts;
      if (DecodeU64Ts(*opts.timestamp, &read_ts).ok()) {
        read_u64ts << std::to_string(read_ts) << ", ";
      } else {
        read_u64ts << s.ToString()
                   << " Encoded read timestamp: " << opts.timestamp->ToString()
                   << ", ";
      }
    }
    // compare value_from_db with the value in the shared state
    if (s.ok()) {
      const Slice slice(value_from_db);
      const uint32_t value_base_from_db = GetValueBase(slice);
      if (ExpectedValueHelper::MustHaveNotExisted(expected_value,
                                                  expected_value)) {
        VerificationAbort(
            shared, msg_prefix + ": Unexpected value found" + read_u64ts.str(),
            cf, key, value_from_db, "");
        return false;
      }
      if (!ExpectedValueHelper::InExpectedValueBaseRange(
              value_base_from_db, expected_value, expected_value)) {
        VerificationAbort(
            shared, msg_prefix + ": Unexpected value found" + read_u64ts.str(),
            cf, key, value_from_db,
            Slice(expected_value_data, expected_value_data_size));
        return false;
      }
      // TODO: are the length/memcmp() checks repetitive?
      if (value_from_db.length() != expected_value_data_size) {
        VerificationAbort(shared,
                          msg_prefix + ": Length of value read is not equal" +
                              read_u64ts.str(),
                          cf, key, value_from_db,
                          Slice(expected_value_data, expected_value_data_size));
        return false;
      }
      if (memcmp(value_from_db.data(), expected_value_data,
                 expected_value_data_size) != 0) {
        VerificationAbort(shared,
                          msg_prefix + ": Contents of value read don't match" +
                              read_u64ts.str(),
                          cf, key, value_from_db,
                          Slice(expected_value_data, expected_value_data_size));
        return false;
      }
    } else if (s.IsNotFound()) {
      if (ExpectedValueHelper::MustHaveExisted(expected_value,
                                               expected_value)) {
        VerificationAbort(
            shared,
            msg_prefix + ": Value not found " + read_u64ts.str() + s.ToString(),
            cf, key, "", Slice(expected_value_data, expected_value_data_size));
        return false;
      }
    } else {
      VerificationAbort(
          shared,
          msg_prefix + "Non-OK status " + read_u64ts.str() + s.ToString(), cf,
          key, "", Slice(expected_value_data, expected_value_data_size));
      return false;
    }
    return true;
  }
  // Compared to VerifyOrSyncValue, VerifyValueRange takes in a
  // pre_read_expected_value to determine the lower bound of acceptable values.
  // Anything from the pre_read_expected_value to the post_read_expected_value
  // is considered acceptable. VerifyValueRange does not perform the initial
  // "sync" step and does not compare the exact data/lengths for the values.
  // This verification is suitable for verifying secondary or follower
  // databases.
  bool VerifyValueRange(int cf, int64_t key, const ReadOptions& opts,
                        SharedState* shared, const std::string& value_from_db,
                        const std::string& msg_prefix, const Status& s,
                        const ExpectedValue& pre_read_expected_value) const {
    if (shared->HasVerificationFailedYet()) {
      return false;
    }
    const ExpectedValue post_read_expected_value = shared->Get(cf, key);
    char expected_value_data[kValueMaxLen];
    size_t expected_value_data_size =
        GenerateValue(post_read_expected_value.GetValueBase(),
                      expected_value_data, sizeof(expected_value_data));
    std::ostringstream read_u64ts;
    if (opts.timestamp) {
      read_u64ts << " while read with timestamp: ";
      uint64_t read_ts;
      if (DecodeU64Ts(*opts.timestamp, &read_ts).ok()) {
        read_u64ts << std::to_string(read_ts) << ", ";
      } else {
        read_u64ts << s.ToString()
                   << " Encoded read timestamp: " << opts.timestamp->ToString()
                   << ", ";
      }
    }
    // Compare value_from_db with the range of possible values from
    // pre_read_expected_value to post_read_expected_value
    if (s.ok()) {
      const Slice slice(value_from_db);
      const uint32_t value_base_from_db = GetValueBase(slice);
      if (ExpectedValueHelper::MustHaveNotExisted(pre_read_expected_value,
                                                  post_read_expected_value)) {
        VerificationAbort(shared,
                          msg_prefix +
                              ": Unexpected value found that should not exist" +
                              read_u64ts.str(),
                          cf, key, value_from_db, "");
        return false;
      }
      if (!ExpectedValueHelper::InExpectedValueBaseRange(
              value_base_from_db, pre_read_expected_value,
              post_read_expected_value)) {
        VerificationAbort(
            shared,
            msg_prefix +
                ": Unexpected value found outside of the value base range" +
                read_u64ts.str(),
            cf, key, value_from_db,
            Slice(expected_value_data, expected_value_data_size));
        return false;
      }
    } else if (s.IsNotFound()) {
      if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
                                               post_read_expected_value)) {
        VerificationAbort(shared,
                          msg_prefix + ": Value not found which should exist" +
                              read_u64ts.str() + s.ToString(),
                          cf, key, "",
                          Slice(expected_value_data, expected_value_data_size));
        return false;
      }
    } else {
      VerificationAbort(
          shared,
          msg_prefix + ": Non-OK status" + read_u64ts.str() + s.ToString(), cf,
          key, "", Slice(expected_value_data, expected_value_data_size));
      return false;
    }
    return true;
  }
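  // Installs a rollback deletion type callback so the TransactionDB layer can
  // choose, per key, which kind of deletion to use when rolling back a write.
  // The callback returns true exactly for keys that do not allow overwrites.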
  void PrepareTxnDbOptions(SharedState* shared,
                           TransactionDBOptions& txn_db_opts) override {
    txn_db_opts.rollback_deletion_type_callback =
        [shared](TransactionDB*, ColumnFamilyHandle*, const Slice& key) {
          assert(shared);
          uint64_t key_num = 0;
          bool ok = GetIntVal(key.ToString(), &key_num);
          assert(ok);
          (void)ok;
          return !shared->AllowsOverwrite(key_num);
        };
  }
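  // With a small probability, also writes the given key inside the transaction
  // itself (PutEntity/Put/Merge/Delete chosen at random) so that a subsequent
  // MultiGet on the same transaction exercises read-your-own-write paths. The
  // value the transaction is expected to observe is recorded in
  // ryw_expected_values; lock timeouts and deadlocks are tolerated and simply
  // skip the key.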
  void MaybeAddKeyToTxnForRYW(
      ThreadState* thread, int column_family, int64_t key, Transaction* txn,
      std::unordered_map<std::string, ExpectedValue>& ryw_expected_values) {
    assert(thread);
    assert(txn);
    SharedState* const shared = thread->shared;
    assert(shared);
    const ExpectedValue expected_value =
        thread->shared->Get(column_family, key);
    bool may_exist = !ExpectedValueHelper::MustHaveNotExisted(expected_value,
                                                              expected_value);
    if (!shared->AllowsOverwrite(key) && may_exist) {
      // Only do read-your-own-write checks for keys that allow overwrites.
      return;
    }
    // With a 1 in 10 probability, insert the just added key in the batch
    // into the transaction. This will create an overlap with the MultiGet
    // keys and exercise some corner cases in the code
    if (thread->rand.OneIn(10)) {
      assert(column_family >= 0);
      assert(column_family < static_cast<int>(column_families_.size()));
      ColumnFamilyHandle* const cfh = column_families_[column_family];
      assert(cfh);
      const std::string k = Key(key);
      enum class Op {
        PutOrPutEntity,
        Merge,
        Delete,
        // add new operations above this line
        NumberOfOps
      };
      const Op op = static_cast<Op>(
          thread->rand.Uniform(static_cast<int>(Op::NumberOfOps)));
      Status s;
      ExpectedValue new_expected_value;
      switch (op) {
        case Op::PutOrPutEntity:
        case Op::Merge: {
          ExpectedValue put_value;
          put_value.SyncPut(static_cast<uint32_t>(thread->rand.Uniform(
              static_cast<int>(ExpectedValue::GetValueBaseMask()))));
          new_expected_value = put_value;
          const uint32_t value_base = put_value.GetValueBase();
          char value[100];
          const size_t sz = GenerateValue(value_base, value, sizeof(value));
          const Slice v(value, sz);
          if (op == Op::PutOrPutEntity || !FLAGS_use_merge) {
            if (FLAGS_use_put_entity_one_in > 0 &&
                (value_base % FLAGS_use_put_entity_one_in) == 0) {
              s = txn->PutEntity(cfh, k, GenerateWideColumns(value_base, v));
            } else {
              s = txn->Put(cfh, k, v);
            }
          } else {
            s = txn->Merge(cfh, k, v);
          }
          break;
        }
        case Op::Delete: {
          ExpectedValue delete_value;
          delete_value.SyncDelete();
          new_expected_value = delete_value;
          s = txn->Delete(cfh, k);
          break;
        }
        default:
          assert(false);
      }
      // It is possible that multiple threads concurrently try to write to the
      // same key, which could cause a lock timeout or deadlock in the
      // TransactionDB layer before the transaction is rolled back.
      // E.g.
      // Timestamp 1: Transaction A: lock key M for write
      // Timestamp 2: Transaction B: lock key N for write
      // Timestamp 3: Transaction B: try to lock key M for write -> wait
      // Timestamp 4: Transaction A: try to lock key N for write -> deadlock
      if (s.IsTimedOut() || s.IsDeadlock()) {
        return;
      }
      ryw_expected_values[k] = new_expected_value;
      if (!s.ok()) {
        fprintf(stderr,
                "Transaction write error in read-your-own-write test: %s\n",
                s.ToString().c_str());
        shared->SafeTerminate();
      }
    }
  }
};
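// Factory for the non-batched-ops stress test variant.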
StressTest* CreateNonBatchedOpsStressTest() {
  return new NonBatchedOpsStressTest();
}
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS