db_stress_test_base.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#include <ios>
#include <thread>

#include "db_stress_tool/db_stress_compression_manager.h"
#include "db_stress_tool/db_stress_listener.h"
#include "rocksdb/io_status.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"
#include "util/compression.h"

#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_compaction_filter.h"
#include "db_stress_tool/db_stress_compaction_service.h"
#include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_filters.h"
#include "db_stress_tool/db_stress_table_properties_collector.h"
#include "db_stress_tool/db_stress_wide_merge_operator.h"
#include "options/options_parser.h"
#include "rocksdb/convenience.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/simple_mixed_compressor.h"
#include "utilities/backup/backup_engine_impl.h"
#include "utilities/fault_injection_fs.h"
#include "utilities/fault_injection_secondary_cache.h"

namespace ROCKSDB_NAMESPACE {
namespace {
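// Builds the filter policy for block-based tables from the command-line
// flags: a negative --bloom_bits leaves the BlockBasedTableOptions default
// (no filter); --bloom_before_level == INT_MAX selects a plain Bloom filter;
// any other value selects a Ribbon filter that still uses Bloom up through
// the given level.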
std::shared_ptr<const FilterPolicy> CreateFilterPolicy() {
  if (FLAGS_bloom_bits < 0) {
    return BlockBasedTableOptions().filter_policy;
  }
  const FilterPolicy* new_policy;
  if (FLAGS_bloom_before_level == INT_MAX) {
    // Use Bloom API
    new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false);
  } else {
    new_policy =
        NewRibbonFilterPolicy(FLAGS_bloom_bits, FLAGS_bloom_before_level);
  }
  return std::shared_ptr<const FilterPolicy>(new_policy);
}
}  // namespace

StressTest::StressTest()
    : cache_(NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)),
      filter_policy_(CreateFilterPolicy()),
      db_(nullptr),
      txn_db_(nullptr),
      optimistic_txn_db_(nullptr),
      db_aptr_(nullptr),
      clock_(db_stress_env->GetSystemClock().get()),
      new_column_family_name_(1),
      num_times_reopened_(0),
      db_preload_finished_(false),
      secondary_db_(nullptr),
      is_db_stopped_(false) {
  if (FLAGS_destroy_db_initially) {
    std::vector<std::string> files;
    db_stress_env->GetChildren(FLAGS_db, &files);
    for (unsigned int i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }

    Options options;
    options.env = db_stress_env;
    // Remove files without preserving manifest files
    const Status s = !FLAGS_use_blob_db
                         ? DestroyDB(FLAGS_db, options)
                         : blob_db::DestroyBlobDB(FLAGS_db, options,
                                                  blob_db::BlobDBOptions());
    if (!s.ok()) {
      fprintf(stderr, "Cannot destroy original db: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
  Status s = DbStressSqfcManager().MakeSharedFactory(
      FLAGS_sqfc_name, FLAGS_sqfc_version, &sqfc_factory_);
  if (!s.ok()) {
    fprintf(stderr, "Error initializing SstQueryFilterConfig: %s\n",
            s.ToString().c_str());
    exit(1);
  }
}

void StressTest::CleanUp() {
  CleanUpColumnFamilies();
  if (db_) {
    db_->Close();
  }
  delete db_;
  db_ = nullptr;

  delete secondary_db_;
  secondary_db_ = nullptr;
}

void StressTest::CleanUpColumnFamilies() {
  for (auto cf : column_families_) {
    delete cf;
  }
  column_families_.clear();
  for (auto* cf : secondary_cfhs_) {
    delete cf;
  }
  secondary_cfhs_.clear();
}

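// Creates the block cache shared across the test. A non-empty
// --secondary_cache_uri or a positive --compressed_secondary_cache_size
// attaches a secondary cache (optionally wrapped for fault injection), while
// a "tiered_" prefix on --cache_type builds a TieredCacheOptions setup
// instead. For example (illustrative flag values, not defaults):
//   --cache_type=tiered_auto_hyper_clock_cache --cache_size=1073741824
// produces a tiered HyperClockCache whose compressed secondary tier gets
// half of the total capacity (compressed_secondary_ratio = 0.5).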
std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
                                            int32_t num_shard_bits) {
  ConfigOptions config_options;
  if (capacity <= 0) {
    return nullptr;
  }

  std::shared_ptr<SecondaryCache> secondary_cache;
  if (!FLAGS_secondary_cache_uri.empty()) {
    assert(!strstr(FLAGS_secondary_cache_uri.c_str(),
                   "compressed_secondary_cache") ||
           (FLAGS_compressed_secondary_cache_size == 0 &&
            FLAGS_compressed_secondary_cache_ratio == 0.0 &&
            !StartsWith(FLAGS_cache_type, "tiered_")));
    Status s = SecondaryCache::CreateFromString(
        config_options, FLAGS_secondary_cache_uri, &secondary_cache);
    if (secondary_cache == nullptr) {
      fprintf(stderr,
              "No secondary cache registered matching string: %s status=%s\n",
              FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
      exit(1);
    }
    if (FLAGS_secondary_cache_fault_one_in > 0) {
      secondary_cache = std::make_shared<FaultInjectionSecondaryCache>(
          secondary_cache, static_cast<uint32_t>(FLAGS_seed),
          FLAGS_secondary_cache_fault_one_in);
    }
  } else if (FLAGS_compressed_secondary_cache_size > 0) {
    if (StartsWith(FLAGS_cache_type, "tiered_")) {
      fprintf(stderr,
              "Cannot specify both compressed_secondary_cache_size and %s\n",
              FLAGS_cache_type.c_str());
      exit(1);
    }
    CompressedSecondaryCacheOptions opts;
    opts.capacity = FLAGS_compressed_secondary_cache_size;
    opts.compress_format_version = FLAGS_compress_format_version;
    if (FLAGS_enable_do_not_compress_roles) {
      opts.do_not_compress_roles = {CacheEntryRoleSet::All()};
    }
    opts.enable_custom_split_merge = FLAGS_enable_custom_split_merge;
    secondary_cache = NewCompressedSecondaryCache(opts);
    if (secondary_cache == nullptr) {
      fprintf(stderr, "Failed to allocate compressed secondary cache\n");
      exit(1);
    }
    compressed_secondary_cache = secondary_cache;
  }

  std::string cache_type = FLAGS_cache_type;
  size_t cache_size = FLAGS_cache_size;
  bool tiered = false;
  if (StartsWith(cache_type, "tiered_")) {
    tiered = true;
    cache_type.erase(0, strlen("tiered_"));
  }
  if (FLAGS_use_write_buffer_manager) {
    cache_size += FLAGS_db_write_buffer_size;
  }
  if (cache_type == "clock_cache") {
    fprintf(stderr, "Old clock cache implementation has been removed.\n");
    exit(1);
  } else if (EndsWith(cache_type, "hyper_clock_cache")) {
    size_t estimated_entry_charge;
    if (cache_type == "fixed_hyper_clock_cache" ||
        cache_type == "hyper_clock_cache") {
      estimated_entry_charge = FLAGS_block_size;
    } else if (cache_type == "auto_hyper_clock_cache") {
      estimated_entry_charge = 0;
    } else {
      fprintf(stderr, "Cache type not supported.");
      exit(1);
    }
    HyperClockCacheOptions opts(cache_size, estimated_entry_charge,
                                num_shard_bits);
    opts.hash_seed = BitwiseAnd(FLAGS_seed, INT32_MAX);
    if (tiered) {
      TieredCacheOptions tiered_opts;
      tiered_opts.cache_opts = &opts;
      tiered_opts.cache_type = PrimaryCacheType::kCacheTypeHCC;
      tiered_opts.total_capacity = cache_size;
      tiered_opts.compressed_secondary_ratio = 0.5;
      tiered_opts.adm_policy =
          static_cast<TieredAdmissionPolicy>(FLAGS_adm_policy);
      if (tiered_opts.adm_policy ==
          TieredAdmissionPolicy::kAdmPolicyThreeQueue) {
        CompressedSecondaryCacheOptions nvm_sec_cache_opts;
        nvm_sec_cache_opts.capacity = cache_size;
        tiered_opts.nvm_sec_cache =
            NewCompressedSecondaryCache(nvm_sec_cache_opts);
      }
      block_cache = NewTieredCache(tiered_opts);
    } else {
      opts.secondary_cache = std::move(secondary_cache);
      block_cache = opts.MakeSharedCache();
    }
  } else if (EndsWith(cache_type, "lru_cache")) {
    LRUCacheOptions opts;
    opts.capacity = capacity;
    opts.num_shard_bits = num_shard_bits;
    opts.metadata_charge_policy =
        static_cast<CacheMetadataChargePolicy>(FLAGS_metadata_charge_policy);
    opts.use_adaptive_mutex = FLAGS_use_adaptive_mutex_lru;
    opts.high_pri_pool_ratio = FLAGS_high_pri_pool_ratio;
    opts.low_pri_pool_ratio = FLAGS_low_pri_pool_ratio;
    if (tiered) {
      TieredCacheOptions tiered_opts;
      tiered_opts.cache_opts = &opts;
      tiered_opts.cache_type = PrimaryCacheType::kCacheTypeLRU;
      tiered_opts.total_capacity = cache_size;
      tiered_opts.compressed_secondary_ratio = 0.5;
      tiered_opts.adm_policy =
          static_cast<TieredAdmissionPolicy>(FLAGS_adm_policy);
      if (tiered_opts.adm_policy ==
          TieredAdmissionPolicy::kAdmPolicyThreeQueue) {
        CompressedSecondaryCacheOptions nvm_sec_cache_opts;
        nvm_sec_cache_opts.capacity = cache_size;
        tiered_opts.nvm_sec_cache =
            NewCompressedSecondaryCache(nvm_sec_cache_opts);
      }
      block_cache = NewTieredCache(tiered_opts);
    } else {
      opts.secondary_cache = std::move(secondary_cache);
      block_cache = NewLRUCache(opts);
    }
  } else {
    fprintf(stderr, "Cache type not supported.");
    exit(1);
  }
  return block_cache;
}

std::vector<std::string> StressTest::GetBlobCompressionTags() {
  std::vector<std::string> compression_tags{"kNoCompression"};

  if (Snappy_Supported()) {
    compression_tags.emplace_back("kSnappyCompression");
  }
  if (LZ4_Supported()) {
    compression_tags.emplace_back("kLZ4Compression");
  }
  if (ZSTD_Supported()) {
    compression_tags.emplace_back("kZSTD");
  }

  return compression_tags;
}

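// Populates options_table_ (and the options_index_ key list) with the
// candidate values that SetOptions() may apply dynamically during the run.
// Returns immediately when --set_options_one_in is disabled. Entries that
// only make sense under certain configurations (universal compaction, blob
// options, Ribbon filters, FIFO temperature thresholds) are added
// conditionally below.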
bool StressTest::BuildOptionsTable() {
  if (FLAGS_set_options_one_in <= 0) {
    return true;
  }
  bool keepRibbonFilterPolicyOnly = FLAGS_bloom_before_level != INT_MAX;

  std::unordered_map<std::string, std::vector<std::string>> options_tbl = {
      {"write_buffer_size",
       {std::to_string(options_.write_buffer_size),
        std::to_string(options_.write_buffer_size * 2),
        std::to_string(options_.write_buffer_size * 4)}},
      {"max_write_buffer_number",
       {std::to_string(options_.max_write_buffer_number),
        std::to_string(options_.max_write_buffer_number * 2),
        std::to_string(options_.max_write_buffer_number * 4)}},
      {"arena_block_size",
       {
           std::to_string(options_.arena_block_size),
           std::to_string(options_.write_buffer_size / 4),
           std::to_string(options_.write_buffer_size / 8),
       }},
      {"memtable_huge_page_size", {"0", std::to_string(2 * 1024 * 1024)}},
      {"strict_max_successive_merges", {"false", "true"}},
      {"inplace_update_num_locks", {"100", "200", "300"}},
      // TODO: re-enable once internal task T124324915 is fixed.
      // {"experimental_mempurge_threshold", {"0.0", "1.0"}},
      // TODO(ljin): enable test for this option
      // {"disable_auto_compactions", {"100", "200", "300"}},
      {"level0_slowdown_writes_trigger",
       {
           std::to_string(options_.level0_slowdown_writes_trigger),
           std::to_string(options_.level0_slowdown_writes_trigger + 2),
           std::to_string(options_.level0_slowdown_writes_trigger + 4),
       }},
      {"level0_stop_writes_trigger",
       {
           std::to_string(options_.level0_stop_writes_trigger),
           std::to_string(options_.level0_stop_writes_trigger + 2),
           std::to_string(options_.level0_stop_writes_trigger + 4),
       }},
      {"max_compaction_bytes",
       {
           std::to_string(options_.target_file_size_base * 5),
           std::to_string(options_.target_file_size_base * 15),
           std::to_string(options_.target_file_size_base * 100),
       }},
      {"target_file_size_base",
       {
           std::to_string(options_.target_file_size_base),
           std::to_string(options_.target_file_size_base * 2),
           std::to_string(options_.target_file_size_base * 4),
       }},
      {"target_file_size_multiplier",
       {
           std::to_string(options_.target_file_size_multiplier),
           "1",
           "2",
       }},
      {"max_bytes_for_level_base",
       {
           std::to_string(options_.max_bytes_for_level_base / 2),
           std::to_string(options_.max_bytes_for_level_base),
           std::to_string(options_.max_bytes_for_level_base * 2),
       }},
      {"max_bytes_for_level_multiplier",
       {
           std::to_string(options_.max_bytes_for_level_multiplier),
           "1",
           "2",
       }},
      {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
      {"block_based_table_factory",
       {
           keepRibbonFilterPolicyOnly ? "{filter_policy=ribbonfilter:2.35}"
                                      : "{filter_policy=bloomfilter:2.34}",
           "{filter_policy=ribbonfilter:5.67:-1}",
           keepRibbonFilterPolicyOnly ? "{filter_policy=ribbonfilter:8.9:3}"
                                      : "{filter_policy=nullptr}",
           "{block_size=" + std::to_string(FLAGS_block_size) + "}",
           "{block_size=" +
               std::to_string(FLAGS_block_size + (FLAGS_seed & 0xFFFU)) + "}",
       }},
  };

  if (FLAGS_compaction_style == kCompactionStyleUniversal &&
      FLAGS_universal_max_read_amp > 0) {
    // level0_file_num_compaction_trigger needs to be at most max_read_amp
    options_tbl.emplace(
        "level0_file_num_compaction_trigger",
        std::vector<std::string>{
            std::to_string(options_.level0_file_num_compaction_trigger),
            std::to_string(
                std::min(options_.level0_file_num_compaction_trigger + 2,
                         FLAGS_universal_max_read_amp)),
            std::to_string(
                std::min(options_.level0_file_num_compaction_trigger + 4,
                         FLAGS_universal_max_read_amp)),
        });
  } else {
    options_tbl.emplace(
        "level0_file_num_compaction_trigger",
        std::vector<std::string>{
            std::to_string(options_.level0_file_num_compaction_trigger),
            std::to_string(options_.level0_file_num_compaction_trigger + 2),
            std::to_string(options_.level0_file_num_compaction_trigger + 4),
        });
  }
  if (FLAGS_unordered_write) {
    options_tbl.emplace("max_successive_merges",
                        std::vector<std::string>{"0"});
  } else {
    options_tbl.emplace("max_successive_merges",
                        std::vector<std::string>{"0", "2", "4"});
  }

  if (FLAGS_allow_setting_blob_options_dynamically) {
    options_tbl.emplace("enable_blob_files",
                        std::vector<std::string>{"false", "true"});
    options_tbl.emplace("min_blob_size",
                        std::vector<std::string>{"0", "8", "16"});
    options_tbl.emplace("blob_file_size",
                        std::vector<std::string>{"1M", "16M", "256M", "1G"});
    options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
    options_tbl.emplace("enable_blob_garbage_collection",
                        std::vector<std::string>{"false", "true"});
    options_tbl.emplace(
        "blob_garbage_collection_age_cutoff",
        std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
    options_tbl.emplace("blob_garbage_collection_force_threshold",
                        std::vector<std::string>{"0.5", "0.75", "1.0"});
    options_tbl.emplace("blob_compaction_readahead_size",
                        std::vector<std::string>{"0", "1M", "4M"});
    options_tbl.emplace("blob_file_starting_level",
                        std::vector<std::string>{"0", "1", "2"});
    options_tbl.emplace("prepopulate_blob_cache",
                        std::vector<std::string>{"kDisable", "kFlushOnly"});
  }

  if (keepRibbonFilterPolicyOnly) {
    // Can modify RibbonFilterPolicy field
    options_tbl.emplace("table_factory.filter_policy.bloom_before_level",
                        std::vector<std::string>{"-1", "0", "1", "2",
                                                 "2147483646", "2147483647"});
  }
  if (!FLAGS_file_temperature_age_thresholds.empty()) {
    // Modify file_temperature_age_thresholds only if it is set initially
    // (FIFO tiered storage setup)
    options_tbl.emplace(
        "file_temperature_age_thresholds",
        std::vector<std::string>{
            "{{temperature=kWarm;age=10}:{temperature=kCool;age=30}:{"
            "temperature=kCold;age=100}:{"
            "temperature=kIce;age=300}}",
            "{{temperature=kWarm;age=30}:{temperature=kCold;age=300}}",
            "{{temperature=kCold;age=100}}", "{}"});
    options_tbl.emplace(
        "allow_trivial_copy_when_change_temperature",
        std::vector<std::string>{
            FLAGS_allow_trivial_copy_when_change_temperature ? "true"
                                                             : "false"});
  }
  // NOTE: allow -1 to mean starting disabled but dynamically changing
  // But 0 means tiering is disabled for the entire run.
  if (FLAGS_preclude_last_level_data_seconds != 0) {
    options_tbl.emplace("preclude_last_level_data_seconds",
                        std::vector<std::string>{"0", "5", "30", "5000"});
  }
  options_tbl.emplace("preserve_internal_time_seconds",
                      std::vector<std::string>{"0", "5", "30", "5000"});

  options_table_ = std::move(options_tbl);

  for (const auto& iter : options_table_) {
    options_index_.push_back(iter.first);
  }
  return true;
}

void StressTest::InitDb(SharedState* shared) {
  uint64_t now = clock_->NowMicros();
  fprintf(stdout, "%s Initializing db_stress\n",
          clock_->TimeToString(now / 1000000).c_str());
  PrintEnv();
  Open(shared);
  BuildOptionsTable();
}

void StressTest::FinishInitDb(SharedState* shared) {
  if (FLAGS_read_only) {
    uint64_t now = clock_->NowMicros();
    fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
            clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
    PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
  }

  if (shared->HasHistory()) {
    // The way it works right now is, if there's any history, that means the
    // previous run mutating the DB had all its operations traced, in which
    // case we should always be able to `Restore()` the expected values to
    // match the `db_`'s current seqno.
    Status s = shared->Restore(db_);
    if (!s.ok()) {
      fprintf(stderr, "Error restoring historical expected values: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
  if (FLAGS_use_txn && !FLAGS_use_optimistic_txn) {
    // It's OK here without sync because unsynced data cannot be lost at this
    // point - even with sync_fault_injection=1 as the file is still directly
    // writable until after FinishInitDb()
    ProcessRecoveredPreparedTxns(shared);
  }
  if (FLAGS_enable_compaction_filter) {
    auto* compaction_filter_factory =
        static_cast<DbStressCompactionFilterFactory*>(
            options_.compaction_filter_factory.get());
    assert(compaction_filter_factory);
    // This must be called only after any potential `SharedState::Restore()`
    // has completed in order for the `compaction_filter_factory` to operate
    // on the correct latest values file.
    compaction_filter_factory->SetSharedState(shared);
    fprintf(stdout, "Compaction filter factory: %s\n",
            compaction_filter_factory->Name());
  }
}

void StressTest::TrackExpectedState(SharedState* shared) {
  // When data loss is simulated, recovery from potential data loss is a
  // prefix recovery that requires tracing
  if (MightHaveUnsyncedDataLoss() && IsStateTracked()) {
    Status s = shared->SaveAtAndAfter(db_);
    if (!s.ok()) {
      fprintf(stderr, "Error enabling history tracing: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
}

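// Re-reads `snap_state.key` at `snap_state.snapshot` and checks that the
// status and value still match what was recorded when the snapshot was
// taken; when `snap_state.key_vec` is present, it also re-scans the whole
// key space at that snapshot and compares the visible-key bitmap. Any
// mismatch is reported as Status::Corruption.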
Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
                              ThreadState::SnapshotState& snap_state) {
  Status s;
  if (cf->GetName() != snap_state.cf_at_name) {
    return s;
  }
  // This `ReadOptions` is for validation purposes. Ignore
  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  ReadOptions ropt;
  ropt.snapshot = snap_state.snapshot;
  ropt.auto_refresh_iterator_with_snapshot =
      FLAGS_auto_refresh_iterator_with_snapshot;
  Slice ts;
  if (!snap_state.timestamp.empty()) {
    ts = snap_state.timestamp;
    ropt.timestamp = &ts;
  }
  PinnableSlice exp_v(&snap_state.value);
  exp_v.PinSelf();
  PinnableSlice v;
  s = db->Get(ropt, cf, snap_state.key, &v);
  if (!s.ok() && !s.IsNotFound()) {
    // When `persist_user_defined_timestamps` is false, a repeated read with
    // both a read timestamp and an explicitly taken snapshot cannot guarantee
    // consistent result all the time. When it cannot return consistent
    // result, it will return an `InvalidArgument` status.
    if (s.IsInvalidArgument() && !FLAGS_persist_user_defined_timestamps) {
      return Status::OK();
    }
    return s;
  }
  if (snap_state.status != s) {
    return Status::Corruption(
        "The snapshot gave inconsistent results for key " +
        std::to_string(
            Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
        " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
        ") vs. (" + s.ToString() + ")");
  }
  if (s.ok()) {
    if (exp_v != v) {
      return Status::Corruption("The snapshot gave inconsistent values: (" +
                                exp_v.ToString() + ") vs. (" + v.ToString() +
                                ")");
    }
  }
  if (snap_state.key_vec != nullptr) {
    // When `prefix_extractor` is set, seeking to beginning and scanning
    // across prefixes are only supported with `total_order_seek` set.
    ropt.total_order_seek = true;
    std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
    std::unique_ptr<std::vector<bool>> tmp_bitvec(
        new std::vector<bool>(FLAGS_max_key));
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      uint64_t key_val;
      if (GetIntVal(iterator->key().ToString(), &key_val)) {
        (*tmp_bitvec.get())[key_val] = true;
      }
    }
    if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
                    tmp_bitvec.get()->begin())) {
      return Status::Corruption("Found inconsistent keys at this snapshot");
    }
  }
  return Status::OK();
}

void StressTest::ProcessStatus(SharedState* shared, std::string opname,
                               const Status& s,
                               bool ignore_injected_error) const {
  if (s.ok()) {
    return;
  }
  if (!ignore_injected_error || !IsErrorInjectedAndRetryable(s)) {
    std::ostringstream oss;
    oss << opname << " failed: " << s.ToString();
    VerificationAbort(shared, oss.str());
    assert(false);
  }
}

void StressTest::VerificationAbort(SharedState* shared,
                                   std::string msg) const {
  fprintf(stderr, "Verification failed: %s\n", msg.c_str());
  shared->SetVerificationFailure();
}

void StressTest::VerificationAbort(SharedState* shared, std::string msg,
                                   int cf, int64_t key) const {
  auto key_str = Key(key);
  Slice key_slice = key_str;
  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64
          "): %s\n",
          cf, key_slice.ToString(true).c_str(), key, msg.c_str());
  shared->SetVerificationFailure();
}

void StressTest::VerificationAbort(SharedState* shared, std::string msg,
                                   int cf, int64_t key, Slice value_from_db,
                                   Slice value_from_expected) const {
  auto key_str = Key(key);
  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64
          "): value_from_db: %s, value_from_expected: %s, msg: %s\n",
          cf, Slice(key_str).ToString(true).c_str(), key,
          value_from_db.ToString(true).c_str(),
          value_from_expected.ToString(true).c_str(), msg.c_str());
  shared->SetVerificationFailure();
}

void StressTest::VerificationAbort(SharedState* shared, int cf, int64_t key,
                                   const Slice& value,
                                   const WideColumns& columns) const {
  assert(shared);

  auto key_str = Key(key);

  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64
          "): Value and columns inconsistent: value: %s, columns: %s\n",
          cf, Slice(key_str).ToString(/* hex */ true).c_str(), key,
          value.ToString(/* hex */ true).c_str(),
          WideColumnsToHex(columns).c_str());

  shared->SetVerificationFailure();
}

std::string StressTest::DebugString(const Slice& value,
                                    const WideColumns& columns) {
  std::ostringstream oss;

  oss << "value: " << value.ToString(/* hex */ true)
      << ", columns: " << WideColumnsToHex(columns);

  return oss.str();
}

void StressTest::PrintStatistics() {
  // Print statistics from the DB instance instead of global dbstats
  if (db_) {
    auto stats = db_->GetOptions().statistics;
    if (stats) {
      fprintf(stdout, "STATISTICS:\n%s\n", stats->ToString().c_str());
    }
  }
  // Print statistics from secondary DB instance if it exists
  if (secondary_db_) {
    auto stats = secondary_db_->GetOptions().statistics;
    if (stats) {
      fprintf(stdout, "Secondary instance STATISTICS:\n%s\n",
              stats->ToString().c_str());
    }
  }
}

// Currently PreloadDb has to be single-threaded.
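// It writes a freshly generated value for every key in [0, number_of_keys)
// to each column family, using the configured write path (PutEntity, Merge,
// or Put, possibly through a transaction), flushes, and then reopens the DB
// in read-only mode via Open(shared).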
void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
                                              SharedState* shared) {
  WriteOptions write_opts;
  write_opts.disableWAL = FLAGS_disable_wal;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  if (FLAGS_rate_limit_auto_wal_flush) {
    write_opts.rate_limiter_priority = Env::IO_USER;
  }
  char value[100];
  int cf_idx = 0;
  Status s;
  for (auto cfh : column_families_) {
    for (int64_t k = 0; k != number_of_keys; ++k) {
      const std::string key = Key(k);

      PendingExpectedValue pending_expected_value =
          shared->PreparePut(cf_idx, k);
      const uint32_t value_base = pending_expected_value.GetFinalValueBase();
      const size_t sz = GenerateValue(value_base, value, sizeof(value));

      const Slice v(value, sz);

      std::string ts;
      if (FLAGS_user_timestamp_size > 0) {
        ts = GetNowNanos();
      }

      if (FLAGS_use_put_entity_one_in > 0 &&
          (value_base % FLAGS_use_put_entity_one_in) == 0) {
        if (!FLAGS_use_txn) {
          if (FLAGS_use_attribute_group) {
            s = db_->PutEntity(write_opts, key,
                               GenerateAttributeGroups({cfh}, value_base, v));
          } else {
            s = db_->PutEntity(write_opts, cfh, key,
                               GenerateWideColumns(value_base, v));
          }
        } else {
          s = ExecuteTransaction(
              write_opts, /*thread=*/nullptr, [&](Transaction& txn) {
                return txn.PutEntity(cfh, key,
                                     GenerateWideColumns(value_base, v));
              });
        }
      } else if (FLAGS_use_merge) {
        if (!FLAGS_use_txn) {
          if (FLAGS_user_timestamp_size > 0) {
            s = db_->Merge(write_opts, cfh, key, ts, v);
          } else {
            s = db_->Merge(write_opts, cfh, key, v);
          }
        } else {
          s = ExecuteTransaction(
              write_opts, /*thread=*/nullptr,
              [&](Transaction& txn) { return txn.Merge(cfh, key, v); });
        }
      } else {
        if (!FLAGS_use_txn) {
          if (FLAGS_user_timestamp_size > 0) {
            s = db_->Put(write_opts, cfh, key, ts, v);
          } else {
            s = db_->Put(write_opts, cfh, key, v);
          }
        } else {
          s = ExecuteTransaction(
              write_opts, /*thread=*/nullptr,
              [&](Transaction& txn) { return txn.Put(cfh, key, v); });
        }
      }

      if (!s.ok()) {
        pending_expected_value.Rollback();
        break;
      }
      pending_expected_value.Commit();
    }
    if (!s.ok()) {
      break;
    }
    ++cf_idx;
  }
  if (s.ok()) {
    s = db_->Flush(FlushOptions(), column_families_);
  }
  if (s.ok()) {
    CleanUpColumnFamilies();
    delete db_;
    db_ = nullptr;
    txn_db_ = nullptr;
    optimistic_txn_db_ = nullptr;
    delete secondary_db_;
    secondary_db_ = nullptr;

    db_preload_finished_.store(true);
    auto now = clock_->NowMicros();
    fprintf(stdout, "%s Reopening database in read-only\n",
            clock_->TimeToString(now / 1000000).c_str());
    // Reopen as read-only, can ignore all options related to updates
    Open(shared);
  } else {
    fprintf(stderr, "Failed to preload db");
    exit(1);
  }
}

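// Applies one randomly chosen entry from options_table_ to a random column
// family via DB::SetOptions(). The three level0_*_trigger options are always
// set together, at the same value index, so the trigger thresholds stay
// mutually consistent.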
Status StressTest::SetOptions(ThreadState* thread) {
  assert(FLAGS_set_options_one_in > 0);
  std::unordered_map<std::string, std::string> opts;
  std::string name =
      options_index_[thread->rand.Next() % options_index_.size()];
  int value_idx = thread->rand.Next() % options_table_[name].size();
  if (name == "level0_file_num_compaction_trigger" ||
      name == "level0_slowdown_writes_trigger" ||
      name == "level0_stop_writes_trigger") {
    opts["level0_file_num_compaction_trigger"] =
        options_table_["level0_file_num_compaction_trigger"][value_idx];
    opts["level0_slowdown_writes_trigger"] =
        options_table_["level0_slowdown_writes_trigger"][value_idx];
    opts["level0_stop_writes_trigger"] =
        options_table_["level0_stop_writes_trigger"][value_idx];
  } else {
    opts[name] = options_table_[name][value_idx];
  }
  int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
  auto cfh = column_families_[rand_cf_idx];
  return db_->SetOptions(cfh, opts);
}

Options StressTest::GetOptions(int cf_id) {
  auto cfh = column_families_[cf_id];
  assert(cfh);
  return db_->GetOptions(cfh);
}

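// Recovered two-phase transactions left in the prepared state are finished
// off here: the helper below first marks every key in the transaction's
// write batch as pending in the expected state, then randomly commits or
// rolls back, so either outcome is consistent with what verification
// expects.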
void StressTest::ProcessRecoveredPreparedTxns(SharedState* shared) {
  assert(txn_db_);
  std::vector<Transaction*> recovered_prepared_trans;
  txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans);
  for (Transaction* txn : recovered_prepared_trans) {
    ProcessRecoveredPreparedTxnsHelper(txn, shared);
    delete txn;
  }
  recovered_prepared_trans.clear();
  txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans);
  assert(recovered_prepared_trans.size() == 0);
}

void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
                                                    SharedState* shared) {
  thread_local Random rand(static_cast<uint32_t>(FLAGS_seed));
  for (size_t i = 0; i < column_families_.size(); ++i) {
    std::unique_ptr<WBWIIterator> wbwi_iter(
        txn->GetWriteBatch()->NewIterator(column_families_[i]));
    for (wbwi_iter->SeekToFirst(); wbwi_iter->Valid(); wbwi_iter->Next()) {
      uint64_t key_val;
      if (GetIntVal(wbwi_iter->Entry().key.ToString(), &key_val)) {
        shared->SyncPendingPut(static_cast<int>(i) /* cf_idx */, key_val);
      }
    }
  }
  if (rand.OneIn(2)) {
    Status s = txn->Commit();
    assert(s.ok());
  } else {
    Status s = txn->Rollback();
    assert(s.ok());
  }
}

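// Begins a transaction of the configured kind. Optimistic transactions need
// no extra setup; pessimistic transactions get a 10-minute lock timeout,
// deadlock detection, a unique name ("xid<N>"), and occasionally randomized
// commit_bypass_memtable / large-transaction-optimization settings, gated by
// --commit_bypass_memtable_one_in.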
Status StressTest::NewTxn(WriteOptions& write_opts, ThreadState* thread,
                          std::unique_ptr<Transaction>* out_txn,
                          bool* commit_bypass_memtable) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
  }
  write_opts.disableWAL = FLAGS_disable_wal;
  static std::atomic<uint64_t> txn_id = {0};
  if (FLAGS_use_optimistic_txn) {
    out_txn->reset(optimistic_txn_db_->BeginTransaction(write_opts));
    return Status::OK();
  } else {
    TransactionOptions txn_options;
    txn_options.use_only_the_last_commit_time_batch_for_recovery =
        FLAGS_use_only_the_last_commit_time_batch_for_recovery;
    txn_options.lock_timeout = 600000;  // 10 min
    txn_options.deadlock_detect = true;
    if (FLAGS_commit_bypass_memtable_one_in > 0 &&
        thread->rand.OneIn(FLAGS_commit_bypass_memtable_one_in)) {
      assert(FLAGS_txn_write_policy == 0);
      assert(FLAGS_user_timestamp_size == 0);
      if (thread->rand.OneIn(2)) {
        txn_options.commit_bypass_memtable = true;
      }
      if (thread->rand.OneIn(2)) {
        txn_options.large_txn_commit_optimize_threshold = 1;
      }
      if (thread->rand.OneIn(2) ||
          (!txn_options.commit_bypass_memtable &&
           txn_options.large_txn_commit_optimize_threshold != 1)) {
        txn_options.large_txn_commit_optimize_byte_threshold = 1;
      }
      if (commit_bypass_memtable) {
        *commit_bypass_memtable = txn_options.commit_bypass_memtable;
      }
    }
    out_txn->reset(txn_db_->BeginTransaction(write_opts, txn_options));
    auto istr = std::to_string(txn_id.fetch_add(1));
    Status s = (*out_txn)->SetName("xid" + istr);
    return s;
  }
}

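// Commits `txn`. Optimistic transactions commit directly; pessimistic ones
// go through Prepare() first, then either a plain Commit() or, one in
// --create_timestamped_snapshot_one_in times, CommitAndTryCreateSnapshot().
// Thread 0 additionally exercises CreateTimestampedSnapshot(), and old
// timestamped snapshots are released at a low random rate.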
Status StressTest::CommitTxn(Transaction& txn, ThreadState* thread) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
  }
  Status s = Status::OK();
  // We don't issue write to transaction's underlying WriteBatch in stress
  // test
  assert(txn.GetWriteBatch()->GetWriteBatch()->Count());
  assert(txn.GetWriteBatch()->GetWBWIOpCount() ==
         txn.GetWriteBatch()->GetWriteBatch()->Count());
  if (FLAGS_use_optimistic_txn) {
    assert(optimistic_txn_db_);
    s = txn.Commit();
  } else {
    assert(txn_db_);
    s = txn.Prepare();
    std::shared_ptr<const Snapshot> timestamped_snapshot;
    if (s.ok()) {
      if (thread && FLAGS_create_timestamped_snapshot_one_in &&
          thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) {
        uint64_t ts = db_stress_env->NowNanos();
        s = txn.CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts,
                                           &timestamped_snapshot);

        std::pair<Status, std::shared_ptr<const Snapshot>> res;
        if (thread->tid == 0) {
          uint64_t now = db_stress_env->NowNanos();
          res = txn_db_->CreateTimestampedSnapshot(now);
          if (res.first.ok()) {
            assert(res.second);
            assert(res.second->GetTimestamp() == now);
            if (timestamped_snapshot) {
              assert(res.second->GetTimestamp() >
                     timestamped_snapshot->GetTimestamp());
            }
          } else {
            assert(!res.second);
          }
        }
      } else {
        s = txn.Commit();
      }
    }
    if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 &&
        thread->rand.OneInOpt(50000)) {
      uint64_t now = db_stress_env->NowNanos();
      constexpr uint64_t time_diff =
          static_cast<uint64_t>(1000) * 1000 * 1000;
      txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff);
    }
  }
  return s;
}

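// Runs `ops` inside a fresh transaction and commits it. Only optimistic
// transactions can fail with TryAgain at commit; such failures are rolled
// back and retried up to 10 times, accumulating the TryAgain messages so
// that a persistent failure is easy to diagnose.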
Status StressTest::ExecuteTransaction(
    WriteOptions& write_opts, ThreadState* thread,
    std::function<Status(Transaction&)>&& ops, bool* commit_bypass_memtable) {
  std::unique_ptr<Transaction> txn;
  Status s = NewTxn(write_opts, thread, &txn, commit_bypass_memtable);
  std::string try_again_messages;
  if (s.ok()) {
    for (int tries = 1;; ++tries) {
      s = ops(*txn);
      if (s.ok()) {
        s = CommitTxn(*txn, thread);
        if (s.ok()) {
          break;
        }
      }
      // Optimistic txn might return TryAgain, in which case rollback
      // and try again.
      if (!s.IsTryAgain() || !FLAGS_use_optimistic_txn) {
        break;
      }
      // Record and report historical TryAgain messages for debugging
      try_again_messages +=
          std::to_string(SystemClock::Default()->NowMicros() / 1000);
      try_again_messages += "ms ";
      try_again_messages += s.getState();
      try_again_messages += "\n";
      // In theory, each Rollback after TryAgain should have an independent
      // chance of success, so too many retries could indicate something is
      // not working properly.
      if (tries >= 10) {
        s = Status::TryAgain(try_again_messages);
        break;
      }
      s = txn->Rollback();
      if (!s.ok()) {
        break;
      }
    }
  }
  return s;
}

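// Main per-thread operation loop. The cumulative *_bound constants computed
// below partition the percentage space by operation type (read, prefix scan,
// write, delete, range delete, iterate); a uniform random draw compared
// against these bounds selects each operation according to the configured
// percentages.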
  925. void StressTest::OperateDb(ThreadState* thread) {
  926. ReadOptions read_opts(FLAGS_verify_checksum, true);
  927. read_opts.rate_limiter_priority =
  928. FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  929. read_opts.async_io = FLAGS_async_io;
  930. read_opts.adaptive_readahead = FLAGS_adaptive_readahead;
  931. read_opts.readahead_size = FLAGS_readahead_size;
  932. read_opts.auto_readahead_size = FLAGS_auto_readahead_size;
  933. read_opts.fill_cache = FLAGS_fill_cache;
  934. read_opts.optimize_multiget_for_io = FLAGS_optimize_multiget_for_io;
  935. read_opts.allow_unprepared_value = FLAGS_allow_unprepared_value;
  936. read_opts.auto_refresh_iterator_with_snapshot =
  937. FLAGS_auto_refresh_iterator_with_snapshot;
  938. WriteOptions write_opts;
  939. if (FLAGS_rate_limit_auto_wal_flush) {
  940. write_opts.rate_limiter_priority = Env::IO_USER;
  941. }
  942. write_opts.memtable_insert_hint_per_batch =
  943. FLAGS_memtable_insert_hint_per_batch;
  944. auto shared = thread->shared;
  945. char value[100];
  946. std::string from_db;
  947. if (FLAGS_sync) {
  948. write_opts.sync = true;
  949. }
  950. write_opts.disableWAL = FLAGS_disable_wal;
  951. write_opts.protection_bytes_per_key = FLAGS_batch_protection_bytes_per_key;
  952. const int prefix_bound = static_cast<int>(FLAGS_readpercent) +
  953. static_cast<int>(FLAGS_prefixpercent);
  954. const int write_bound = prefix_bound + static_cast<int>(FLAGS_writepercent);
  955. const int del_bound = write_bound + static_cast<int>(FLAGS_delpercent);
  956. const int delrange_bound =
  957. del_bound + static_cast<int>(FLAGS_delrangepercent);
  958. const int iterate_bound =
  959. delrange_bound + static_cast<int>(FLAGS_iterpercent);
  960. const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
  961. thread->stats.Start();
  962. for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
  963. if (thread->shared->HasVerificationFailedYet() ||
  964. thread->shared->ShouldStopTest()) {
  965. break;
  966. }
    if (open_cnt != 0) {
      thread->stats.FinishedSingleOp();
      MutexLock l(thread->shared->GetMutex());
      while (!thread->snapshot_queue.empty()) {
        db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
        delete thread->snapshot_queue.front().second.key_vec;
        thread->snapshot_queue.pop();
      }
      thread->shared->IncVotedReopen();
      if (thread->shared->AllVotedReopen()) {
        thread->shared->GetStressTest()->Reopen(thread);
        thread->shared->GetCondVar()->SignalAll();
      } else {
        thread->shared->GetCondVar()->Wait();
      }
      // Commenting this out as we don't want to reset stats on each open.
      // thread->stats.Start();
    }
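    // In debug builds, arm per-thread fault injection for each IO type so
    // that data and metadata reads/writes can fail with the configured
    // probability and severity.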
#ifndef NDEBUG
    if (fault_fs_guard) {
      fault_fs_guard->SetThreadLocalErrorContext(
          FaultInjectionIOType::kRead, thread->shared->GetSeed(),
          FLAGS_read_fault_one_in,
          FLAGS_inject_error_severity == 1 /* retryable */,
          FLAGS_inject_error_severity == 2 /* has_data_loss */);
      fault_fs_guard->EnableThreadLocalErrorInjection(
          FaultInjectionIOType::kRead);
      fault_fs_guard->SetThreadLocalErrorContext(
          FaultInjectionIOType::kWrite, thread->shared->GetSeed(),
          FLAGS_write_fault_one_in,
          FLAGS_inject_error_severity == 1 /* retryable */,
          FLAGS_inject_error_severity == 2 /* has_data_loss */);
      fault_fs_guard->EnableThreadLocalErrorInjection(
          FaultInjectionIOType::kWrite);
      fault_fs_guard->SetThreadLocalErrorContext(
          FaultInjectionIOType::kMetadataRead, thread->shared->GetSeed(),
          FLAGS_metadata_read_fault_one_in,
          FLAGS_inject_error_severity == 1 /* retryable */,
          FLAGS_inject_error_severity == 2 /* has_data_loss */);
      fault_fs_guard->EnableThreadLocalErrorInjection(
          FaultInjectionIOType::kMetadataRead);
      fault_fs_guard->SetThreadLocalErrorContext(
          FaultInjectionIOType::kMetadataWrite, thread->shared->GetSeed(),
          FLAGS_metadata_write_fault_one_in,
          FLAGS_inject_error_severity == 1 /* retryable */,
          FLAGS_inject_error_severity == 2 /* has_data_loss */);
      fault_fs_guard->EnableThreadLocalErrorInjection(
          FaultInjectionIOType::kMetadataWrite);
    }
#endif  // NDEBUG
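    // Each reopen cycle runs up to ops_per_open operations. Note that the
    // batched read paths below (MultiGet/MultiGetEntity/MultiScan) consume
    // several values of `i` at once so that every thread performs the same
    // total number of ops.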
    for (uint64_t i = 0; i < ops_per_open; i++) {
      if (thread->shared->HasVerificationFailedYet()) {
        break;
      }
      // Change Options
      if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) {
        Status s = SetOptions(thread);
        ProcessStatus(shared, "SetOptions", s);
      }
      if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) {
        // Toggle in-place update support
        options_.inplace_update_support = !options_.inplace_update_support;
      }
      if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
          thread->rand.OneIn(FLAGS_verify_db_one_in)) {
        // Temporarily disable error injection for verification
        if (fault_fs_guard) {
          fault_fs_guard->DisableAllThreadLocalErrorInjection();
        }
        ContinuouslyVerifyDb(thread);
        // Re-enable the error injection disabled above for verification
        if (fault_fs_guard) {
          fault_fs_guard->EnableAllThreadLocalErrorInjection();
        }
        if (thread->shared->ShouldStopTest()) {
          break;
        }
      }
      MaybeClearOneColumnFamily(thread);
      if (thread->rand.OneInOpt(FLAGS_manual_wal_flush_one_in)) {
        bool sync = thread->rand.OneIn(2);
        Status s = db_->FlushWAL(sync);
        if (!s.ok() && !IsErrorInjectedAndRetryable(s) &&
            !(sync && s.IsNotSupported())) {
          fprintf(stderr, "FlushWAL(sync=%s) failed: %s\n",
                  (sync ? "true" : "false"), s.ToString().c_str());
        }
      }
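      // LockWAL() should quiesce writes: while the lock is held, neither the
      // latest sequence number nor the current WAL file number/size may
      // change. Verify both below, with fault injection turned off.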
      if (thread->rand.OneInOpt(FLAGS_lock_wal_one_in)) {
        Status s = db_->LockWAL();
        if (!s.ok() && !IsErrorInjectedAndRetryable(s)) {
          fprintf(stderr, "LockWAL() failed: %s\n", s.ToString().c_str());
        } else if (s.ok()) {
          // Temporarily disable error injection for verification
          if (fault_fs_guard) {
            fault_fs_guard->DisableAllThreadLocalErrorInjection();
          }
          // Verify no writes during LockWAL
          auto old_seqno = db_->GetLatestSequenceNumber();
          // And also that WAL is not changed during LockWAL()
          std::unique_ptr<WalFile> old_wal;
          s = db_->GetCurrentWalFile(&old_wal);
          if (!s.ok()) {
            fprintf(stderr, "GetCurrentWalFile() failed: %s\n",
                    s.ToString().c_str());
          } else {
            // Yield for a while
            do {
              std::this_thread::yield();
            } while (thread->rand.OneIn(2));
            // Current WAL and size should not have changed
            std::unique_ptr<WalFile> new_wal;
            s = db_->GetCurrentWalFile(&new_wal);
            if (!s.ok()) {
              fprintf(stderr, "GetCurrentWalFile() failed: %s\n",
                      s.ToString().c_str());
            } else {
              if (old_wal->LogNumber() != new_wal->LogNumber()) {
                fprintf(stderr,
                        "Failed: WAL number changed during LockWAL(): %" PRIu64
                        " to %" PRIu64 "\n",
                        old_wal->LogNumber(), new_wal->LogNumber());
              }
              if (old_wal->SizeFileBytes() != new_wal->SizeFileBytes()) {
                fprintf(stderr,
                        "Failed: WAL %" PRIu64
                        " size changed during LockWAL(): %" PRIu64
                        " to %" PRIu64 "\n",
                        old_wal->LogNumber(), old_wal->SizeFileBytes(),
                        new_wal->SizeFileBytes());
              }
            }
          }
          // Verify no writes during LockWAL
          auto new_seqno = db_->GetLatestSequenceNumber();
          if (old_seqno != new_seqno) {
            fprintf(stderr,
                    "Failure: latest seqno changed from %" PRIu64
                    " to %" PRIu64 " with WAL locked\n",
                    old_seqno, new_seqno);
          }
          // Verification done. Now unlock WAL
          s = db_->UnlockWAL();
          if (!s.ok()) {
            fprintf(stderr, "UnlockWAL() failed: %s\n", s.ToString().c_str());
          }
          // Re-enable the error injection disabled above for verification
          if (fault_fs_guard) {
            fault_fs_guard->EnableAllThreadLocalErrorInjection();
          }
        }
      }
      if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
        Status s = db_->SyncWAL();
        if (!s.ok() && !s.IsNotSupported() && !IsErrorInjectedAndRetryable(s)) {
          fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
        }
      }
      int rand_column_family = thread->rand.Next() % FLAGS_column_families;
      ColumnFamilyHandle* column_family = column_families_[rand_column_family];
      if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
        TestCompactFiles(thread, column_family);
      }
      int64_t rand_key = GenerateOneKey(thread, i);
      std::string keystr = Key(rand_key);
      Slice key = keystr;
      if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
        TestCompactRange(thread, rand_key, key, column_family);
        if (thread->shared->HasVerificationFailedYet()) {
          break;
        }
      }
      if (thread->rand.OneInOpt(FLAGS_promote_l0_one_in)) {
        TestPromoteL0(thread, column_family);
      }
      std::vector<int> rand_column_families =
          GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
      if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
        Status status = TestFlush(rand_column_families);
        ProcessStatus(shared, "Flush", status);
      }
      if (thread->rand.OneInOpt(FLAGS_get_live_files_apis_one_in)) {
        Status s_1 = TestGetLiveFiles();
        ProcessStatus(shared, "GetLiveFiles", s_1);
        Status s_2 = TestGetLiveFilesMetaData();
        ProcessStatus(shared, "GetLiveFilesMetaData", s_2);
        // TODO: enable again after making `GetLiveFilesStorageInfo()`
        // compatible with `Options::recycle_log_file_num`
        if (FLAGS_recycle_log_file_num == 0) {
          Status s_3 = TestGetLiveFilesStorageInfo();
          ProcessStatus(shared, "GetLiveFilesStorageInfo", s_3);
        }
      }
      if (thread->rand.OneInOpt(FLAGS_get_all_column_family_metadata_one_in)) {
        Status status = TestGetAllColumnFamilyMetaData();
        ProcessStatus(shared, "GetAllColumnFamilyMetaData", status);
      }
      if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
        Status status = TestGetSortedWalFiles();
        ProcessStatus(shared, "GetSortedWalFiles", status);
      }
      if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) {
        Status status = TestGetCurrentWalFile();
        ProcessStatus(shared, "GetCurrentWalFile", status);
      }
      if (thread->rand.OneInOpt(FLAGS_reset_stats_one_in)) {
        Status status = TestResetStats();
        ProcessStatus(shared, "ResetStats", status);
      }
      if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
        Status status = TestPauseBackground(thread);
        ProcessStatus(shared, "Pause/ContinueBackgroundWork", status);
      }
      if (thread->rand.OneInOpt(FLAGS_disable_file_deletions_one_in)) {
        Status status = TestDisableFileDeletions(thread);
        ProcessStatus(shared, "TestDisableFileDeletions", status);
      }
      if (thread->rand.OneInOpt(FLAGS_disable_manual_compaction_one_in)) {
        Status status = TestDisableManualCompaction(thread);
        ProcessStatus(shared, "TestDisableManualCompaction", status);
      }
      if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
        ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
        ThreadStatusUtil::SetThreadOperation(
            ThreadStatus::OperationType::OP_VERIFY_DB_CHECKSUM);
        Status status = db_->VerifyChecksum();
        ThreadStatusUtil::ResetThreadStatus();
        ProcessStatus(shared, "VerifyChecksum", status);
      }
      if (thread->rand.OneInOpt(FLAGS_verify_file_checksums_one_in)) {
        ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
        ThreadStatusUtil::SetThreadOperation(
            ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS);
        Status status = db_->VerifyFileChecksums(read_opts);
        ThreadStatusUtil::ResetThreadStatus();
        ProcessStatus(shared, "VerifyFileChecksums", status);
      }
      if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) {
        // TestGetProperty() doesn't return a status, so we can't tell whether
        // it failed due to an injected error. Disable fault injection to
        // avoid false positives.
        if (fault_fs_guard) {
          fault_fs_guard->DisableAllThreadLocalErrorInjection();
        }
        TestGetProperty(thread);
        if (fault_fs_guard) {
          fault_fs_guard->EnableAllThreadLocalErrorInjection();
        }
      }
      if (thread->rand.OneInOpt(FLAGS_get_properties_of_all_tables_one_in)) {
        Status status = TestGetPropertiesOfAllTables();
        ProcessStatus(shared, "TestGetPropertiesOfAllTables", status);
      }
      std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
      if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
        TestIngestExternalFile(thread, rand_column_families, rand_keys);
      }
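      // Note: when FLAGS_backup_max_size is 0, total_size stays 0 and the
      // size gate below (0 <= 0) always passes, so backups are attempted
      // regardless of DB size.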
      if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
        // Beyond a certain DB size threshold, this test becomes heavier than
        // it's worth.
        uint64_t total_size = 0;
        if (FLAGS_backup_max_size > 0) {
          std::vector<FileAttributes> files;
          db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files);
          for (auto& file : files) {
            total_size += file.size_bytes;
          }
        }
        if (total_size <= FLAGS_backup_max_size) {
          // TODO(hx235): enable error injection with
          // backup/restore after fixing the various issues it surfaces
          if (fault_fs_guard) {
            fault_fs_guard->DisableAllThreadLocalErrorInjection();
          }
          Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
          if (fault_fs_guard) {
            fault_fs_guard->EnableAllThreadLocalErrorInjection();
          }
          ProcessStatus(shared, "Backup/restore", s);
        }
      }
      if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
        Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
        ProcessStatus(shared, "Checkpoint", s);
      }
      if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) {
        Status s =
            TestApproximateSize(thread, i, rand_column_families, rand_keys);
        ProcessStatus(shared, "ApproximateSize", s);
      }
      if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
        TestAcquireSnapshot(thread, rand_column_family, keystr, i);
      }
      /*always*/ {
        Status s = MaybeReleaseSnapshots(thread, i);
        ProcessStatus(shared, "Snapshot", s);
      }
      // Assign timestamps if necessary.
      std::string read_ts_str;
      Slice read_ts;
      if (FLAGS_user_timestamp_size > 0) {
        read_ts_str = GetNowNanos();
        read_ts = read_ts_str;
        read_opts.timestamp = &read_ts;
      }
      if (thread->rand.OneInOpt(FLAGS_key_may_exist_one_in)) {
        TestKeyMayExist(thread, read_opts, rand_column_families, rand_keys);
      }
      // Prefix-recoverability relies on tracing successful user writes.
      // Currently we trace all user writes regardless of whether they later
      // succeed or not. To simplify, we disable any fault injection during
      // user writes.
      // TODO(hx235): support tracing user writes with fault injection.
      bool disable_fault_injection_during_user_write =
          fault_fs_guard && MightHaveUnsyncedDataLoss();
      int prob_op = thread->rand.Uniform(100);
      // Reset this in case we pick something other than a read op. We don't
      // want to use a stale value when deciding at the beginning of the loop
      // whether to vote to reopen.
      if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
        assert(0 <= prob_op);
        // OPERATION read
        ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
        if (FLAGS_use_multi_get_entity) {
          constexpr uint64_t max_batch_size = 64;
          const uint64_t batch_size = std::min(
              static_cast<uint64_t>(thread->rand.Uniform(max_batch_size)) + 1,
              ops_per_open - i);
          assert(batch_size >= 1);
          assert(batch_size <= max_batch_size);
          assert(i + batch_size <= ops_per_open);
          rand_keys = GenerateNKeys(thread, static_cast<int>(batch_size), i);
          ThreadStatusUtil::SetThreadOperation(
              ThreadStatus::OperationType::OP_MULTIGETENTITY);
          TestMultiGetEntity(thread, read_opts, rand_column_families,
                             rand_keys);
          i += batch_size - 1;
        } else if (FLAGS_use_get_entity) {
          ThreadStatusUtil::SetThreadOperation(
              ThreadStatus::OperationType::OP_GETENTITY);
          TestGetEntity(thread, read_opts, rand_column_families, rand_keys);
        } else if (FLAGS_use_multiget) {
          // Leave room for one more iteration of the loop with a single-key
          // batch. This is to ensure that each thread does exactly the same
          // number of ops.
          int multiget_batch_size = static_cast<int>(
              std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
                       FLAGS_ops_per_thread - i - 1));
          // If it's the last iteration, ensure that multiget_batch_size is at
          // least 1.
          multiget_batch_size = std::max(multiget_batch_size, 1);
          rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
          ThreadStatusUtil::SetThreadOperation(
              ThreadStatus::OperationType::OP_MULTIGET);
          TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
          i += multiget_batch_size - 1;
        } else {
          ThreadStatusUtil::SetThreadOperation(
              ThreadStatus::OperationType::OP_GET);
          TestGet(thread, read_opts, rand_column_families, rand_keys);
        }
        ThreadStatusUtil::ResetThreadStatus();
      } else if (prob_op < prefix_bound) {
        assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
        // OPERATION prefix scan
        // Keys are 8 bytes long, and the prefix size is FLAGS_prefix_size.
        // There are (8 - FLAGS_prefix_size) bytes besides the prefix, so
        // there are 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the
        // same prefix.
        TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
      } else if (prob_op < write_bound) {
        assert(prefix_bound <= prob_op);
        // OPERATION write
        if (disable_fault_injection_during_user_write) {
          fault_fs_guard->DisableAllThreadLocalErrorInjection();
        }
        TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
                value);
        if (disable_fault_injection_during_user_write) {
          fault_fs_guard->EnableAllThreadLocalErrorInjection();
        }
      } else if (prob_op < del_bound) {
        assert(write_bound <= prob_op);
        // OPERATION delete
        if (disable_fault_injection_during_user_write) {
          fault_fs_guard->DisableAllThreadLocalErrorInjection();
        }
        TestDelete(thread, write_opts, rand_column_families, rand_keys);
        if (disable_fault_injection_during_user_write) {
          fault_fs_guard->EnableAllThreadLocalErrorInjection();
        }
      } else if (prob_op < delrange_bound) {
        assert(del_bound <= prob_op);
        // OPERATION delete range
        if (disable_fault_injection_during_user_write) {
          fault_fs_guard->DisableAllThreadLocalErrorInjection();
        }
        TestDeleteRange(thread, write_opts, rand_column_families, rand_keys);
        if (disable_fault_injection_during_user_write) {
          fault_fs_guard->EnableAllThreadLocalErrorInjection();
        }
      } else if (prob_op < iterate_bound) {
        assert(delrange_bound <= prob_op);
        // OPERATION iterate
        if (FLAGS_use_multiscan) {
          int num_seeks = static_cast<int>(
              std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
                       static_cast<uint64_t>(FLAGS_ops_per_thread - i - 1)));
          // Generate 2x num_seeks random keys, as each scan has a start key
          // and an upper bound
          rand_keys = GenerateNKeys(thread, num_seeks * 2, i);
          i += num_seeks - 1;
          ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
          ThreadStatusUtil::SetThreadOperation(
              ThreadStatus::OperationType::OP_DBITERATOR);
          Status s =
              TestMultiScan(thread, read_opts, rand_column_families, rand_keys);
          ThreadStatusUtil::ResetThreadStatus();
        } else if (!FLAGS_skip_verifydb &&
                   thread->rand.OneInOpt(
                       FLAGS_verify_iterator_with_expected_state_one_in)) {
          ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
          ThreadStatusUtil::SetThreadOperation(
              ThreadStatus::OperationType::OP_DBITERATOR);
          TestIterateAgainstExpected(thread, read_opts, rand_column_families,
                                     rand_keys);
          ThreadStatusUtil::ResetThreadStatus();
        } else {
          int num_seeks = static_cast<int>(std::min(
              std::max(static_cast<uint64_t>(thread->rand.Uniform(4)),
                       static_cast<uint64_t>(1)),
              std::max(static_cast<uint64_t>(FLAGS_ops_per_thread - i - 1),
                       static_cast<uint64_t>(1))));
          rand_keys = GenerateNKeys(thread, num_seeks, i);
          i += num_seeks - 1;
          ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
          ThreadStatusUtil::SetThreadOperation(
              ThreadStatus::OperationType::OP_DBITERATOR);
          Status s;
          if (FLAGS_use_multi_cf_iterator && FLAGS_use_attribute_group) {
            s = TestIterateAttributeGroups(thread, read_opts,
                                           rand_column_families, rand_keys);
            ProcessStatus(shared, "IterateAttributeGroups", s);
          } else {
            s = TestIterate(thread, read_opts, rand_column_families, rand_keys);
            ProcessStatus(shared, "Iterate", s);
          }
          ThreadStatusUtil::ResetThreadStatus();
        }
      } else {
        assert(iterate_bound <= prob_op);
        TestCustomOperations(thread, rand_column_families);
      }
      thread->stats.FinishedSingleOp();
    }
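    // Tear down this thread's error injection before the next reopen cycle
    // (or before the final cleanup below).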
#ifndef NDEBUG
    if (fault_fs_guard) {
      fault_fs_guard->DisableAllThreadLocalErrorInjection();
    }
#endif  // NDEBUG
  }
  while (!thread->snapshot_queue.empty()) {
    db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
    delete thread->snapshot_queue.front().second.key_vec;
    thread->snapshot_queue.pop();
  }
  thread->stats.Stop();
}

// Generate a list of keys close to the boundaries of SST file keys.
// If there isn't any SST file in the DB, return an empty list.
std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread,
                                                     DB* db,
                                                     ColumnFamilyHandle* cfh,
                                                     size_t num_keys) {
  ColumnFamilyMetaData cfmd;
  db->GetColumnFamilyMetaData(cfh, &cfmd);
  std::vector<std::string> boundaries;
  for (const LevelMetaData& lmd : cfmd.levels) {
    for (const SstFileMetaData& sfmd : lmd.files) {
      // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey
      // have timestamps.
      const auto& skey = sfmd.smallestkey;
      const auto& lkey = sfmd.largestkey;
      assert(skey.size() >= FLAGS_user_timestamp_size);
      assert(lkey.size() >= FLAGS_user_timestamp_size);
      boundaries.push_back(
          skey.substr(0, skey.size() - FLAGS_user_timestamp_size));
      boundaries.push_back(
          lkey.substr(0, lkey.size() - FLAGS_user_timestamp_size));
    }
  }
  if (boundaries.empty()) {
    return {};
  }
  std::vector<std::string> ret;
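  // For each requested key, pick a random SST boundary and, with 1/3
  // probability, nudge it just below the boundary (decrement with borrow);
  // with another 1/3, nudge it just above (increment with carry); otherwise
  // use the boundary key itself.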
  for (size_t j = 0; j < num_keys; j++) {
    std::string k =
        boundaries[thread->rand.Uniform(static_cast<int>(boundaries.size()))];
    if (thread->rand.OneIn(3)) {
      // Decrement the last non-zero byte, borrowing through 0x00 bytes
      for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
        uint8_t cur = k[i];
        if (cur > 0) {
          k[i] = static_cast<char>(cur - 1);
          break;
        } else if (i > 0) {
          k[i] = 0xFFu;
        }
      }
    } else if (thread->rand.OneIn(2)) {
      // Increment the last non-0xFF byte, carrying through 0xFF bytes
      for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
        uint8_t cur = k[i];
        if (cur < 255) {
          k[i] = static_cast<char>(cur + 1);
          break;
        } else if (i > 0) {
          k[i] = 0x00;
        }
      }
    }
    ret.push_back(k);
  }
  return ret;
}

// Given a key K, this creates an iterator which scans to K and then
// does a random sequence of Next/Prev operations.
Status StressTest::TestIterate(ThreadState* thread,
                               const ReadOptions& read_opts,
                               const std::vector<int>& rand_column_families,
                               const std::vector<int64_t>& rand_keys) {
  auto new_iter_func = [&rand_column_families, this](const ReadOptions& ro) {
    if (FLAGS_use_multi_cf_iterator) {
      std::vector<ColumnFamilyHandle*> cfhs;
      cfhs.reserve(rand_column_families.size());
      for (auto cf_index : rand_column_families) {
        cfhs.emplace_back(column_families_[cf_index]);
      }
      assert(!cfhs.empty());
      return db_->NewCoalescingIterator(ro, cfhs);
    } else {
      ColumnFamilyHandle* const cfh =
          column_families_[rand_column_families[0]];
      assert(cfh);
      return std::unique_ptr<Iterator>(db_->NewIterator(ro, cfh));
    }
  };
  auto verify_func = [](Iterator* iter) {
    if (!VerifyWideColumns(iter->value(), iter->columns())) {
      fprintf(stderr,
              "Value and columns inconsistent for iterator: value: %s, "
              "columns: %s\n",
              iter->value().ToString(/* hex */ true).c_str(),
              WideColumnsToHex(iter->columns()).c_str());
      return false;
    }
    return true;
  };
  return TestIterateImpl<Iterator>(thread, read_opts, rand_column_families,
                                   rand_keys, new_iter_func, verify_func);
}

Status StressTest::TestIterateAttributeGroups(
    ThreadState* thread, const ReadOptions& read_opts,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  auto new_iter_func = [&rand_column_families, this](const ReadOptions& ro) {
    assert(FLAGS_use_multi_cf_iterator);
    std::vector<ColumnFamilyHandle*> cfhs;
    cfhs.reserve(rand_column_families.size());
    for (auto cf_index : rand_column_families) {
      cfhs.emplace_back(column_families_[cf_index]);
    }
    assert(!cfhs.empty());
    return db_->NewAttributeGroupIterator(ro, cfhs);
  };
  auto verify_func = [](AttributeGroupIterator* iter) {
    if (!VerifyIteratorAttributeGroups(iter->attribute_groups())) {
      // TODO - print out attribute group values
      fprintf(stderr,
              "one of the columns in the attribute groups inconsistent for "
              "iterator\n");
      return false;
    }
    return true;
  };
  return TestIterateImpl<AttributeGroupIterator>(
      thread, read_opts, rand_column_families, rand_keys, new_iter_func,
      verify_func);
}
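
// TestMultiScan prepares a batch of [start, limit) ranges on a single
// iterator via Prepare(), then scans each range and cross-checks every
// position against a fresh total-order "control" iterator over the same
// snapshot, flagging any divergence.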
Status StressTest::TestMultiScan(ThreadState* thread,
                                 const ReadOptions& read_opts,
                                 const std::vector<int>& rand_column_families,
                                 const std::vector<int64_t>& rand_keys) {
  size_t num_scans = rand_keys.size() / 2;
  assert(!rand_column_families.empty());
  assert(!rand_keys.empty());
  ThreadStatus::OperationType cur_op_type =
      ThreadStatusUtil::GetThreadOperation();
  ThreadStatusUtil::SetThreadOperation(
      ThreadStatus::OperationType::OP_UNKNOWN);
  ManagedSnapshot snapshot_guard(db_);
  ThreadStatusUtil::SetThreadOperation(cur_op_type);
  ReadOptions ro = read_opts;
  ro.snapshot = snapshot_guard.snapshot();
  std::string read_ts_str;
  Slice read_ts_slice;
  MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice, ro);
  std::vector<std::string> start_key_strs;
  std::vector<std::string> end_key_strs;
  // TODO: support reverse BytewiseComparator in the stress test
  MultiScanArgs scan_opts(options_.comparator);
  scan_opts.use_async_io = FLAGS_multiscan_use_async_io;
  start_key_strs.reserve(num_scans);
  end_key_strs.reserve(num_scans);
  // Will be initialized before Seek() below.
  Slice ub;
  ro.iterate_upper_bound = &ub;
  for (size_t i = 0; i < num_scans * 2; i += 2) {
    assert(rand_keys[i] <= rand_keys[i + 1]);
    start_key_strs.emplace_back(Key(rand_keys[i]));
    end_key_strs.emplace_back(Key(rand_keys[i + 1]));
    scan_opts.insert(Slice(start_key_strs.back()), Slice(end_key_strs.back()));
  }
  std::string op_logs;
  ro.pin_data = thread->rand.OneIn(2);
  ro.background_purge_on_iterator_cleanup = thread->rand.OneIn(2);
  assert(options_.prefix_extractor.get() == nullptr);
  std::unique_ptr<Iterator> iter(
      db_->NewIterator(ro, column_families_[rand_column_families[0]]));
  iter->Prepare(scan_opts);
  constexpr size_t kOpLogsLimit = 50000;
  auto verify_func = [](Iterator* iterator) {
    if (!VerifyWideColumns(iterator->value(), iterator->columns())) {
      fprintf(stderr,
              "Value and columns inconsistent for iterator: value: %s, "
              "columns: %s\n",
              iterator->value().ToString(/* hex */ true).c_str(),
              WideColumnsToHex(iterator->columns()).c_str());
      return false;
    }
    return true;
  };
  for (const ScanOptions& scan_opt : scan_opts.GetScanRanges()) {
    if (op_logs.size() > kOpLogsLimit) {
      // Shouldn't take too much memory for the history log. Clear it.
      op_logs = "(cleared...)\n";
    }
    // Set up an iterator, perform the same operations without bounds and with
    // total order seek, and compare the results. This is to identify bugs
    // related to bounds, prefix extractor, or reseeking. Sometimes we are
    // comparing iterators with the same set-up, and it doesn't hurt to check
    // them to be equal.
    //
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions cmp_ro;
    cmp_ro.timestamp = ro.timestamp;
    cmp_ro.iter_start_ts = ro.iter_start_ts;
    cmp_ro.snapshot = snapshot_guard.snapshot();
    cmp_ro.auto_refresh_iterator_with_snapshot =
        ro.auto_refresh_iterator_with_snapshot;
    cmp_ro.total_order_seek = true;
    ColumnFamilyHandle* const cmp_cfh =
        GetControlCfh(thread, rand_column_families[0]);
    assert(cmp_cfh);
    std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
    bool diverged = false;
    assert(scan_opt.range.start);
    assert(scan_opt.range.limit);
    Slice key = scan_opt.range.start.value();
    ub = scan_opt.range.limit.value();
    LastIterateOp last_op;
    iter->Seek(key);
    cmp_iter->Seek(key);
    last_op = kLastOpSeek;
    op_logs += "S " + key.ToString(true) + " ";
    if (iter->Valid() && ro.allow_unprepared_value) {
      op_logs += "*";
      if (!iter->PrepareValue()) {
        assert(!iter->Valid());
        assert(!iter->status().ok());
      }
    }
    if (!iter->status().ok() && IsErrorInjectedAndRetryable(iter->status())) {
      return iter->status();
    } else if (!cmp_iter->status().ok() &&
               IsErrorInjectedAndRetryable(cmp_iter->status())) {
      return cmp_iter->status();
    }
    VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
                   key, op_logs, verify_func, &diverged);
    while (iter->Valid()) {
      iter->Next();
      if (!diverged) {
        assert(cmp_iter->Valid());
        cmp_iter->Next();
      }
      op_logs += "N";
      last_op = kLastOpNextOrPrev;
      if (iter->Valid() && ro.allow_unprepared_value) {
        op_logs += "*";
        if (!iter->PrepareValue()) {
          assert(!iter->Valid());
          assert(!iter->status().ok());
        }
      }
      if (!iter->status().ok() &&
          IsErrorInjectedAndRetryable(iter->status())) {
        return iter->status();
      } else if (!cmp_iter->status().ok() &&
                 IsErrorInjectedAndRetryable(cmp_iter->status())) {
        return cmp_iter->status();
      }
      VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
                     key, op_logs, verify_func, &diverged);
      if (diverged) {
        if (thread->shared->HasVerificationFailedYet()) {
          const std::vector<ScanOptions>& scanoptions =
              scan_opts.GetScanRanges();
          for (const auto& t : scanoptions) {
            fprintf(stdout, "Multiscan options: %s to %s\n",
                    t.range.start.value().ToString(true).c_str(),
                    t.range.limit.value().ToString(true).c_str());
          }
        }
        break;
      }
    }
    thread->stats.AddIterations(1);
    op_logs += "; ";
    if (diverged) {
      break;
    }
  }
  return Status::OK();
}
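
// Shared driver for TestIterate and TestIterateAttributeGroups. NewIterFunc
// constructs the iterator under test (single-CF, coalescing, or attribute
// group) and VerifyFunc checks per-position invariants; the driver randomizes
// read options and bounds, then cross-checks against a total-order control
// iterator via VerifyIterator().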
template <typename IterType, typename NewIterFunc, typename VerifyFunc>
Status StressTest::TestIterateImpl(ThreadState* thread,
                                   const ReadOptions& read_opts,
                                   const std::vector<int>& rand_column_families,
                                   const std::vector<int64_t>& rand_keys,
                                   NewIterFunc new_iter_func,
                                   VerifyFunc verify_func) {
  assert(!rand_column_families.empty());
  assert(!rand_keys.empty());
  ManagedSnapshot snapshot_guard(db_);
  ReadOptions ro = read_opts;
  ro.snapshot = snapshot_guard.snapshot();
  std::string read_ts_str;
  Slice read_ts_slice;
  MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice, ro);
  std::string op_logs;
  ro.pin_data = thread->rand.OneIn(2);
  ro.background_purge_on_iterator_cleanup = thread->rand.OneIn(2);
  bool expect_total_order = false;
  if (thread->rand.OneIn(16)) {
    // When a prefix extractor is used, it's useful to cover total order seek.
    ro.total_order_seek = true;
    expect_total_order = true;
  } else if (thread->rand.OneIn(4)) {
    ro.total_order_seek = thread->rand.OneIn(2);
    ro.auto_prefix_mode = true;
    expect_total_order = true;
  } else if (options_.prefix_extractor.get() == nullptr) {
    expect_total_order = true;
  }
  std::string upper_bound_str;
  Slice upper_bound;
  // Prefer no bound with no range query filtering; prefer a bound with it
  if (FLAGS_use_sqfc_for_range_queries ^ thread->rand.OneIn(16)) {
    // Note: upper_bound can be smaller than the seek key.
    const int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    upper_bound_str = Key(rand_upper_key);
    upper_bound = Slice(upper_bound_str);
    ro.iterate_upper_bound = &upper_bound;
  }
  std::string lower_bound_str;
  Slice lower_bound;
  if (FLAGS_use_sqfc_for_range_queries ^ thread->rand.OneIn(16)) {
    // Note: lower_bound can be greater than the seek key.
    const int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    lower_bound_str = Key(rand_lower_key);
    lower_bound = Slice(lower_bound_str);
    ro.iterate_lower_bound = &lower_bound;
  }
  if (FLAGS_use_sqfc_for_range_queries && ro.iterate_upper_bound &&
      ro.iterate_lower_bound) {
    ro.table_filter = sqfc_factory_->GetTableFilterForRangeQuery(
        *ro.iterate_lower_bound, *ro.iterate_upper_bound);
  }
  std::unique_ptr<IterType> iter = new_iter_func(ro);
  std::vector<std::string> key_strs;
  if (thread->rand.OneIn(16)) {
    // Generate keys close to the lower or upper bound of SST files.
    key_strs =
        GetWhiteBoxKeys(thread, db_, column_families_[rand_column_families[0]],
                        rand_keys.size());
  }
  if (key_strs.empty()) {
    // Use the random keys passed in.
    for (int64_t rkey : rand_keys) {
      key_strs.push_back(Key(rkey));
    }
  }
  constexpr size_t kOpLogsLimit = 10000;
  for (const std::string& key_str : key_strs) {
    if (op_logs.size() > kOpLogsLimit) {
      // Shouldn't take too much memory for the history log. Clear it.
      op_logs = "(cleared...)\n";
    }
    if (!FLAGS_use_sqfc_for_range_queries &&
        ro.iterate_upper_bound != nullptr && thread->rand.OneIn(2)) {
      // With a 1/2 chance, change the upper bound.
      // Not compatible with the sqfc range filter.
      // The bound may change before its first use, but there is no problem
      // with that.
      const int64_t rand_upper_key =
          GenerateOneKey(thread, FLAGS_ops_per_thread);
      upper_bound_str = Key(rand_upper_key);
      upper_bound = Slice(upper_bound_str);
    }
    if (!FLAGS_use_sqfc_for_range_queries &&
        ro.iterate_lower_bound != nullptr && thread->rand.OneIn(4)) {
      // With a 1/4 chance, change the lower bound.
      // Not compatible with the sqfc range filter.
      // The bound may change before its first use, but there is no problem
      // with that.
      const int64_t rand_lower_key =
          GenerateOneKey(thread, FLAGS_ops_per_thread);
      lower_bound_str = Key(rand_lower_key);
      lower_bound = Slice(lower_bound_str);
    }
    // Set up an iterator, perform the same operations without bounds and with
    // total order seek, and compare the results. This is to identify bugs
    // related to bounds, prefix extractor, or reseeking. Sometimes we are
    // comparing iterators with the same set-up, and it doesn't hurt to check
    // them to be equal.
    //
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions cmp_ro;
    cmp_ro.timestamp = ro.timestamp;
    cmp_ro.iter_start_ts = ro.iter_start_ts;
    cmp_ro.snapshot = snapshot_guard.snapshot();
    cmp_ro.auto_refresh_iterator_with_snapshot =
        ro.auto_refresh_iterator_with_snapshot;
    cmp_ro.total_order_seek = true;
    ColumnFamilyHandle* const cmp_cfh =
        GetControlCfh(thread, rand_column_families[0]);
    assert(cmp_cfh);
    std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
    bool diverged = false;
    Slice key(key_str);
    const bool support_seek_first_or_last = expect_total_order;
    // Write-prepared/write-unprepared transactions and the multi-CF iterator
    // do not support Refresh() yet.
    if (!(FLAGS_use_txn && FLAGS_txn_write_policy != 0 /* write committed */) &&
        !FLAGS_use_multi_cf_iterator && thread->rand.OneIn(4)) {
      Status s = iter->Refresh(snapshot_guard.snapshot());
      if (!s.ok() && IsErrorInjectedAndRetryable(s)) {
        return s;
      }
      assert(s.ok());
      op_logs += "Refresh ";
    }
    LastIterateOp last_op;
    if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToFirst();
      cmp_iter->SeekToFirst();
      last_op = kLastOpSeekToFirst;
      op_logs += "STF ";
    } else if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToLast();
      cmp_iter->SeekToLast();
      last_op = kLastOpSeekToLast;
      op_logs += "STL ";
    } else if (thread->rand.OneIn(8)) {
      iter->SeekForPrev(key);
      cmp_iter->SeekForPrev(key);
      last_op = kLastOpSeekForPrev;
      op_logs += "SFP " + key.ToString(true) + " ";
    } else {
      iter->Seek(key);
      cmp_iter->Seek(key);
      last_op = kLastOpSeek;
      op_logs += "S " + key.ToString(true) + " ";
    }
    if (iter->Valid() && ro.allow_unprepared_value) {
      op_logs += "*";
      if (!iter->PrepareValue()) {
        assert(!iter->Valid());
        assert(!iter->status().ok());
      }
    }
    if (!iter->status().ok() && IsErrorInjectedAndRetryable(iter->status())) {
      return iter->status();
    } else if (!cmp_iter->status().ok() &&
               IsErrorInjectedAndRetryable(cmp_iter->status())) {
      return cmp_iter->status();
    }
    VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
                   key, op_logs, verify_func, &diverged);
    const bool no_reverse =
        (FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
    for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); ++i) {
      if (no_reverse || thread->rand.OneIn(2)) {
        iter->Next();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Next();
        }
        op_logs += "N";
      } else {
        iter->Prev();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Prev();
        }
        op_logs += "P";
      }
      last_op = kLastOpNextOrPrev;
      if (iter->Valid() && ro.allow_unprepared_value) {
        op_logs += "*";
        if (!iter->PrepareValue()) {
          assert(!iter->Valid());
          assert(!iter->status().ok());
        }
      }
      if (!iter->status().ok() && IsErrorInjectedAndRetryable(iter->status())) {
        return iter->status();
      } else if (!cmp_iter->status().ok() &&
                 IsErrorInjectedAndRetryable(cmp_iter->status())) {
        return cmp_iter->status();
      }
      VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
                     key, op_logs, verify_func, &diverged);
    }
    thread->stats.AddIterations(1);
    op_logs += "; ";
  }
  return Status::OK();
}

Status StressTest::TestGetLiveFiles() const {
  std::vector<std::string> live_file;
  uint64_t manifest_size = 0;
  return db_->GetLiveFiles(live_file, &manifest_size);
}

Status StressTest::TestGetLiveFilesMetaData() const {
  std::vector<LiveFileMetaData> live_file_metadata;
  db_->GetLiveFilesMetaData(&live_file_metadata);
  return Status::OK();
}

Status StressTest::TestGetLiveFilesStorageInfo() const {
  std::vector<LiveFileStorageInfo> live_file_storage_info;
  return db_->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(),
                                      &live_file_storage_info);
}

Status StressTest::TestGetAllColumnFamilyMetaData() const {
  std::vector<ColumnFamilyMetaData> all_cf_metadata;
  db_->GetAllColumnFamilyMetaData(&all_cf_metadata);
  return Status::OK();
}

Status StressTest::TestGetSortedWalFiles() const {
  VectorWalPtr log_ptr;
  return db_->GetSortedWalFiles(log_ptr);
}

Status StressTest::TestGetCurrentWalFile() const {
  std::unique_ptr<WalFile> cur_wal_file;
  return db_->GetCurrentWalFile(&cur_wal_file);
}

// Compare the two iterators. iter and cmp_iter should be at the same
// position, unless iter may have been invalidated or left undefined by the
// upper or lower bounds, or by the prefix extractor.
// Flags a verification failure (via SharedState) if verification fails.
// *diverged is set to true once the two iterators have diverged.
template <typename IterType, typename VerifyFuncType>
void StressTest::VerifyIterator(
    ThreadState* thread, ColumnFamilyHandle* cmp_cfh, const ReadOptions& ro,
    IterType* iter, Iterator* cmp_iter, LastIterateOp op, const Slice& seek_key,
    const std::string& op_logs, VerifyFuncType verify_func, bool* diverged) {
  assert(diverged);
  if (*diverged) {
    return;
  }
  if (ro.iter_start_ts != nullptr) {
    assert(FLAGS_user_timestamp_size > 0);
    // We currently do not verify the iterator when dumping the history of
    // internal keys.
    *diverged = true;
    return;
  }
  if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
    // SeekToFirst() with a lower bound is not well-defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekToLast && ro.iterate_upper_bound != nullptr) {
    // SeekToLast() with an upper bound is not well-defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
             (options_.comparator->CompareWithoutTimestamp(
                  *ro.iterate_lower_bound, /*a_has_ts=*/false, seek_key,
                  /*b_has_ts=*/false) >= 0 ||
              (ro.iterate_upper_bound != nullptr &&
               options_.comparator->CompareWithoutTimestamp(
                   *ro.iterate_lower_bound, /*a_has_ts=*/false,
                   *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
    // Lower bound behavior is not well-defined if it is larger than the
    // seek key or the upper bound. Disable the check for now.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
             (options_.comparator->CompareWithoutTimestamp(
                  *ro.iterate_upper_bound, /*a_has_ts=*/false, seek_key,
                  /*b_has_ts=*/false) <= 0 ||
              (ro.iterate_lower_bound != nullptr &&
               options_.comparator->CompareWithoutTimestamp(
                   *ro.iterate_lower_bound, /*a_has_ts=*/false,
                   *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
    // Upper bound behavior is not well-defined if it is smaller than the
    // seek key or the lower bound. Disable the check for now.
    *diverged = true;
    return;
  }
  const SliceTransform* pe = (ro.total_order_seek || ro.auto_prefix_mode)
                                 ? nullptr
                                 : options_.prefix_extractor.get();
  const Comparator* cmp = options_.comparator;
  std::ostringstream read_opt_oss;
  read_opt_oss << "pin_data: " << ro.pin_data
               << ", background_purge_on_iterator_cleanup: "
               << ro.background_purge_on_iterator_cleanup
               << ", total_order_seek: " << ro.total_order_seek
               << ", auto_prefix_mode: " << ro.auto_prefix_mode
               << ", iterate_upper_bound: "
               << (ro.iterate_upper_bound
                       ? ro.iterate_upper_bound->ToString(true).c_str()
                       : "")
               << ", iterate_lower_bound: "
               << (ro.iterate_lower_bound
                       ? ro.iterate_lower_bound->ToString(true).c_str()
                       : "")
               << ", allow_unprepared_value: " << ro.allow_unprepared_value
               << ", auto_refresh_iterator_with_snapshot: "
               << ro.auto_refresh_iterator_with_snapshot
               << ", snapshot: " << (ro.snapshot ? "non-nullptr" : "nullptr")
               << ", timestamp: "
               << (ro.timestamp ? ro.timestamp->ToString(true).c_str() : "")
               << ", iter_start_ts: "
               << (ro.iter_start_ts ? ro.iter_start_ts->ToString(true).c_str()
                                    : "");
  if (iter->Valid() && !cmp_iter->Valid()) {
    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix-seeking a key that is not in the prefix extractor's domain
        // is undefined. Skip checking this scenario.
        *diverged = true;
        return;
      } else if (!pe->InDomain(iter->key())) {
        // Out of range: the iterator key is no longer in the domain.
        *diverged = true;
        return;
      } else if (pe->Transform(iter->key()) != pe->Transform(seek_key)) {
        *diverged = true;
        return;
      }
    }
    fprintf(stderr,
            "Control iterator is invalid but iterator has key %s "
            "%s under specified iterator ReadOptions: %s (Empty string or "
            "missing field indicates default option or value is used)\n",
            iter->key().ToString(true).c_str(), op_logs.c_str(),
            read_opt_oss.str().c_str());
    *diverged = true;
  } else if (cmp_iter->Valid()) {
    // The iterator under test is not valid. That can be legitimate if it has
    // already moved past the upper or lower bound, or been filtered out by
    // the prefix extractor.
    const Slice& total_order_key = cmp_iter->key();
    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix-seeking a key that is not in the prefix extractor's domain
        // is undefined. Skip checking this scenario.
        *diverged = true;
        return;
      }
      if (!pe->InDomain(total_order_key) ||
          pe->Transform(total_order_key) != pe->Transform(seek_key)) {
        // If the prefix is exhausted, the only thing left to check is that
        // the iterator doesn't return a position within the prefix.
        // Either way, checking can stop here.
        *diverged = true;
        if (!iter->Valid() || !pe->InDomain(iter->key()) ||
            pe->Transform(iter->key()) != pe->Transform(seek_key)) {
          return;
        }
        fprintf(stderr,
                "Iterator stays in prefix but control doesn't:"
                " iterator key %s control iterator key %s %s under specified "
                "iterator ReadOptions: %s (Empty string or "
                "missing field indicates default option or value is used)\n",
                iter->key().ToString(true).c_str(),
                cmp_iter->key().ToString(true).c_str(), op_logs.c_str(),
                read_opt_oss.str().c_str());
      }
    }
    // Check upper or lower bounds.
    if (!*diverged) {
      if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
          (!iter->Valid() &&
           (ro.iterate_upper_bound == nullptr ||
            cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
                                         *ro.iterate_upper_bound,
                                         /*b_has_ts=*/false) < 0) &&
           (ro.iterate_lower_bound == nullptr ||
            cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
                                         *ro.iterate_lower_bound,
                                         /*b_has_ts=*/false) > 0))) {
        fprintf(stderr,
                "Iterator diverged from control iterator which"
                " has value %s %s under specified iterator ReadOptions: %s "
                "(Empty string or "
                "missing field indicates default option or value is used)\n",
                total_order_key.ToString(true).c_str(), op_logs.c_str(),
                read_opt_oss.str().c_str());
        if (iter->Valid()) {
          fprintf(stderr, "iterator has value %s\n",
                  iter->key().ToString(true).c_str());
        } else {
          fprintf(stderr, "iterator is not valid with status: %s\n",
                  iter->status().ToString().c_str());
        }
        *diverged = true;
      }
    }
  }
  if (!*diverged && iter->Valid()) {
    if (!verify_func(iter)) {
      *diverged = true;
    }
  }
  if (*diverged) {
    fprintf(stderr, "VerifyIterator failed. Control CF %s\n",
            cmp_cfh->GetName().c_str());
    thread->stats.AddErrors(1);
    // Fail fast to preserve the DB state.
    thread->shared->SetVerificationFailure();
  }
}
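
// TestBackupRestore exercises BackupEngine end-to-end with randomized
// BackupEngineOptions (file sharing/naming, schema version, rate limiters),
// then either restores to a scratch directory or opens the backup in place
// read-only, and finally verifies the existence/non-existence of a single
// locked key in each chosen column family.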
Status StressTest::TestBackupRestore(
    ThreadState* thread, const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  std::vector<std::unique_ptr<MutexLock>> locks;
  if (ShouldAcquireMutexOnKey()) {
    for (int rand_column_family : rand_column_families) {
      // `rand_keys[0]` on each chosen CF will be verified.
      locks.emplace_back(new MutexLock(
          thread->shared->GetMutexForKey(rand_column_family, rand_keys[0])));
    }
  }
  const std::string backup_dir =
      FLAGS_db + "/.backup" + std::to_string(thread->tid);
  const std::string restore_dir =
      FLAGS_db + "/.restore" + std::to_string(thread->tid);
  BackupEngineOptions backup_opts(backup_dir);
  // For debugging, get info_log from live options
  backup_opts.info_log = db_->GetDBOptions().info_log.get();
  if (thread->rand.OneIn(10)) {
    backup_opts.share_table_files = false;
  } else {
    backup_opts.share_table_files = true;
    if (thread->rand.OneIn(5)) {
      backup_opts.share_files_with_checksum = false;
    } else {
      backup_opts.share_files_with_checksum = true;
      if (thread->rand.OneIn(2)) {
        // old
        backup_opts.share_files_with_checksum_naming =
            BackupEngineOptions::kLegacyCrc32cAndFileSize;
      } else {
        // new
        backup_opts.share_files_with_checksum_naming =
            BackupEngineOptions::kUseDbSessionId;
      }
      if (thread->rand.OneIn(2)) {
        backup_opts.share_files_with_checksum_naming =
            backup_opts.share_files_with_checksum_naming |
            BackupEngineOptions::kFlagIncludeFileSize;
      }
    }
  }
  if (thread->rand.OneIn(2)) {
    backup_opts.schema_version = 1;
  } else {
    backup_opts.schema_version = 2;
  }
  if (thread->rand.OneIn(3)) {
    backup_opts.max_background_operations = 16;
  } else {
    backup_opts.max_background_operations = 1;
  }
  if (thread->rand.OneIn(2)) {
    backup_opts.backup_rate_limiter.reset(NewGenericRateLimiter(
        FLAGS_backup_max_size * 1000000 /* rate_bytes_per_sec */,
        1 /* refill_period_us */));
  }
  if (thread->rand.OneIn(2)) {
    backup_opts.restore_rate_limiter.reset(NewGenericRateLimiter(
        FLAGS_backup_max_size * 1000000 /* rate_bytes_per_sec */,
        1 /* refill_period_us */));
  }
  backup_opts.current_temperatures_override_manifest = thread->rand.OneIn(2);
  std::ostringstream backup_opt_oss;
  backup_opt_oss << "share_table_files: " << backup_opts.share_table_files
                 << ", share_files_with_checksum: "
                 << backup_opts.share_files_with_checksum
                 << ", share_files_with_checksum_naming: "
                 << backup_opts.share_files_with_checksum_naming
                 << ", schema_version: " << backup_opts.schema_version
                 << ", max_background_operations: "
                 << backup_opts.max_background_operations
                 << ", backup_rate_limiter: "
                 << backup_opts.backup_rate_limiter.get()
                 << ", restore_rate_limiter: "
                 << backup_opts.restore_rate_limiter.get()
                 << ", current_temperatures_override_manifest: "
                 << backup_opts.current_temperatures_override_manifest;
  std::ostringstream create_backup_opt_oss;
  std::ostringstream restore_opts_oss;
  BackupEngine* backup_engine = nullptr;
  std::string from = "a backup/restore operation";
  Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
  if (!s.ok()) {
    from = "BackupEngine::Open";
  }
  if (s.ok() && FLAGS_manual_wal_flush_one_in > 0) {
    // Flush the WAL so that no buffered WAL data is missing from the backup,
    // which would cause false-positive inconsistencies between the original
    // DB and the restored DB.
    s = db_->FlushWAL(/*sync=*/false);
    if (!s.ok()) {
      from = "FlushWAL";
    }
  }
  if (s.ok()) {
    if (backup_opts.schema_version >= 2 && thread->rand.OneIn(2)) {
      TEST_BackupMetaSchemaOptions test_opts;
      test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
      test_opts.file_sizes = thread->rand.OneIn(2) == 0;
      TEST_SetBackupMetaSchemaOptions(backup_engine, test_opts);
    }
    CreateBackupOptions create_opts;
    if (FLAGS_disable_wal) {
      // The verification can only work when the latest value of `key` is
      // backed up, which requires flushing when the WAL is disabled.
      //
      // Note this triggers a flush with a key lock held. Meanwhile, operations
      // like flush/compaction may attempt to grab key locks like in
      // `DbStressCompactionFilter`. The philosophy around preventing deadlock
      // is that the background operation's key lock acquisition only tries but
      // does not wait for the lock. So here in the foreground it is OK to hold
      // the lock and wait on a background operation (flush).
      create_opts.flush_before_backup = true;
    }
    create_opts.decrease_background_thread_cpu_priority = thread->rand.OneIn(2);
    create_opts.background_thread_cpu_priority = static_cast<CpuPriority>(
        thread->rand.Next() % (static_cast<int>(CpuPriority::kHigh) + 1));
    create_backup_opt_oss << "flush_before_backup: "
                          << create_opts.flush_before_backup
                          << ", decrease_background_thread_cpu_priority: "
                          << create_opts.decrease_background_thread_cpu_priority
                          << ", background_thread_cpu_priority: "
                          << static_cast<int>(
                                 create_opts.background_thread_cpu_priority);
    s = backup_engine->CreateNewBackup(create_opts, db_);
    if (!s.ok()) {
      from = "BackupEngine::CreateNewBackup";
    }
  }
  if (s.ok()) {
    delete backup_engine;
    backup_engine = nullptr;
    s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
    if (!s.ok()) {
      from = "BackupEngine::Open (again)";
    }
  }
  std::vector<BackupInfo> backup_info;
  // If inplace_not_restore, we verify the backup by opening it as a
  // read-only DB. If !inplace_not_restore, we restore it to a temporary
  // directory for verification.
  bool inplace_not_restore = thread->rand.OneIn(3);
  if (s.ok()) {
    backup_engine->GetBackupInfo(&backup_info,
                                 /*include_file_details*/ inplace_not_restore);
    if (backup_info.empty()) {
      s = Status::NotFound("no backups found");
      from = "BackupEngine::GetBackupInfo";
    }
  }
  if (s.ok() && thread->rand.OneIn(2)) {
    s = backup_engine->VerifyBackup(
        backup_info.front().backup_id,
        thread->rand.OneIn(2) /* verify_with_checksum */);
    if (!s.ok()) {
      from = "BackupEngine::VerifyBackup";
    }
  }
  const bool allow_persistent = thread->tid == 0;  // not too many
  bool from_latest = false;
  int count = static_cast<int>(backup_info.size());
  RestoreOptions restore_options;
  restore_options.keep_log_files = thread->rand.OneIn(2);
  restore_opts_oss << "keep_log_files: " << restore_options.keep_log_files;
  if (s.ok() && !inplace_not_restore) {
    if (count > 1) {
      s = backup_engine->RestoreDBFromBackup(
          restore_options, backup_info[thread->rand.Uniform(count)].backup_id,
          restore_dir /* db_dir */, restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromBackup";
      }
    } else {
      from_latest = true;
      s = backup_engine->RestoreDBFromLatestBackup(
          restore_options, restore_dir /* db_dir */, restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromLatestBackup";
      }
    }
  }
  if (s.ok() && !inplace_not_restore) {
    // Purge early if restoring, to ensure the restored directory doesn't
    // have some secret dependency on the backup directory.
    uint32_t to_keep = 0;
    if (allow_persistent) {
      // allow one thread to keep up to 2 backups
      to_keep = thread->rand.Uniform(3);
    }
    s = backup_engine->PurgeOldBackups(to_keep);
    if (!s.ok()) {
      from = "BackupEngine::PurgeOldBackups";
    }
  }
  2304. DB* restored_db = nullptr;
  2305. std::vector<ColumnFamilyHandle*> restored_cf_handles;
  2306. // Not yet implemented: opening restored BlobDB or TransactionDB
  2307. Options db_opt;
  2308. if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
  2309. s = PrepareOptionsForRestoredDB(&db_opt);
  2310. if (!s.ok()) {
  2311. from = "PrepareRestoredDBOptions in backup/restore";
  2312. }
  2313. }
  2314. if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
  2315. std::vector<ColumnFamilyDescriptor> cf_descriptors;
  2316. // TODO(ajkr): `column_family_names_` is not safe to access here when
  2317. // `clear_column_family_one_in != 0`. But we can't easily switch to
  2318. // `ListColumnFamilies` to get names because it won't necessarily give
  2319. // the same order as `column_family_names_`.
  2320. assert(FLAGS_clear_column_family_one_in == 0);
  2321. for (const auto& name : column_family_names_) {
  2322. cf_descriptors.emplace_back(name, ColumnFamilyOptions(db_opt));
  2323. }
  2324. if (inplace_not_restore) {
  2325. BackupInfo& info = backup_info[thread->rand.Uniform(count)];
  2326. db_opt.env = info.env_for_open.get();
  2327. s = DB::OpenForReadOnly(DBOptions(db_opt), info.name_for_open,
  2328. cf_descriptors, &restored_cf_handles,
  2329. &restored_db);
  2330. if (!s.ok()) {
  2331. from = "DB::OpenForReadOnly in backup/restore";
  2332. }
  2333. } else {
  2334. s = DB::Open(DBOptions(db_opt), restore_dir, cf_descriptors,
  2335. &restored_cf_handles, &restored_db);
  2336. if (!s.ok()) {
  2337. from = "DB::Open in backup/restore";
  2338. }
  2339. }
  2340. }
  2341. // Note the column families chosen by `rand_column_families` cannot be
  2342. // dropped while the locks for `rand_keys` are held. So we should not have
  2343. // to worry about accessing those column families throughout this function.
  2344. //
  2345. // For simplicity, currently only verifies existence/non-existence of a
  2346. // single key
  2347. for (size_t i = 0; restored_db && s.ok() && i < rand_column_families.size();
  2348. ++i) {
  2349. std::string key_str = Key(rand_keys[0]);
  2350. Slice key = key_str;
  2351. std::string restored_value;
  2352. // This `ReadOptions` is for validation purposes. Ignore
  2353. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  2354. ReadOptions read_opts;
  2355. std::string ts_str;
  2356. Slice ts;
  2357. if (FLAGS_user_timestamp_size > 0) {
  2358. ts_str = GetNowNanos();
  2359. ts = ts_str;
  2360. read_opts.timestamp = &ts;
  2361. }
  2362. Status get_status = restored_db->Get(
  2363. read_opts, restored_cf_handles[rand_column_families[i]], key,
  2364. &restored_value);
  2365. bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
  2366. if (get_status.ok()) {
  2367. if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
  2368. std::ostringstream oss;
  2369. oss << "0x" << key.ToString(true)
  2370. << " exists in restore but not in original db";
  2371. s = Status::Corruption(oss.str());
  2372. }
  2373. } else if (get_status.IsNotFound()) {
  2374. if (exists && from_latest && ShouldAcquireMutexOnKey()) {
  2375. std::ostringstream oss;
  2376. oss << "0x" << key.ToString(true)
  2377. << " exists in original db but not in restore";
  2378. s = Status::Corruption(oss.str());
  2379. }
  2380. } else {
  2381. s = get_status;
  2382. if (!s.ok()) {
  2383. from = "DB::Get in backup/restore";
  2384. }
  2385. }
  2386. }
  2387. if (restored_db != nullptr) {
  2388. for (auto* cf_handle : restored_cf_handles) {
  2389. restored_db->DestroyColumnFamilyHandle(cf_handle);
  2390. }
  2391. delete restored_db;
  2392. restored_db = nullptr;
  2393. }
  2394. if (s.ok() && inplace_not_restore) {
  2395. // Purge late if inplace open read-only
  2396. uint32_t to_keep = 0;
  2397. if (allow_persistent) {
  2398. // allow one thread to keep up to 2 backups
  2399. to_keep = thread->rand.Uniform(3);
  2400. }
  2401. s = backup_engine->PurgeOldBackups(to_keep);
  2402. if (!s.ok()) {
  2403. from = "BackupEngine::PurgeOldBackups";
  2404. }
  2405. }
  2406. if (backup_engine != nullptr) {
  2407. delete backup_engine;
  2408. backup_engine = nullptr;
  2409. }
  2410. // Temporarily disable error injection for clean up
  2411. if (fault_fs_guard) {
  2412. fault_fs_guard->DisableAllThreadLocalErrorInjection();
  2413. }
  2414. if (s.ok() || IsErrorInjectedAndRetryable(s)) {
  2415. // Preserve directories on failure, or allowed persistent backup
  2416. if (!allow_persistent) {
  2417. s = DestroyDir(db_stress_env, backup_dir);
  2418. if (!s.ok()) {
  2419. from = "Destroy backup dir";
  2420. }
  2421. }
  2422. }
  2423. if (s.ok() || IsErrorInjectedAndRetryable(s)) {
  2424. s = DestroyDir(db_stress_env, restore_dir);
  2425. if (!s.ok()) {
  2426. from = "Destroy restore dir";
  2427. }
  2428. }
  2429. // Enable back error injection disabled for clean up
  2430. if (fault_fs_guard) {
  2431. fault_fs_guard->EnableAllThreadLocalErrorInjection();
  2432. }
  2433. if (!s.ok() && !IsErrorInjectedAndRetryable(s)) {
  2434. fprintf(stderr,
  2435. "Failure in %s with: %s under specified BackupEngineOptions: %s, "
  2436. "CreateBackupOptions: %s, RestoreOptions: %s (Empty string or "
  2437. "missing field indicates default option or value is used)\n",
  2438. from.c_str(), s.ToString().c_str(), backup_opt_oss.str().c_str(),
  2439. create_backup_opt_oss.str().c_str(),
  2440. restore_opts_oss.str().c_str());
  2441. }
  2442. return s;
  2443. }
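
// For orientation, the BackupEngine round trip exercised above reduces to
// the following minimal sketch (hypothetical paths, error handling elided;
// not how this test configures its engine):
//
//   BackupEngineOptions engine_opts("/tmp/rocksdb_backup" /* backup_dir */);
//   BackupEngine* engine = nullptr;
//   Status st = BackupEngine::Open(Env::Default(), engine_opts, &engine);
//   if (st.ok()) {
//     st = engine->CreateNewBackup(CreateBackupOptions(), db);
//   }
//   if (st.ok()) {
//     st = engine->RestoreDBFromLatestBackup(RestoreOptions(),
//                                            "/tmp/restored" /* db_dir */,
//                                            "/tmp/restored" /* wal_dir */);
//   }
//   delete engine;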

void InitializeMergeOperator(Options& options) {
  if (FLAGS_use_full_merge_v1) {
    options.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
  } else {
    if (FLAGS_use_put_entity_one_in > 0) {
      options.merge_operator = std::make_shared<DBStressWideMergeOperator>();
    } else {
      options.merge_operator = MergeOperators::CreatePutOperator();
    }
  }
}
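
// With a put-style merge operator, each Merge() behaves like a Put(): the
// newest operand wins. A minimal sketch of the behavior the stress test
// relies on (illustrative only; `db` is a hypothetical open DB):
//
//   db->Merge(WriteOptions(), "k", "v1");
//   db->Merge(WriteOptions(), "k", "v2");
//   std::string v;
//   db->Get(ReadOptions(), "k", &v);  // yields "v2" under a put operator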

Status StressTest::PrepareOptionsForRestoredDB(Options* options) {
  assert(options);
  // To avoid racing with other threads' operations (e.g., SetOptions()) on
  // the same pointer sub-option (e.g., `std::shared_ptr<const FilterPolicy>
  // filter_policy`) while keeping the same settings as `options_`, we create
  // a new Options object from `options_`'s string representation to deep-copy
  // these pointer sub-options
  Status s;
  ConfigOptions config_opts;
  std::string db_options_str;
  s = GetStringFromDBOptions(config_opts, options_, &db_options_str);
  if (!s.ok()) {
    return s;
  }
  DBOptions db_options;
  s = GetDBOptionsFromString(config_opts, Options(), db_options_str,
                             &db_options);
  if (!s.ok()) {
    return s;
  }
  std::string cf_options_str;
  s = GetStringFromColumnFamilyOptions(config_opts, options_, &cf_options_str);
  if (!s.ok()) {
    return s;
  }
  ColumnFamilyOptions cf_options;
  s = GetColumnFamilyOptionsFromString(config_opts, Options(), cf_options_str,
                                       &cf_options);
  if (!s.ok()) {
    return s;
  }
  *options = Options(db_options, cf_options);
  options->best_efforts_recovery = false;
  options->listeners.clear();
  // Avoid dangling/shared file descriptors, for reliable destroy
  options->sst_file_manager = nullptr;
  // GetColumnFamilyOptionsFromString does not create a customized merge
  // operator.
  InitializeMergeOperator(*options);
  if (FLAGS_user_timestamp_size > 0) {
    // Check that OPTIONS string loading can bootstrap the correct user
    // comparator from the object registry.
    assert(options->comparator);
    assert(options->comparator == test::BytewiseComparatorWithU64TsWrapper());
  }
  return Status::OK();
}
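
// The string round trip above is how this test deep-copies Options so that
// pointer sub-options are freshly constructed rather than shared. A minimal
// sketch, assuming `src` is an existing ColumnFamilyOptions (illustrative
// only; not every pointer option is necessarily serializable this way):
//
//   ConfigOptions cfg;
//   std::string str;
//   Status st = GetStringFromColumnFamilyOptions(cfg, src, &str);
//   ColumnFamilyOptions copied;
//   if (st.ok()) {
//     st = GetColumnFamilyOptionsFromString(cfg, ColumnFamilyOptions(), str,
//                                           &copied);
//   }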

Status StressTest::TestApproximateSize(
    ThreadState* thread, uint64_t iteration,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  // rand_keys likely only has one key. Just use the first one.
  assert(!rand_keys.empty());
  assert(!rand_column_families.empty());
  int64_t key1 = rand_keys[0];
  int64_t key2;
  if (thread->rand.OneIn(2)) {
    // Two totally random keys. This tends to cover large ranges.
    key2 = GenerateOneKey(thread, iteration);
    if (key2 < key1) {
      std::swap(key1, key2);
    }
  } else {
    // Unless users pass a very large FLAGS_max_key, we should not worry
    // about overflow. It is for testing, so we skip the overflow checking
    // for simplicity.
    key2 = key1 + static_cast<int64_t>(thread->rand.Uniform(1000));
  }
  std::string key1_str = Key(key1);
  std::string key2_str = Key(key2);
  Range range{Slice(key1_str), Slice(key2_str)};
  if (thread->rand.OneIn(3)) {
    // Call GetApproximateMemTableStats instead
    uint64_t count, size;
    db_->GetApproximateMemTableStats(column_families_[rand_column_families[0]],
                                     range, &count, &size);
    return Status::OK();
  } else {
    // Call GetApproximateSizes
    SizeApproximationOptions sao;
    sao.include_memtables = thread->rand.OneIn(2);
    if (sao.include_memtables) {
      sao.include_files = thread->rand.OneIn(2);
    }
    if (thread->rand.OneIn(2)) {
      if (thread->rand.OneIn(2)) {
        sao.files_size_error_margin = 0.0;
      } else {
        sao.files_size_error_margin =
            static_cast<double>(thread->rand.Uniform(3));
      }
    }
    uint64_t result;
    return db_->GetApproximateSizes(
        sao, column_families_[rand_column_families[0]], &range, 1, &result);
  }
}
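
// For reference, a plain GetApproximateSizes() call has this shape
// (a minimal sketch on the default column family; illustrative only):
//
//   std::string a = Key(1), b = Key(100);
//   Range r(a, b);
//   SizeApproximationOptions opts;
//   opts.include_memtables = true;
//   uint64_t size = 0;
//   Status st = db->GetApproximateSizes(opts, db->DefaultColumnFamily(), &r,
//                                       1 /* n */, &size);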

Status StressTest::TestCheckpoint(ThreadState* thread,
                                  const std::vector<int>& rand_column_families,
                                  const std::vector<int64_t>& rand_keys) {
  std::vector<std::unique_ptr<MutexLock>> locks;
  if (ShouldAcquireMutexOnKey()) {
    for (int rand_column_family : rand_column_families) {
      // `rand_keys[0]` on each chosen CF will be verified.
      locks.emplace_back(new MutexLock(
          thread->shared->GetMutexForKey(rand_column_family, rand_keys[0])));
    }
  }
  std::string checkpoint_dir =
      FLAGS_db + "/.checkpoint" + std::to_string(thread->tid);
  Options tmp_opts(options_);
  tmp_opts.listeners.clear();
  tmp_opts.env = db_stress_env;
  // Avoid delayed deletion so the whole directory can be deleted
  tmp_opts.sst_file_manager.reset();
  // Temporarily disable error injection for clean-up
  if (fault_fs_guard) {
    fault_fs_guard->DisableAllThreadLocalErrorInjection();
  }
  DestroyDB(checkpoint_dir, tmp_opts);
  // Re-enable the error injection disabled for clean-up
  if (fault_fs_guard) {
    fault_fs_guard->EnableAllThreadLocalErrorInjection();
  }
  Checkpoint* checkpoint = nullptr;
  Status s = Checkpoint::Create(db_, &checkpoint);
  if (s.ok()) {
    s = checkpoint->CreateCheckpoint(checkpoint_dir);
    if (!s.ok() && !IsErrorInjectedAndRetryable(s)) {
      fprintf(stderr, "Fail to create checkpoint to %s\n",
              checkpoint_dir.c_str());
      std::vector<std::string> files;
      // Temporarily disable error injection to print debugging information
      if (fault_fs_guard) {
        fault_fs_guard->DisableThreadLocalErrorInjection(
            FaultInjectionIOType::kMetadataRead);
      }
      Status my_s = db_stress_env->GetChildren(checkpoint_dir, &files);
      // Re-enable the error injection disabled for printing debugging
      // information
      if (fault_fs_guard) {
        fault_fs_guard->EnableThreadLocalErrorInjection(
            FaultInjectionIOType::kMetadataRead);
      }
      if (!my_s.ok()) {
        fprintf(stderr, "Fail to GetChildren under %s due to %s\n",
                checkpoint_dir.c_str(), my_s.ToString().c_str());
      } else {
        for (const auto& f : files) {
          fprintf(stderr, " %s\n", f.c_str());
        }
      }
    }
  }
  delete checkpoint;
  checkpoint = nullptr;
  std::vector<ColumnFamilyHandle*> cf_handles;
  DB* checkpoint_db = nullptr;
  if (s.ok()) {
    Options options(options_);
    options.best_efforts_recovery = false;
    options.listeners.clear();
    // Avoid race condition in trash handling after deleting checkpoint_db
    options.sst_file_manager.reset();
    std::vector<ColumnFamilyDescriptor> cf_descs;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    assert(FLAGS_clear_column_family_one_in == 0);
    if (FLAGS_clear_column_family_one_in == 0) {
      for (const auto& name : column_family_names_) {
        cf_descs.emplace_back(name, ColumnFamilyOptions(options));
      }
      s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
                              &cf_handles, &checkpoint_db);
    }
  }
  if (checkpoint_db != nullptr) {
    // Note the column families chosen by `rand_column_families` cannot be
    // dropped while the locks for `rand_keys` are held. So we should not have
    // to worry about accessing those column families throughout this function.
    for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
      std::string key_str = Key(rand_keys[0]);
      Slice key = key_str;
      std::string ts_str;
      Slice ts;
      ReadOptions read_opts;
      if (FLAGS_user_timestamp_size > 0) {
        ts_str = GetNowNanos();
        ts = ts_str;
        read_opts.timestamp = &ts;
      }
      std::string value;
      Status get_status = checkpoint_db->Get(
          read_opts, cf_handles[rand_column_families[i]], key, &value);
      bool exists =
          thread->shared->Exists(rand_column_families[i], rand_keys[0]);
      if (get_status.ok()) {
        if (!exists && ShouldAcquireMutexOnKey()) {
          std::ostringstream oss;
          oss << "0x" << key.ToString(true) << " exists in checkpoint "
              << checkpoint_dir << " but not in original db";
          s = Status::Corruption(oss.str());
        }
      } else if (get_status.IsNotFound()) {
        if (exists && ShouldAcquireMutexOnKey()) {
          std::ostringstream oss;
          oss << "0x" << key.ToString(true)
              << " exists in original db but not in checkpoint "
              << checkpoint_dir;
          s = Status::Corruption(oss.str());
        }
      } else {
        s = get_status;
      }
    }
    for (auto cfh : cf_handles) {
      delete cfh;
    }
    cf_handles.clear();
    delete checkpoint_db;
    checkpoint_db = nullptr;
  }
  // Temporarily disable error injection for clean-up
  if (fault_fs_guard) {
    fault_fs_guard->DisableAllThreadLocalErrorInjection();
  }
  if (!s.ok() && !IsErrorInjectedAndRetryable(s)) {
    fprintf(stderr, "A checkpoint operation failed with: %s\n",
            s.ToString().c_str());
  } else {
    DestroyDB(checkpoint_dir, tmp_opts);
  }
  // Re-enable the error injection disabled for clean-up
  if (fault_fs_guard) {
    fault_fs_guard->EnableAllThreadLocalErrorInjection();
  }
  return s;
}
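
// The Checkpoint API exercised above, reduced to its minimal form
// (hypothetical directory, error handling elided):
//
//   Checkpoint* cp = nullptr;
//   Status st = Checkpoint::Create(db, &cp);
//   if (st.ok()) {
//     // Hard-links SST files when the destination is on the same
//     // filesystem; otherwise copies them, along with WAL/MANIFEST.
//     st = cp->CreateCheckpoint("/tmp/my_checkpoint");
//   }
//   delete cp;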

void StressTest::TestGetProperty(ThreadState* thread) const {
  std::unordered_set<std::string> levelPropertyNames = {
      DB::Properties::kAggregatedTablePropertiesAtLevel,
      DB::Properties::kCompressionRatioAtLevelPrefix,
      DB::Properties::kNumFilesAtLevelPrefix,
  };
  std::unordered_set<std::string> unknownPropertyNames = {
      DB::Properties::kEstimateOldestKeyTime,
      DB::Properties::kOptionsStatistics,
      DB::Properties::
          kLiveSstFilesSizeAtTemperature,  // similar to levelPropertyNames, it
                                           // requires a number suffix
  };
  unknownPropertyNames.insert(levelPropertyNames.begin(),
                              levelPropertyNames.end());
  std::unordered_set<std::string> blobCachePropertyNames = {
      DB::Properties::kBlobCacheCapacity,
      DB::Properties::kBlobCacheUsage,
      DB::Properties::kBlobCachePinnedUsage,
  };
  if (db_->GetOptions().blob_cache == nullptr) {
    unknownPropertyNames.insert(blobCachePropertyNames.begin(),
                                blobCachePropertyNames.end());
  }
  std::string prop;
  for (const auto& ppt_name_and_info : InternalStats::ppt_name_to_info) {
    bool res = db_->GetProperty(ppt_name_and_info.first, &prop);
    if (unknownPropertyNames.find(ppt_name_and_info.first) ==
        unknownPropertyNames.end()) {
      if (!res) {
        fprintf(stderr, "Failed to get DB property: %s\n",
                ppt_name_and_info.first.c_str());
        thread->shared->SetVerificationFailure();
      }
      if (ppt_name_and_info.second.handle_int != nullptr) {
        uint64_t prop_int;
        if (!db_->GetIntProperty(ppt_name_and_info.first, &prop_int)) {
          fprintf(stderr, "Failed to get Int property: %s\n",
                  ppt_name_and_info.first.c_str());
          thread->shared->SetVerificationFailure();
        }
      }
      if (ppt_name_and_info.second.handle_map != nullptr) {
        std::map<std::string, std::string> prop_map;
        if (!db_->GetMapProperty(ppt_name_and_info.first, &prop_map)) {
          fprintf(stderr, "Failed to get Map property: %s\n",
                  ppt_name_and_info.first.c_str());
          thread->shared->SetVerificationFailure();
        }
      }
    }
  }
  ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
  db_->GetColumnFamilyMetaData(&cf_meta_data);
  int level_size = static_cast<int>(cf_meta_data.levels.size());
  for (int level = 0; level < level_size; level++) {
    for (const auto& ppt_name : levelPropertyNames) {
      bool res = db_->GetProperty(ppt_name + std::to_string(level), &prop);
      if (!res) {
        fprintf(stderr, "Failed to get DB property: %s\n",
                (ppt_name + std::to_string(level)).c_str());
        thread->shared->SetVerificationFailure();
      }
    }
  }
  // Test for an invalid property name
  if (thread->rand.OneIn(100)) {
    if (db_->GetProperty("rocksdb.invalid_property_name", &prop)) {
      fprintf(stderr, "Failed to return false for invalid property name\n");
      thread->shared->SetVerificationFailure();
    }
  }
}
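
// Typical property lookups, for reference (real DB::Properties constants;
// a minimal sketch):
//
//   std::string v;
//   db->GetProperty(DB::Properties::kStats, &v);  // string-valued
//   uint64_t n = 0;
//   db->GetIntProperty(DB::Properties::kEstimateNumKeys, &n);  // int-valued
//   // Level-scoped properties need a numeric suffix:
//   db->GetProperty(DB::Properties::kNumFilesAtLevelPrefix + "0", &v);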

Status StressTest::TestGetPropertiesOfAllTables() const {
  TablePropertiesCollection props;
  return db_->GetPropertiesOfAllTables(&props);
}

void StressTest::TestCompactFiles(ThreadState* thread,
                                  ColumnFamilyHandle* column_family) {
  ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
  db_->GetColumnFamilyMetaData(column_family, &cf_meta_data);
  if (cf_meta_data.levels.empty()) {
    return;
  }
  // Randomly compact up to three consecutive files from a level
  const int kMaxRetry = 3;
  for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
    size_t random_level =
        thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size()));
    const auto& files = cf_meta_data.levels[random_level].files;
    if (files.size() > 0) {
      size_t random_file_index =
          thread->rand.Uniform(static_cast<int>(files.size()));
      if (files[random_file_index].being_compacted) {
        // Retry as the selected file is currently being compacted
        continue;
      }
      std::vector<std::string> input_files;
      input_files.push_back(files[random_file_index].name);
      if (random_file_index > 0 &&
          !files[random_file_index - 1].being_compacted) {
        input_files.push_back(files[random_file_index - 1].name);
      }
      if (random_file_index + 1 < files.size() &&
          !files[random_file_index + 1].being_compacted) {
        input_files.push_back(files[random_file_index + 1].name);
      }
      size_t output_level =
          std::min(random_level + 1, cf_meta_data.levels.size() - 1);
      CompactionOptions compact_options;
      if (thread->rand.OneIn(2)) {
        compact_options.output_file_size_limit = FLAGS_target_file_size_base;
      }
      std::ostringstream compact_opt_oss;
      compact_opt_oss << "output_file_size_limit: "
                      << compact_options.output_file_size_limit;
      auto s = db_->CompactFiles(compact_options, column_family, input_files,
                                 static_cast<int>(output_level));
      if (!s.ok()) {
        thread->stats.AddNumCompactFilesFailed(1);
        // TODO (hx235): allow an exact list of tolerable failures under stress
        // test
        bool non_ok_status_allowed =
            s.IsManualCompactionPaused() || IsErrorInjectedAndRetryable(s) ||
            s.IsAborted() || s.IsInvalidArgument() || s.IsNotSupported();
        if (!non_ok_status_allowed) {
          fprintf(stderr,
                  "Unable to perform CompactFiles(): %s under specified "
                  "CompactionOptions: %s (Empty string or "
                  "missing field indicates default option or value is used)\n",
                  s.ToString().c_str(), compact_opt_oss.str().c_str());
          thread->shared->SafeTerminate();
        }
      } else {
        thread->stats.AddNumCompactFilesSucceed(1);
      }
      break;
    }
  }
}
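
// For reference, a direct CompactFiles() call has this shape, with input
// file names taken from ColumnFamilyMetaData as above (a minimal sketch on
// the default column family; illustrative only):
//
//   ColumnFamilyMetaData meta;
//   db->GetColumnFamilyMetaData(&meta);
//   if (!meta.levels.empty() && !meta.levels[0].files.empty()) {
//     std::vector<std::string> inputs{meta.levels[0].files[0].name};
//     Status st = db->CompactFiles(CompactionOptions(), inputs,
//                                  1 /* output_level */);
//   }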

void StressTest::TestPromoteL0(ThreadState* thread,
                               ColumnFamilyHandle* column_family) {
  int target_level = thread->rand.Next() % options_.num_levels;
  Status s = db_->PromoteL0(column_family, target_level);
  if (!s.ok()) {
    // The second allowed error occurs when another concurrent PromoteL0()
    // moving the same files finishes first, which is an allowed behavior
    bool non_ok_status_allowed =
        s.IsInvalidArgument() ||
        (s.IsCorruption() &&
         s.ToString().find("VersionBuilder: Cannot delete table file") !=
             std::string::npos &&
         s.ToString().find("since it is on level") != std::string::npos);
    if (!non_ok_status_allowed) {
      fprintf(stderr,
              "Unable to perform PromoteL0(): %s under specified "
              "target_level: %d.\n",
              s.ToString().c_str(), target_level);
      thread->shared->SafeTerminate();
    }
  }
}

Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
  FlushOptions flush_opts;
  assert(flush_opts.wait);
  if (FLAGS_atomic_flush) {
    return db_->Flush(flush_opts, column_families_);
  }
  std::vector<ColumnFamilyHandle*> cfhs;
  std::for_each(rand_column_families.begin(), rand_column_families.end(),
                [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
  return db_->Flush(flush_opts, cfhs);
}

Status StressTest::TestResetStats() { return db_->ResetStats(); }

Status StressTest::TestPauseBackground(ThreadState* thread) {
  Status status = db_->PauseBackgroundWork();
  if (!status.ok()) {
    return status;
  }
  // To avoid stalling/deadlocking ourself in this thread, just
  // sleep here during pause and let other threads do db operations.
  // Sleep up to ~16 seconds (2**24 microseconds), but very skewed
  // toward short pause. (1 chance in 25 of pausing >= 1s;
  // 1 chance in 625 of pausing full 16s.)
  int pwr2_micros =
      std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
  clock_->SleepForMicroseconds(1 << pwr2_micros);
  return db_->ContinueBackgroundWork();
}
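
// Derivation of the probabilities quoted above: with pwr2 = min(U1, U2) for
// independent U1, U2 ~ Uniform{0, ..., 24}, P(pwr2 >= k) = ((25 - k) / 25)^2.
// A pause of >= 1s needs pwr2 >= 20 (2^20 us is ~1.05s): P = (5/25)^2 = 1/25.
// The full ~16s pause needs pwr2 == 24: P = (1/25)^2 = 1/625.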

Status StressTest::TestDisableFileDeletions(ThreadState* thread) {
  Status status = db_->DisableFileDeletions();
  if (!status.ok()) {
    return status;
  }
  // Similar to TestPauseBackground()
  int pwr2_micros =
      std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
  clock_->SleepForMicroseconds(1 << pwr2_micros);
  return db_->EnableFileDeletions();
}

Status StressTest::TestDisableManualCompaction(ThreadState* thread) {
  db_->DisableManualCompaction();
  // Similar to TestPauseBackground()
  int pwr2_micros =
      std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
  clock_->SleepForMicroseconds(1 << pwr2_micros);
  db_->EnableManualCompaction();
  return Status::OK();
}

void StressTest::TestAcquireSnapshot(ThreadState* thread,
                                     int rand_column_family,
                                     const std::string& keystr, uint64_t i) {
  Slice key = keystr;
  ColumnFamilyHandle* column_family = column_families_[rand_column_family];
  // This `ReadOptions` is for validation purposes. Ignore
  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  ReadOptions ropt;
  auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  const bool ww_snapshot = thread->rand.OneIn(10);
  const Snapshot* snapshot =
      ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
                  : db_->GetSnapshot();
  ropt.snapshot = snapshot;
  ropt.auto_refresh_iterator_with_snapshot =
      FLAGS_auto_refresh_iterator_with_snapshot;
  // Ideally, we want snapshot taking and timestamp generation to be atomic
  // here, so that the snapshot corresponds to the timestamp. However, it is
  // not possible with the current GetSnapshot() API.
  std::string ts_str;
  Slice ts;
  if (FLAGS_user_timestamp_size > 0) {
    ts_str = GetNowNanos();
    ts = ts_str;
    ropt.timestamp = &ts;
  }
  std::string value_at;
  // When taking a snapshot, we also read a key from that snapshot. We
  // will later read the same key before releasing the snapshot and
  // verify that the results are the same.
  Status status_at = db_->Get(ropt, column_family, key, &value_at);
  if (!status_at.ok() && IsErrorInjectedAndRetryable(status_at)) {
    db_->ReleaseSnapshot(snapshot);
    return;
  }
  std::vector<bool>* key_vec = nullptr;
  if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
    key_vec = new std::vector<bool>(FLAGS_max_key);
    // When `prefix_extractor` is set, seeking to the beginning and scanning
    // across prefixes are only supported with `total_order_seek` set.
    ropt.total_order_seek = true;
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      uint64_t key_val;
      if (GetIntVal(iterator->key().ToString(), &key_val)) {
        (*key_vec)[key_val] = true;
      }
    }
  }
  ThreadState::SnapshotState snap_state = {snapshot,
                                           rand_column_family,
                                           column_family->GetName(),
                                           keystr,
                                           status_at,
                                           value_at,
                                           key_vec,
                                           ts_str};
  uint64_t hold_for = FLAGS_snapshot_hold_ops;
  if (FLAGS_long_running_snapshots) {
    // Hold 10% of snapshots for 10x more
    if (thread->rand.OneIn(10)) {
      assert(hold_for < std::numeric_limits<uint64_t>::max() / 10);
      hold_for *= 10;
      // Hold 1% of snapshots for 100x more
      if (thread->rand.OneIn(10)) {
        assert(hold_for < std::numeric_limits<uint64_t>::max() / 10);
        hold_for *= 10;
      }
    }
  }
  uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
  thread->snapshot_queue.emplace(release_at, snap_state);
}

Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) {
  while (!thread->snapshot_queue.empty() &&
         i >= thread->snapshot_queue.front().first) {
    auto snap_state = thread->snapshot_queue.front().second;
    assert(snap_state.snapshot);
    // Note: this is unsafe as the cf might be dropped concurrently. But
    // it is ok since unclean cf drop is currently not supported by write
    // prepared transactions.
    Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
    db_->ReleaseSnapshot(snap_state.snapshot);
    delete snap_state.key_vec;
    thread->snapshot_queue.pop();
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}

void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
                                  const Slice& start_key,
                                  ColumnFamilyHandle* column_family) {
  int64_t end_key_num;
  if (std::numeric_limits<int64_t>::max() - rand_key <
      FLAGS_compact_range_width) {
    end_key_num = std::numeric_limits<int64_t>::max();
  } else {
    end_key_num = FLAGS_compact_range_width + rand_key;
  }
  std::string end_key_buf = Key(end_key_num);
  Slice end_key(end_key_buf);
  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = static_cast<bool>(thread->rand.Next() % 2);
  if (static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style) !=
      ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleFIFO) {
    cro.change_level = static_cast<bool>(thread->rand.Next() % 2);
  }
  if (thread->rand.OneIn(2)) {
    cro.target_level = thread->rand.Next() % options_.num_levels;
  }
  std::vector<BottommostLevelCompaction> bottom_level_styles = {
      BottommostLevelCompaction::kSkip,
      BottommostLevelCompaction::kIfHaveCompactionFilter,
      BottommostLevelCompaction::kForce,
      BottommostLevelCompaction::kForceOptimized};
  cro.bottommost_level_compaction =
      bottom_level_styles[thread->rand.Next() %
                          static_cast<uint32_t>(bottom_level_styles.size())];
  cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
  cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
  std::vector<BlobGarbageCollectionPolicy> blob_gc_policies = {
      BlobGarbageCollectionPolicy::kForce,
      BlobGarbageCollectionPolicy::kDisable,
      BlobGarbageCollectionPolicy::kUseDefault};
  cro.blob_garbage_collection_policy =
      blob_gc_policies[thread->rand.Next() %
                       static_cast<uint32_t>(blob_gc_policies.size())];
  cro.blob_garbage_collection_age_cutoff =
      static_cast<double>(thread->rand.Next() % 100) / 100.0;
  const Snapshot* pre_snapshot = nullptr;
  uint32_t pre_hash = 0;
  if (thread->rand.OneIn(2)) {
    // Temporarily disable error injection for validation
    if (fault_fs_guard) {
      fault_fs_guard->DisableAllThreadLocalErrorInjection();
    }
    // Take a snapshot and compare the data before and after the compaction
    pre_snapshot = db_->GetSnapshot();
    pre_hash =
        GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
    // Re-enable the error injection disabled for validation
    if (fault_fs_guard) {
      fault_fs_guard->EnableAllThreadLocalErrorInjection();
    }
  }
  std::ostringstream compact_range_opt_oss;
  compact_range_opt_oss << "exclusive_manual_compaction: "
                        << cro.exclusive_manual_compaction
                        << ", change_level: " << cro.change_level
                        << ", target_level: " << cro.target_level
                        << ", bottommost_level_compaction: "
                        << static_cast<int>(cro.bottommost_level_compaction)
                        << ", allow_write_stall: " << cro.allow_write_stall
                        << ", max_subcompactions: " << cro.max_subcompactions
                        << ", blob_garbage_collection_policy: "
                        << static_cast<int>(cro.blob_garbage_collection_policy)
                        << ", blob_garbage_collection_age_cutoff: "
                        << cro.blob_garbage_collection_age_cutoff;
  Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);
  if (!status.ok()) {
    // TODO (hx235): allow an exact list of tolerable failures under stress
    // test
    bool non_ok_status_allowed =
        status.IsManualCompactionPaused() ||
        IsErrorInjectedAndRetryable(status) || status.IsAborted() ||
        status.IsInvalidArgument() || status.IsNotSupported();
    if (!non_ok_status_allowed) {
      fprintf(stderr,
              "Unable to perform CompactRange(): %s under specified "
              "CompactRangeOptions: %s (Empty string or "
              "missing field indicates default option or value is used)\n",
              status.ToString().c_str(), compact_range_opt_oss.str().c_str());
      // Fail fast to preserve the DB state.
      thread->shared->SetVerificationFailure();
    }
  }
  if (pre_snapshot != nullptr) {
    // Temporarily disable error injection for validation
    if (fault_fs_guard) {
      fault_fs_guard->DisableAllThreadLocalErrorInjection();
    }
    uint32_t post_hash =
        GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
    if (pre_hash != post_hash) {
      fprintf(stderr,
              "Data hash different before and after compact range "
              "start_key %s end_key %s under specified CompactRangeOptions: %s "
              "(Empty string or "
              "missing field indicates default option or value is used)\n",
              start_key.ToString(true).c_str(), end_key.ToString(true).c_str(),
              compact_range_opt_oss.str().c_str());
      thread->stats.AddErrors(1);
      // Fail fast to preserve the DB state.
      thread->shared->SetVerificationFailure();
    }
    db_->ReleaseSnapshot(pre_snapshot);
    if (fault_fs_guard) {
      // Re-enable the error injection disabled for validation
      fault_fs_guard->EnableAllThreadLocalErrorInjection();
    }
  }
}
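
// For reference, the manual compaction exercised above reduces to this
// minimal sketch (default options; passing nullptr bounds compacts the
// entire key range):
//
//   CompactRangeOptions cro;
//   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
//   Status st = db->CompactRange(cro, nullptr /* begin */, nullptr /* end */);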

uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
                                  ColumnFamilyHandle* column_family,
                                  const Slice& start_key,
                                  const Slice& end_key) {
  // This `ReadOptions` is for validation purposes. Ignore
  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  ReadOptions ro;
  ro.snapshot = snapshot;
  ro.total_order_seek = true;
  ro.auto_refresh_iterator_with_snapshot =
      FLAGS_auto_refresh_iterator_with_snapshot;
  std::string ts_str;
  Slice ts;
  if (FLAGS_user_timestamp_size > 0) {
    ts_str = GetNowNanos();
    ts = ts_str;
    ro.timestamp = &ts;
  }
  std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
  constexpr char kCrcCalculatorSeparator = ';';
  uint32_t crc = 0;
  for (it->Seek(start_key);
       it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0;
       it->Next()) {
    crc = crc32c::Extend(crc, it->key().data(), it->key().size());
    crc = crc32c::Extend(crc, &kCrcCalculatorSeparator, sizeof(char));
    crc = crc32c::Extend(crc, it->value().data(), it->value().size());
    crc = crc32c::Extend(crc, &kCrcCalculatorSeparator, sizeof(char));
    for (const auto& column : it->columns()) {
      crc = crc32c::Extend(crc, column.name().data(), column.name().size());
      crc = crc32c::Extend(crc, &kCrcCalculatorSeparator, sizeof(char));
      crc = crc32c::Extend(crc, column.value().data(), column.value().size());
      crc = crc32c::Extend(crc, &kCrcCalculatorSeparator, sizeof(char));
    }
  }
  if (!it->status().ok()) {
    fprintf(stderr, "Iterator non-OK when calculating range CRC: %s\n",
            it->status().ToString().c_str());
    thread->stats.AddErrors(1);
    // Fail fast to preserve the DB state.
    thread->shared->SetVerificationFailure();
  }
  return crc;
}
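
// crc32c::Extend() chains incremental checksums: Extend(Extend(0, a), b)
// equals the CRC of the concatenation a+b. That is why the separator byte
// above is needed; without it, ("ab", "c") and ("a", "bc") would hash
// identically. Illustrative sketch:
//
//   uint32_t c = crc32c::Extend(0, "ab", 2);
//   c = crc32c::Extend(c, "c", 1);  // == crc32c::Value("abc", 3)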

void StressTest::PrintEnv() const {
  fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion);
  fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
  fprintf(stdout, "TransactionDB : %s\n", FLAGS_use_txn ? "true" : "false");
  if (FLAGS_use_txn) {
    fprintf(stdout, "TransactionDB Type : %s\n",
            FLAGS_use_optimistic_txn ? "Optimistic" : "Pessimistic");
    if (FLAGS_use_optimistic_txn) {
      fprintf(stdout, "OCC Validation Type : %d\n",
              static_cast<int>(FLAGS_occ_validation_policy));
      if (static_cast<uint64_t>(OccValidationPolicy::kValidateParallel) ==
          FLAGS_occ_validation_policy) {
        fprintf(stdout, "Share Lock Buckets : %s\n",
                FLAGS_share_occ_lock_buckets ? "true" : "false");
        if (FLAGS_share_occ_lock_buckets) {
          fprintf(stdout, "Lock Bucket Count : %d\n",
                  static_cast<int>(FLAGS_occ_lock_bucket_count));
        }
      }
    } else {
      fprintf(stdout, "Two write queues: : %s\n",
              FLAGS_two_write_queues ? "true" : "false");
      fprintf(stdout, "Write policy : %d\n",
              static_cast<int>(FLAGS_txn_write_policy));
      if (static_cast<uint64_t>(TxnDBWritePolicy::WRITE_PREPARED) ==
              FLAGS_txn_write_policy ||
          static_cast<uint64_t>(TxnDBWritePolicy::WRITE_UNPREPARED) ==
              FLAGS_txn_write_policy) {
        fprintf(stdout, "Snapshot cache bits : %d\n",
                static_cast<int>(FLAGS_wp_snapshot_cache_bits));
        fprintf(stdout, "Commit cache bits : %d\n",
                static_cast<int>(FLAGS_wp_commit_cache_bits));
      }
      fprintf(stdout, "last cwb for recovery : %s\n",
              FLAGS_use_only_the_last_commit_time_batch_for_recovery
                  ? "true"
                  : "false");
    }
  }
  fprintf(stdout, "Stacked BlobDB : %s\n",
          FLAGS_use_blob_db ? "true" : "false");
  fprintf(stdout, "Read only mode : %s\n", FLAGS_read_only ? "true" : "false");
  fprintf(stdout, "Atomic flush : %s\n", FLAGS_atomic_flush ? "true" : "false");
  fprintf(stdout, "Manual WAL flush : %s\n",
          FLAGS_manual_wal_flush_one_in > 0 ? "true" : "false");
  fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
  if (!FLAGS_test_batches_snapshots) {
    fprintf(stdout, "Clear CFs one in : %d\n",
            FLAGS_clear_column_family_one_in);
  }
  fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
  fprintf(stdout, "Ops per thread : %lu\n",
          (unsigned long)FLAGS_ops_per_thread);
  std::string ttl_state("unused");
  if (FLAGS_ttl > 0) {
    ttl_state = std::to_string(FLAGS_ttl);
  }
  fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
  fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
  fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
  fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
  fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
  fprintf(stdout, "Delete range percentage : %d%%\n", FLAGS_delrangepercent);
  fprintf(stdout, "No overwrite percentage : %d%%\n",
          FLAGS_nooverwritepercent);
  fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
  fprintf(stdout, "Custom ops percentage : %d%%\n", FLAGS_customopspercent);
  fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
          FLAGS_db_write_buffer_size);
  fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
  fprintf(stdout, "Iterations : %lu\n", (unsigned long)FLAGS_num_iterations);
  fprintf(stdout, "Max key : %lu\n", (unsigned long)FLAGS_max_key);
  fprintf(stdout, "Ratio #ops/#keys : %f\n",
          (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
  fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen);
  fprintf(stdout, "Batches/snapshots : %d\n", FLAGS_test_batches_snapshots);
  fprintf(stdout, "Do update in place : %d\n", FLAGS_inplace_update_support);
  fprintf(stdout, "Num keys per lock : %d\n", 1 << FLAGS_log2_keys_per_lock);
  std::string compression = CompressionTypeToString(compression_type_e);
  fprintf(stdout, "Compression : %s\n", compression.c_str());
  std::string bottommost_compression =
      CompressionTypeToString(bottommost_compression_type_e);
  fprintf(stdout, "Bottommost Compression : %s\n",
          bottommost_compression.c_str());
  std::string checksum = ChecksumTypeToString(checksum_type_e);
  fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
  fprintf(stdout, "File checksum impl : %s\n",
          FLAGS_file_checksum_impl.c_str());
  fprintf(stdout, "Bloom bits / key : %s\n",
          FormatDoubleParam(FLAGS_bloom_bits).c_str());
  fprintf(stdout, "Max subcompactions : %" PRIu64 "\n", FLAGS_subcompactions);
  fprintf(stdout, "Use MultiGet : %s\n", FLAGS_use_multiget ? "true" : "false");
  fprintf(stdout, "Use GetEntity : %s\n",
          FLAGS_use_get_entity ? "true" : "false");
  fprintf(stdout, "Use MultiGetEntity : %s\n",
          FLAGS_use_multi_get_entity ? "true" : "false");
  fprintf(stdout, "Verification only : %s\n",
          FLAGS_verification_only ? "true" : "false");
  const char* memtablerep;
  switch (FLAGS_rep_factory) {
    default:
    case kSkipList:
      memtablerep = "skip_list";
      break;
    case kHashSkipList:
      memtablerep = "prefix_hash";
      break;
    case kVectorRep:
      memtablerep = "vector";
      break;
  }
  fprintf(stdout, "Memtablerep : %s\n", memtablerep);
#ifndef NDEBUG
  KillPoint* kp = KillPoint::GetInstance();
  fprintf(stdout, "Test kill odd : %d\n", kp->rocksdb_kill_odds);
  if (!kp->rocksdb_kill_exclude_prefixes.empty()) {
    fprintf(stdout, "Skipping kill points prefixes:\n");
    for (auto& p : kp->rocksdb_kill_exclude_prefixes) {
      fprintf(stdout, " %s\n", p.c_str());
    }
  }
#endif
  fprintf(stdout, "Periodic Compaction Secs : %" PRIu64 "\n",
          FLAGS_periodic_compaction_seconds);
  fprintf(stdout, "Daily Offpeak UTC : %s\n",
          FLAGS_daily_offpeak_time_utc.c_str());
  fprintf(stdout, "Compaction TTL : %" PRIu64 "\n", FLAGS_compaction_ttl);
  const char* compaction_pri = "";
  switch (FLAGS_compaction_pri) {
    case kByCompensatedSize:
      compaction_pri = "kByCompensatedSize";
      break;
    case kOldestLargestSeqFirst:
      compaction_pri = "kOldestLargestSeqFirst";
      break;
    case kOldestSmallestSeqFirst:
      compaction_pri = "kOldestSmallestSeqFirst";
      break;
    case kMinOverlappingRatio:
      compaction_pri = "kMinOverlappingRatio";
      break;
    case kRoundRobin:
      compaction_pri = "kRoundRobin";
      break;
  }
  fprintf(stdout, "Compaction Pri : %s\n", compaction_pri);
  fprintf(stdout, "Background Purge : %d\n",
          static_cast<int>(FLAGS_avoid_unnecessary_blocking_io));
  fprintf(stdout, "Write DB ID to manifest : %d\n",
          static_cast<int>(FLAGS_write_dbid_to_manifest));
  fprintf(stdout, "Max Write Batch Group Size: %" PRIu64 "\n",
          FLAGS_max_write_batch_group_size_bytes);
  fprintf(stdout, "Use dynamic level : %d\n",
          static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
  fprintf(stdout, "Metadata read fault one in : %d\n",
          FLAGS_metadata_read_fault_one_in);
  fprintf(stdout, "Metadata write fault one in : %d\n",
          FLAGS_metadata_write_fault_one_in);
  fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
  fprintf(stdout, "Write fault one in : %d\n", FLAGS_write_fault_one_in);
  fprintf(stdout, "Open metadata read fault one in:\n");
  fprintf(stdout, " %d\n", FLAGS_open_metadata_read_fault_one_in);
  fprintf(stdout, "Open metadata write fault one in:\n");
  fprintf(stdout, " %d\n", FLAGS_open_metadata_write_fault_one_in);
  fprintf(stdout, "Open read fault one in :\n");
  fprintf(stdout, " %d\n", FLAGS_open_read_fault_one_in);
  fprintf(stdout, "Open write fault one in :\n");
  fprintf(stdout, " %d\n", FLAGS_open_write_fault_one_in);
  fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
  fprintf(stdout, "Best efforts recovery : %d\n",
          static_cast<int>(FLAGS_best_efforts_recovery));
  fprintf(stdout, "User timestamp size bytes : %d\n",
          static_cast<int>(FLAGS_user_timestamp_size));
  fprintf(stdout, "Persist user defined timestamps : %d\n",
          FLAGS_persist_user_defined_timestamps);
  fprintf(stdout, "WAL compression : %s\n", FLAGS_wal_compression.c_str());
  fprintf(stdout, "Try verify sst unique id : %d\n",
          static_cast<int>(FLAGS_verify_sst_unique_id_in_manifest));
  fprintf(stdout, "------------------------------------------------\n");
}
  3347. void StressTest::Open(SharedState* shared, bool reopen) {
  3348. assert(db_ == nullptr);
  3349. assert(txn_db_ == nullptr);
  3350. assert(optimistic_txn_db_ == nullptr);
  3351. if (!InitializeOptionsFromFile(options_)) {
  3352. InitializeOptionsFromFlags(cache_, filter_policy_, options_);
  3353. }
  3354. InitializeOptionsGeneral(cache_, filter_policy_, sqfc_factory_, options_);
  3355. DbStressCustomCompressionManager::Register();
  3356. if (!strcasecmp(FLAGS_compression_manager.c_str(), "custom")) {
  3357. options_.compression_manager =
  3358. std::make_shared<DbStressCustomCompressionManager>();
  3359. } else if (!strcasecmp(FLAGS_compression_manager.c_str(), "mixed")) {
  3360. options_.compression_manager =
  3361. std::make_shared<RoundRobinManager>(GetBuiltinV2CompressionManager());
  3362. } else if (!strcasecmp(FLAGS_compression_manager.c_str(), "randommixed")) {
  3363. options_.compression_manager =
  3364. std::make_shared<RandomMixedCompressionManager>(
  3365. GetBuiltinV2CompressionManager());
  3366. } else if (!strcasecmp(FLAGS_compression_manager.c_str(), "autoskip")) {
  3367. options_.compression_manager =
  3368. CreateAutoSkipCompressionManager(GetBuiltinV2CompressionManager());
  3369. } else if (!strcasecmp(FLAGS_compression_manager.c_str(), "none")) {
  3370. // Nothing to do using default compression manager
  3371. } else {
  3372. fprintf(stderr, "Unknown compression manager: %s\n",
  3373. FLAGS_compression_manager.c_str());
  3374. exit(1);
  3375. }
  3376. if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
  3377. fprintf(stderr,
  3378. "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
  3379. exit(1);
  3380. }
  3381. if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
  3382. fprintf(stdout,
  3383. "WARNING: prefix_size is non-zero but "
  3384. "memtablerep != prefix_hash\n");
  3385. }
  3386. // Remote Compaction
  3387. if (FLAGS_remote_compaction_worker_threads > 0) {
  3388. // TODO(jaykorean) Remove this after fix - remote worker shouldn't recover
  3389. // from WAL
  3390. if (!FLAGS_disable_wal) {
  3391. fprintf(stderr,
  3392. "WAL is not compatible with Remote Compaction in Stress Test\n");
  3393. exit(1);
  3394. }
  3395. if ((options_.enable_blob_files ||
  3396. options_.enable_blob_garbage_collection ||
  3397. FLAGS_allow_setting_blob_options_dynamically)) {
  3398. fprintf(stderr,
  3399. "Integrated BlobDB is currently incompatible with Remote "
  3400. "Compaction\n");
  3401. exit(1);
  3402. }
  3403. // Each DB open/reopen gets a fresh compaction service instance with a clean
  3404. // aborted_ state
  3405. auto compaction_service = std::make_shared<DbStressCompactionService>(
  3406. shared, FLAGS_remote_compaction_failure_fall_back_to_local);
  3407. options_.compaction_service = compaction_service;
  3408. }
  3409. if (FLAGS_allow_resumption_one_in > 0) {
  3410. if (FLAGS_remote_compaction_worker_threads == 0) {
  3411. fprintf(stderr,
  3412. "allow_resumption or randomize_allow_resumption requires "
  3413. "remote_compaction_worker_threads > 0\n");
  3414. exit(1);
  3415. }
  3416. }
  3417. if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
  3418. FLAGS_allow_setting_blob_options_dynamically) &&
  3419. FLAGS_best_efforts_recovery) {
  3420. fprintf(stderr,
  3421. "Integrated BlobDB is currently incompatible with best-effort "
  3422. "recovery\n");
  3423. exit(1);
  3424. }
  3425. fprintf(stdout,
  3426. "Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64
  3427. ", blob file size %" PRIu64
  3428. ", blob compression type %s, blob GC enabled %d, cutoff %f, force "
  3429. "threshold %f, blob compaction readahead size %" PRIu64
  3430. ", blob file starting level %d\n",
  3431. options_.enable_blob_files, options_.min_blob_size,
  3432. options_.blob_file_size,
  3433. CompressionTypeToString(options_.blob_compression_type).c_str(),
  3434. options_.enable_blob_garbage_collection,
  3435. options_.blob_garbage_collection_age_cutoff,
  3436. options_.blob_garbage_collection_force_threshold,
  3437. options_.blob_compaction_readahead_size,
  3438. options_.blob_file_starting_level);
  3439. if (FLAGS_use_blob_cache) {
  3440. fprintf(stdout,
  3441. "Integrated BlobDB: blob cache enabled"
  3442. ", block and blob caches shared: %d",
  3443. FLAGS_use_shared_block_and_blob_cache);
  3444. if (!FLAGS_use_shared_block_and_blob_cache) {
  3445. fprintf(stdout,
  3446. ", blob cache size %" PRIu64 ", blob cache num shard bits: %d",
  3447. FLAGS_blob_cache_size, FLAGS_blob_cache_numshardbits);
  3448. }
  3449. fprintf(stdout, ", blob cache prepopulated: %d\n",
  3450. FLAGS_prepopulate_blob_cache);
  3451. } else {
  3452. fprintf(stdout, "Integrated BlobDB: blob cache disabled\n");
  3453. }
  3454. fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
  3455. Status s;
  3456. if (FLAGS_ttl == -1) {
  3457. std::vector<std::string> existing_column_families;
  3458. s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
  3459. &existing_column_families); // ignore errors
  3460. if (!s.ok()) {
  3461. // DB doesn't exist
  3462. assert(existing_column_families.empty());
  3463. assert(column_family_names_.empty());
  3464. column_family_names_.push_back(kDefaultColumnFamilyName);
  3465. } else if (column_family_names_.empty()) {
  3466. // this is the first call to the function Open()
  3467. column_family_names_ = existing_column_families;
  3468. } else {
  3469. // this is a reopen. just assert that existing column_family_names are
  3470. // equivalent to what we remember
  3471. auto sorted_cfn = column_family_names_;
  3472. std::sort(sorted_cfn.begin(), sorted_cfn.end());
  3473. std::sort(existing_column_families.begin(),
  3474. existing_column_families.end());
  3475. if (sorted_cfn != existing_column_families) {
  3476. fprintf(stderr, "Expected column families differ from the existing:\n");
  3477. fprintf(stderr, "Expected: {");
  3478. for (const auto& cf : sorted_cfn) {
  3479. fprintf(stderr, "%s ", cf.c_str());
  3480. }
  3481. fprintf(stderr, "}\n");
  3482. fprintf(stderr, "Existing: {");
  3483. for (const auto& cf : existing_column_families) {
  3484. fprintf(stderr, "%s ", cf.c_str());
  3485. }
  3486. fprintf(stderr, "}\n");
  3487. }
  3488. assert(sorted_cfn == existing_column_families);
  3489. }
  3490. std::vector<ColumnFamilyDescriptor> cf_descriptors;
  3491. for (const auto& name : column_family_names_) {
  3492. if (name != kDefaultColumnFamilyName) {
  3493. new_column_family_name_ =
  3494. std::max(new_column_family_name_.load(), std::stoi(name) + 1);
  3495. }
  3496. cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
  3497. }
  3498. while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
  3499. std::string name = std::to_string(new_column_family_name_.load());
  3500. new_column_family_name_++;
  3501. cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
  3502. column_family_names_.push_back(name);
  3503. }
  3504. options_.listeners.clear();
  3505. options_.listeners.emplace_back(
  3506. new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors,
  3507. db_stress_listener_env, shared));
  3508. RegisterAdditionalListeners();
  3509. // If this is for DB reopen, error injection may have been enabled.
  3510. // Disable it here in case there is no open fault injection.
  3511. if (fault_fs_guard) {
  3512. fault_fs_guard->DisableAllThreadLocalErrorInjection();
  3513. }
  3514. // TODO; test transaction DB Open with fault injection
  3515. if (!FLAGS_use_txn) {
  3516. bool inject_sync_fault = FLAGS_sync_fault_injection;
  3517. bool inject_open_meta_read_error =
  3518. FLAGS_open_metadata_read_fault_one_in > 0;
  3519. bool inject_open_meta_write_error =
  3520. FLAGS_open_metadata_write_fault_one_in > 0;
  3521. bool inject_open_read_error = FLAGS_open_read_fault_one_in > 0;
  3522. bool inject_open_write_error = FLAGS_open_write_fault_one_in > 0;
  3523. if ((inject_sync_fault || inject_open_meta_read_error ||
  3524. inject_open_meta_write_error || inject_open_read_error ||
  3525. inject_open_write_error) &&
  3526. fault_fs_guard
  3527. ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
  3528. .ok()) {
  3529. if (inject_sync_fault || inject_open_write_error) {
  3530. fault_fs_guard->SetFilesystemDirectWritable(false);
  3531. fault_fs_guard->SetInjectUnsyncedDataLoss(inject_sync_fault);
  3532. }
  3533. fault_fs_guard->SetThreadLocalErrorContext(
  3534. FaultInjectionIOType::kMetadataRead,
  3535. static_cast<uint32_t>(FLAGS_seed),
  3536. FLAGS_open_metadata_read_fault_one_in, false /* retryable */,
  3537. false /* has_data_loss */);
  3538. fault_fs_guard->EnableThreadLocalErrorInjection(
  3539. FaultInjectionIOType::kMetadataRead);
  3540. fault_fs_guard->SetThreadLocalErrorContext(
  3541. FaultInjectionIOType::kMetadataWrite,
  3542. static_cast<uint32_t>(FLAGS_seed),
  3543. FLAGS_open_metadata_write_fault_one_in, false /* retryable */,
  3544. false /* has_data_loss */);
  3545. fault_fs_guard->EnableThreadLocalErrorInjection(
  3546. FaultInjectionIOType::kMetadataWrite);
  3547. fault_fs_guard->SetThreadLocalErrorContext(
  3548. FaultInjectionIOType::kRead, static_cast<uint32_t>(FLAGS_seed),
  3549. FLAGS_open_read_fault_one_in, false /* retryable */,
  3550. false /* has_data_loss */);
  3551. fault_fs_guard->EnableThreadLocalErrorInjection(
  3552. FaultInjectionIOType::kRead);
  3553. fault_fs_guard->SetThreadLocalErrorContext(
  3554. FaultInjectionIOType::kWrite, static_cast<uint32_t>(FLAGS_seed),
  3555. FLAGS_open_write_fault_one_in, false /* retryable */,
  3556. false /* has_data_loss */);
  3557. fault_fs_guard->EnableThreadLocalErrorInjection(
  3558. FaultInjectionIOType::kWrite);
  3559. }
      while (true) {
        // StackableDB-based BlobDB
        if (FLAGS_use_blob_db) {
          blob_db::BlobDBOptions blob_db_options;
          blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
          blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
          blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
          blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
          blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
          blob_db::BlobDB* blob_db = nullptr;
          s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
                                    cf_descriptors, &column_families_,
                                    &blob_db);
          if (s.ok()) {
            db_ = blob_db;
          }
        } else {
          if (db_preload_finished_.load() && FLAGS_read_only) {
            s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
                                    cf_descriptors, &column_families_, &db_);
          } else {
            s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
                         &column_families_, &db_);
          }
        }
        if (inject_sync_fault || inject_open_meta_read_error ||
            inject_open_meta_write_error || inject_open_read_error ||
            inject_open_write_error) {
          fault_fs_guard->DisableAllThreadLocalErrorInjection();
          if (s.ok()) {
            // Injected errors might happen in background compactions. We
            // wait for all compactions to finish to make sure the DB is in a
            // clean state before executing queries.
            s = db_->GetRootDB()->WaitForCompact(WaitForCompactOptions());
            if (!s.ok()) {
              CleanUpColumnFamilies();
              delete db_;
              db_ = nullptr;
              delete secondary_db_;
              secondary_db_ = nullptr;
            }
          }
          if (!s.ok()) {
            // After failing to open the DB due to an IO error or unsynced
            // data loss, a retry should successfully open the DB with the
            // correct data as long as no further IO errors show up.
            inject_sync_fault = false;
            inject_open_meta_read_error = false;
            inject_open_meta_write_error = false;
            inject_open_read_error = false;
            inject_open_write_error = false;
            // TODO: Unsynced data loss during DB reopen is not supported yet
            // in stress test. Will need to recreate expected state if we
            // decide to support unsynced data loss during DB reopen.
            if (!reopen) {
              Random rand(static_cast<uint32_t>(FLAGS_seed));
              if (rand.OneIn(2)) {
                fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
                                                                   nullptr);
              }
              if (rand.OneIn(3)) {
                fault_fs_guard->DropUnsyncedFileData();
              } else if (rand.OneIn(2)) {
                fault_fs_guard->DropRandomUnsyncedFileData(&rand);
              }
            }
            continue;
          }
        }
        break;
      }
    } else {
      if (FLAGS_use_optimistic_txn) {
        OptimisticTransactionDBOptions optimistic_txn_db_options;
        optimistic_txn_db_options.validate_policy =
            static_cast<OccValidationPolicy>(FLAGS_occ_validation_policy);
        if (FLAGS_share_occ_lock_buckets) {
          optimistic_txn_db_options.shared_lock_buckets =
              MakeSharedOccLockBuckets(FLAGS_occ_lock_bucket_count);
        } else {
          optimistic_txn_db_options.occ_lock_buckets =
              FLAGS_occ_lock_bucket_count;
          optimistic_txn_db_options.shared_lock_buckets = nullptr;
        }
        s = OptimisticTransactionDB::Open(
            options_, optimistic_txn_db_options, FLAGS_db, cf_descriptors,
            &column_families_, &optimistic_txn_db_);
        if (!s.ok()) {
          fprintf(stderr,
                  "Error in opening the OptimisticTransactionDB [%s]\n",
                  s.ToString().c_str());
          fflush(stderr);
        }
        assert(s.ok());
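        // Presumably the same ordering constraint as the "do not swap the
        // order" note on the TransactionDB path below: db_ is assigned before
        // the pointer is published to readers through db_aptr_.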
        {
          db_ = optimistic_txn_db_;
          db_aptr_.store(optimistic_txn_db_, std::memory_order_release);
        }
      } else {
        TransactionDBOptions txn_db_options;
        assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
        txn_db_options.write_policy =
            static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
        if (FLAGS_unordered_write) {
          assert(txn_db_options.write_policy ==
                 TxnDBWritePolicy::WRITE_PREPARED);
          options_.unordered_write = true;
          options_.two_write_queues = true;
          txn_db_options.skip_concurrency_control = true;
        } else {
          options_.two_write_queues = FLAGS_two_write_queues;
        }
        txn_db_options.wp_snapshot_cache_bits =
            static_cast<size_t>(FLAGS_wp_snapshot_cache_bits);
        txn_db_options.wp_commit_cache_bits =
            static_cast<size_t>(FLAGS_wp_commit_cache_bits);
        txn_db_options.use_per_key_point_lock_mgr =
            FLAGS_use_per_key_point_lock_mgr;
        PrepareTxnDbOptions(shared, txn_db_options);
        s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
                                cf_descriptors, &column_families_, &txn_db_);
        if (!s.ok()) {
          fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
                  s.ToString().c_str());
          fflush(stderr);
        }
        assert(s.ok());
        // Do not swap the order of the following.
        {
          db_ = txn_db_;
          db_aptr_.store(txn_db_, std::memory_order_release);
        }
      }
    }
    if (!s.ok()) {
      fprintf(stderr, "Error in opening the DB [%s]\n", s.ToString().c_str());
      fflush(stderr);
    }
    assert(s.ok());
    assert(column_families_.size() ==
           static_cast<size_t>(FLAGS_column_families));
    // Clear the statistics reference from options_ to intentionally shorten
    // the statistics object's lifetime to match the db object's (which is the
    // common case in practice) and to detect if RocksDB accesses the
    // statistics beyond its lifetime.
    if (FLAGS_statistics) {
      options_.statistics.reset();
    }
    // Secondary instance does not support write-prepared/write-unprepared
    // transactions, so just disable the secondary instance when transactions
    // are in use.
    if (s.ok() && FLAGS_test_secondary && !FLAGS_use_txn) {
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.env = db_stress_env;
      const std::string& secondary_path = FLAGS_secondaries_base;
      s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                              cf_descriptors, &secondary_cfhs_,
                              &secondary_db_);
      assert(s.ok());
      assert(secondary_cfhs_.size() ==
             static_cast<size_t>(FLAGS_column_families));
    }
  } else {
    DBWithTTL* db_with_ttl;
    s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
    db_ = db_with_ttl;
  }
  if (FLAGS_preserve_unverified_changes) {
    // Up until now, no live file should have become obsolete due to these
    // options. After `DisableFileDeletions()` we can reenable auto
    // compactions since, even if live files become obsolete, they won't be
    // deleted.
    assert(options_.avoid_flush_during_recovery);
    assert(options_.disable_auto_compactions);
    if (s.ok()) {
      s = db_->DisableFileDeletions();
    }
    if (s.ok()) {
      s = db_->EnableAutoCompaction(column_families_);
    }
  }
  if (!s.ok()) {
    fprintf(stderr, "open error: %s\n", s.ToString().c_str());
    exit(1);
  }
  if (db_->GetLatestSequenceNumber() < shared->GetPersistedSeqno()) {
    fprintf(stderr,
            "DB with latest sequence number %" PRIu64
            " did not recover to the persisted "
            "sequence number %" PRIu64 " from the last DB session\n",
            db_->GetLatestSequenceNumber(), shared->GetPersistedSeqno());
    exit(1);
  }
}
void StressTest::Reopen(ThreadState* thread) {
  // BG jobs in WritePrepared must be canceled first because i) they can
  // access the db via a callback, and ii) they hold on to a snapshot and the
  // upcoming ::Close would complain about it.
  const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
  bool bg_canceled __attribute__((unused)) = false;
  if (write_prepared || thread->rand.OneIn(2)) {
    const bool wait =
        write_prepared || static_cast<bool>(thread->rand.OneIn(2));
    CancelAllBackgroundWork(db_, wait);
    bg_canceled = wait;
  }
  assert(!write_prepared || bg_canceled);
  CleanUpColumnFamilies();
  // Unlike the first open before crash-recovery verification, reopen
  // currently does not restore expected state with potential data loss in
  // mind. Therefore it always expects no data loss, and we should ensure no
  // data loss occurs in testing.
  // TODO(hx235): eliminate the FlushWAL(true /* sync */)/SyncWAL() below
  if (!FLAGS_disable_wal) {
    Status s;
    if (FLAGS_manual_wal_flush_one_in > 0) {
      s = db_->FlushWAL(/*sync=*/true);
    } else {
      s = db_->SyncWAL();
    }
    if (!s.ok()) {
      fprintf(stderr,
              "Error persisting WAL data which is needed before reopening the "
              "DB: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
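  // Half of the time, exercise the explicit Close() path before deleting the
  // DB object; otherwise rely on the destructor to close.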
  if (thread->rand.OneIn(2)) {
    Status s = db_->Close();
    if (!s.ok()) {
      fprintf(stderr, "Non-ok close status: %s\n", s.ToString().c_str());
      fflush(stderr);
    }
    assert(s.ok());
  }
  assert((txn_db_ == nullptr && optimistic_txn_db_ == nullptr) ||
         (db_ == txn_db_ || db_ == optimistic_txn_db_));
  delete db_;
  db_ = nullptr;
  txn_db_ = nullptr;
  optimistic_txn_db_ = nullptr;
  delete secondary_db_;
  secondary_db_ = nullptr;
  num_times_reopened_++;
  auto now = clock_->NowMicros();
  fprintf(stdout, "%s Reopening database for the %dth time\n",
          clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
  Open(thread->shared, /*reopen=*/true);
  if (thread->shared->GetStressTest()->MightHaveUnsyncedDataLoss() &&
      IsStateTracked()) {
    Status s = thread->shared->SaveAtAndAfter(db_);
    if (!s.ok()) {
      fprintf(stderr, "Error enabling history tracing: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
}
bool StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
                                                      std::string& ts_str,
                                                      Slice& ts_slice,
                                                      ReadOptions& read_opts) {
  if (FLAGS_user_timestamp_size == 0) {
    return false;
  }
  if (!FLAGS_persist_user_defined_timestamps) {
    // Do not read with older timestamps, to avoid getting InvalidArgument.
    return false;
  }
  assert(thread);
  if (!thread->rand.OneInOpt(3)) {
    return false;
  }
  const SharedState* const shared = thread->shared;
  assert(shared);
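  // Pick a timestamp uniformly at random in [start_ts, now) so that roughly
  // one in three point lookups reads the DB as of an older point in time.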
  const uint64_t start_ts = shared->GetStartTimestamp();
  uint64_t now = db_stress_env->NowNanos();
  assert(now > start_ts);
  uint64_t time_diff = now - start_ts;
  uint64_t ts = start_ts + (thread->rand.Next64() % time_diff);
  ts_str.clear();
  PutFixed64(&ts_str, ts);
  ts_slice = ts_str;
  read_opts.timestamp = &ts_slice;
  return true;
}
void StressTest::MaybeUseOlderTimestampForRangeScan(ThreadState* thread,
                                                    std::string& ts_str,
                                                    Slice& ts_slice,
                                                    ReadOptions& read_opts) {
  if (FLAGS_user_timestamp_size == 0) {
    return;
  }
  if (!FLAGS_persist_user_defined_timestamps) {
    // Do not read with older timestamps, to avoid getting InvalidArgument.
    return;
  }
  assert(thread);
  if (!thread->rand.OneInOpt(3)) {
    return;
  }
  const Slice* const saved_ts = read_opts.timestamp;
  assert(saved_ts != nullptr);
  const SharedState* const shared = thread->shared;
  assert(shared);
  const uint64_t start_ts = shared->GetStartTimestamp();
  uint64_t now = db_stress_env->NowNanos();
  assert(now > start_ts);
  uint64_t time_diff = now - start_ts;
  uint64_t ts = start_ts + (thread->rand.Next64() % time_diff);
  ts_str.clear();
  PutFixed64(&ts_str, ts);
  ts_slice = ts_str;
  read_opts.timestamp = &ts_slice;
  // TODO (yanqin): support Merge with iter_start_ts
  if (!thread->rand.OneInOpt(3) || FLAGS_use_merge ||
      FLAGS_use_full_merge_v1) {
    return;
  }
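  // For a third of these scans (when Merge is not in use), additionally set
  // iter_start_ts to the test's start timestamp; the iterator then returns
  // all versions of each key with timestamps in [iter_start_ts, timestamp].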
  ts_str.clear();
  PutFixed64(&ts_str, start_ts);
  ts_slice = ts_str;
  read_opts.iter_start_ts = &ts_slice;
  read_opts.timestamp = saved_ts;
}
void CheckAndSetOptionsForUserTimestamp(Options& options) {
  assert(FLAGS_user_timestamp_size > 0);
  const Comparator* const cmp = test::BytewiseComparatorWithU64TsWrapper();
  assert(cmp);
  if (FLAGS_user_timestamp_size != cmp->timestamp_size()) {
    fprintf(stderr,
            "Only -user_timestamp_size=%d is supported in stress test.\n",
            static_cast<int>(cmp->timestamp_size()));
    exit(1);
  }
  if (FLAGS_use_txn) {
    fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
    exit(1);
  }
  if (FLAGS_test_cf_consistency || FLAGS_test_batches_snapshots) {
    fprintf(stderr,
            "Due to per-key ts-seq ordering constraint, only the (default) "
            "non-batched test is supported with timestamp.\n");
    exit(1);
  }
  if (FLAGS_ingest_external_file_one_in > 0) {
    fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
    exit(1);
  }
  options.comparator = cmp;
  options.persist_user_defined_timestamps =
      FLAGS_persist_user_defined_timestamps;
}
bool ShouldDisableAutoCompactionsBeforeVerifyDb() {
  return !FLAGS_disable_auto_compactions && FLAGS_enable_compaction_filter;
}
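// Illustrative usage (a sketch; the path is made up): with
//   db_stress --options_file=/path/to/OPTIONS
// the DBOptions and the first (default) column family's ColumnFamilyOptions
// come from the file, and the flag-based initialization in
// InitializeOptionsFromFlags() below is presumably skipped by the caller.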
bool InitializeOptionsFromFile(Options& options) {
  DBOptions db_options;
  ConfigOptions config_options;
  config_options.ignore_unknown_options = false;
  config_options.input_strings_escaped = true;
  config_options.env = db_stress_env;
  std::vector<ColumnFamilyDescriptor> cf_descriptors;
  if (!FLAGS_options_file.empty()) {
    Status s = LoadOptionsFromFile(config_options, FLAGS_options_file,
                                   &db_options, &cf_descriptors);
    if (!s.ok()) {
      fprintf(stderr, "Unable to load options file %s --- %s\n",
              FLAGS_options_file.c_str(), s.ToString().c_str());
      exit(1);
    }
    db_options.env = new CompositeEnvWrapper(db_stress_env);
    options = Options(db_options, cf_descriptors[0].options);
    return true;
  }
  return false;
}
void InitializeOptionsFromFlags(
    const std::shared_ptr<Cache>& cache,
    const std::shared_ptr<const FilterPolicy>& filter_policy,
    Options& options) {
  BlockBasedTableOptions block_based_options;
  block_based_options.decouple_partitioned_filters =
      FLAGS_decouple_partitioned_filters;
  block_based_options.block_cache = cache;
  block_based_options.cache_index_and_filter_blocks =
      FLAGS_cache_index_and_filter_blocks;
  block_based_options.metadata_cache_options.top_level_index_pinning =
      static_cast<PinningTier>(FLAGS_top_level_index_pinning);
  block_based_options.metadata_cache_options.partition_pinning =
      static_cast<PinningTier>(FLAGS_partition_pinning);
  block_based_options.metadata_cache_options.unpartitioned_pinning =
      static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
  block_based_options.checksum = checksum_type_e;
  block_based_options.block_size = FLAGS_block_size;
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kCompressionDictionaryBuildingBuffer,
       {/*.charged = */ FLAGS_charge_compression_dictionary_building_buffer
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kFilterConstruction,
       {/*.charged = */ FLAGS_charge_filter_construction
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kBlockBasedTableReader,
       {/*.charged = */ FLAGS_charge_table_reader
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kFileMetadata,
       {/*.charged = */ FLAGS_charge_file_metadata
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kBlobCache,
       {/*.charged = */ FLAGS_charge_blob_cache
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
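  // The overrides above map each -charge_* flag onto per-role cache charging,
  // so the memory used for that CacheEntryRole is accounted against (and
  // limited by) the block cache capacity.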
  block_based_options.format_version =
      static_cast<uint32_t>(FLAGS_format_version);
  block_based_options.index_block_restart_interval =
      static_cast<int32_t>(FLAGS_index_block_restart_interval);
  block_based_options.filter_policy = filter_policy;
  block_based_options.partition_filters = FLAGS_partition_filters;
  block_based_options.optimize_filters_for_memory =
      FLAGS_optimize_filters_for_memory;
  block_based_options.detect_filter_construct_corruption =
      FLAGS_detect_filter_construct_corruption;
  block_based_options.index_type =
      static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
  block_based_options.data_block_index_type =
      static_cast<BlockBasedTableOptions::DataBlockIndexType>(
          FLAGS_data_block_index_type);
  block_based_options.prepopulate_block_cache =
      static_cast<BlockBasedTableOptions::PrepopulateBlockCache>(
          FLAGS_prepopulate_block_cache);
  block_based_options.initial_auto_readahead_size =
      FLAGS_initial_auto_readahead_size;
  block_based_options.max_auto_readahead_size = FLAGS_max_auto_readahead_size;
  block_based_options.num_file_reads_for_auto_readahead =
      FLAGS_num_file_reads_for_auto_readahead;
  block_based_options.cache_index_and_filter_blocks_with_high_priority =
      FLAGS_cache_index_and_filter_blocks_with_high_priority;
  block_based_options.use_delta_encoding = FLAGS_use_delta_encoding;
  block_based_options.verify_compression = FLAGS_verify_compression;
  block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
  block_based_options.enable_index_compression =
      FLAGS_enable_index_compression;
  block_based_options.index_shortening =
      static_cast<BlockBasedTableOptions::IndexShorteningMode>(
          FLAGS_index_shortening);
  block_based_options.block_align = FLAGS_block_align;
  block_based_options.super_block_alignment_size =
      fLU64::FLAGS_super_block_alignment_size;
  block_based_options.super_block_alignment_space_overhead_ratio =
      fLU64::FLAGS_super_block_alignment_space_overhead_ratio;
  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
  options.db_write_buffer_size = FLAGS_db_write_buffer_size;
  options.write_buffer_size = FLAGS_write_buffer_size;
  options.max_write_buffer_number = FLAGS_max_write_buffer_number;
  options.min_write_buffer_number_to_merge =
      FLAGS_min_write_buffer_number_to_merge;
  options.max_write_buffer_size_to_maintain =
      FLAGS_max_write_buffer_size_to_maintain;
  options.memtable_prefix_bloom_size_ratio =
      FLAGS_memtable_prefix_bloom_size_ratio;
  if (FLAGS_use_write_buffer_manager) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(FLAGS_db_write_buffer_size, block_cache));
  }
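  // With -use_write_buffer_manager, total memtable memory is capped at
  // -db_write_buffer_size and charged to the block cache (note this uses the
  // global block_cache rather than the `cache` parameter).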
  options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
  if (ShouldDisableAutoCompactionsBeforeVerifyDb()) {
    options.disable_auto_compactions = true;
  } else {
    options.disable_auto_compactions = FLAGS_disable_auto_compactions;
  }
  options.max_background_compactions = FLAGS_max_background_compactions;
  options.max_background_flushes = FLAGS_max_background_flushes;
  options.compaction_style =
      static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
  if (options.compaction_style ==
      ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleFIFO) {
    options.compaction_options_fifo.allow_compaction =
        FLAGS_fifo_allow_compaction;
  }
  options.compaction_pri =
      static_cast<ROCKSDB_NAMESPACE::CompactionPri>(FLAGS_compaction_pri);
  options.num_levels = FLAGS_num_levels;
  if (FLAGS_prefix_size >= 0) {
    options.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size));
    if (FLAGS_enable_memtable_insert_with_hint_prefix_extractor) {
      options.memtable_insert_with_hint_prefix_extractor =
          options.prefix_extractor;
    }
  }
  options.max_open_files = FLAGS_open_files;
  if (FLAGS_statistics) {
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  }
  options.env = db_stress_env;
  options.use_fsync = FLAGS_use_fsync;
  options.compaction_readahead_size = FLAGS_compaction_readahead_size;
  options.allow_mmap_reads = FLAGS_mmap_read;
  options.allow_mmap_writes = FLAGS_mmap_write;
  options.use_direct_reads = FLAGS_use_direct_reads;
  options.use_direct_io_for_flush_and_compaction =
      FLAGS_use_direct_io_for_flush_and_compaction;
  options.recycle_log_file_num =
      static_cast<size_t>(FLAGS_recycle_log_file_num);
  options.target_file_size_base = FLAGS_target_file_size_base;
  options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
  options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
  options.max_bytes_for_level_multiplier =
      FLAGS_max_bytes_for_level_multiplier;
  options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
  options.level0_slowdown_writes_trigger =
      FLAGS_level0_slowdown_writes_trigger;
  options.level0_file_num_compaction_trigger =
      FLAGS_level0_file_num_compaction_trigger;
  options.compression = compression_type_e;
  options.bottommost_compression = bottommost_compression_type_e;
  options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
  options.compression_opts.zstd_max_train_bytes =
      FLAGS_compression_zstd_max_train_bytes;
  options.compression_opts.parallel_threads =
      FLAGS_compression_parallel_threads;
  options.compression_opts.max_dict_buffer_bytes =
      FLAGS_compression_max_dict_buffer_bytes;
  if (ZSTD_FinalizeDictionarySupported()) {
    options.compression_opts.use_zstd_dict_trainer =
        FLAGS_compression_use_zstd_dict_trainer;
  } else if (!FLAGS_compression_use_zstd_dict_trainer) {
    fprintf(
        stdout,
        "WARNING: use_zstd_dict_trainer is false but zstd finalizeDictionary "
        "cannot be used because ZSTD 1.4.5+ is not linked with the binary."
        " zstd dictionary trainer will be used.\n");
  }
  if (FLAGS_compression_checksum) {
    options.compression_opts.checksum = true;
  }
  options.max_manifest_file_size = FLAGS_max_manifest_file_size;
  options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
  options.allow_concurrent_memtable_write =
      FLAGS_allow_concurrent_memtable_write;
  options.experimental_mempurge_threshold =
      FLAGS_experimental_mempurge_threshold;
  options.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
  options.daily_offpeak_time_utc = FLAGS_daily_offpeak_time_utc;
  options.stats_dump_period_sec =
      static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
  options.ttl = FLAGS_compaction_ttl;
  options.enable_pipelined_write = FLAGS_enable_pipelined_write;
  options.enable_write_thread_adaptive_yield =
      FLAGS_enable_write_thread_adaptive_yield;
  options.compaction_options_universal.size_ratio = FLAGS_universal_size_ratio;
  options.compaction_options_universal.min_merge_width =
      FLAGS_universal_min_merge_width;
  options.compaction_options_universal.max_merge_width =
      FLAGS_universal_max_merge_width;
  options.compaction_options_universal.max_size_amplification_percent =
      FLAGS_universal_max_size_amplification_percent;
  options.compaction_options_universal.max_read_amp =
      FLAGS_universal_max_read_amp;
  options.atomic_flush = FLAGS_atomic_flush;
  options.manual_wal_flush = FLAGS_manual_wal_flush_one_in > 0;
  options.avoid_unnecessary_blocking_io = FLAGS_avoid_unnecessary_blocking_io;
  options.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
  options.write_identity_file = FLAGS_write_identity_file;
  options.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
  options.max_write_batch_group_size_bytes =
      FLAGS_max_write_batch_group_size_bytes;
  options.level_compaction_dynamic_level_bytes =
      FLAGS_level_compaction_dynamic_level_bytes;
  options.track_and_verify_wals_in_manifest = true;
  options.track_and_verify_wals = FLAGS_track_and_verify_wals;
  options.verify_sst_unique_id_in_manifest =
      FLAGS_verify_sst_unique_id_in_manifest;
  options.memtable_protection_bytes_per_key =
      FLAGS_memtable_protection_bytes_per_key;
  options.block_protection_bytes_per_key =
      FLAGS_block_protection_bytes_per_key;
  options.paranoid_memory_checks = FLAGS_paranoid_memory_checks;
  options.memtable_veirfy_per_key_checksum_on_seek =
      FLAGS_memtable_veirfy_per_key_checksum_on_seek;
  // Integrated BlobDB
  options.enable_blob_files = FLAGS_enable_blob_files;
  options.min_blob_size = FLAGS_min_blob_size;
  options.blob_file_size = FLAGS_blob_file_size;
  options.blob_compression_type =
      StringToCompressionType(FLAGS_blob_compression_type.c_str());
  options.enable_blob_garbage_collection =
      FLAGS_enable_blob_garbage_collection;
  options.blob_garbage_collection_age_cutoff =
      FLAGS_blob_garbage_collection_age_cutoff;
  options.blob_garbage_collection_force_threshold =
      FLAGS_blob_garbage_collection_force_threshold;
  options.blob_compaction_readahead_size =
      FLAGS_blob_compaction_readahead_size;
  options.blob_file_starting_level = FLAGS_blob_file_starting_level;
  if (FLAGS_use_blob_cache) {
    if (FLAGS_use_shared_block_and_blob_cache) {
      options.blob_cache = cache;
    } else {
      if (FLAGS_blob_cache_size > 0) {
        LRUCacheOptions co;
        co.capacity = FLAGS_blob_cache_size;
        co.num_shard_bits = FLAGS_blob_cache_numshardbits;
        options.blob_cache = NewLRUCache(co);
      } else {
        fprintf(stderr,
                "Unable to create a standalone blob cache if blob_cache_size "
                "<= 0.\n");
        exit(1);
      }
    }
    switch (FLAGS_prepopulate_blob_cache) {
      case 0:
        options.prepopulate_blob_cache = PrepopulateBlobCache::kDisable;
        break;
      case 1:
        options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
        break;
      default:
        fprintf(stderr, "Unknown prepopulate blob cache mode\n");
        exit(1);
    }
  }
  options.wal_compression =
      StringToCompressionType(FLAGS_wal_compression.c_str());
  options.last_level_temperature =
      StringToTemperature(FLAGS_last_level_temperature.c_str());
  options.default_write_temperature =
      StringToTemperature(FLAGS_default_write_temperature.c_str());
  options.default_temperature =
      StringToTemperature(FLAGS_default_temperature.c_str());
  if (!FLAGS_file_temperature_age_thresholds.empty()) {
    const std::string allowTrivialCopyBoolStr =
        FLAGS_allow_trivial_copy_when_change_temperature ? "true" : "false";
    Status s = GetColumnFamilyOptionsFromString(
        {}, options,
        "compaction_options_fifo={file_temperature_age_thresholds=" +
            FLAGS_file_temperature_age_thresholds +
            ";allow_trivial_copy_when_change_temperature=" +
            allowTrivialCopyBoolStr + "}",
        &options);
    if (!s.ok()) {
      fprintf(stderr, "While setting file_temperature_age_thresholds: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
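  // For example (flag value made up for illustration), passing
  //   -file_temperature_age_thresholds='{{temperature=kWarm;age=100}}'
  // with -allow_trivial_copy_when_change_temperature left false yields the
  // option string
  //   compaction_options_fifo={file_temperature_age_thresholds=
  //     {{temperature=kWarm;age=100}};
  //     allow_trivial_copy_when_change_temperature=false}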
  // NOTE: -1 is allowed, meaning "start disabled but may be changed
  // dynamically later"; it is clamped to 0 here.
  options.preclude_last_level_data_seconds =
      std::max(FLAGS_preclude_last_level_data_seconds, int64_t{0});
  options.preserve_internal_time_seconds =
      FLAGS_preserve_internal_time_seconds;
  switch (FLAGS_rep_factory) {
    case kSkipList:
      // no need to do anything
      break;
    case kHashSkipList:
      options.memtable_factory.reset(NewHashSkipListRepFactory(10000));
      break;
    case kVectorRep:
      options.memtable_factory.reset(new VectorRepFactory());
      break;
  }
  InitializeMergeOperator(options);
  if (FLAGS_enable_compaction_filter) {
    options.compaction_filter_factory =
        std::make_shared<DbStressCompactionFilterFactory>();
  }
  options.best_efforts_recovery = FLAGS_best_efforts_recovery;
  options.paranoid_file_checks = FLAGS_paranoid_file_checks;
  if (FLAGS_user_timestamp_size > 0) {
    CheckAndSetOptionsForUserTimestamp(options);
  }
  options.allow_data_in_errors = FLAGS_allow_data_in_errors;
  options.enable_thread_tracking = FLAGS_enable_thread_tracking;
  options.memtable_max_range_deletions = FLAGS_memtable_max_range_deletions;
  options.bottommost_file_compaction_delay =
      FLAGS_bottommost_file_compaction_delay;
  options.allow_fallocate = FLAGS_allow_fallocate;
  options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
  options.log_readahead_size = FLAGS_log_readahead_size;
  options.bgerror_resume_retry_interval = FLAGS_bgerror_resume_retry_interval;
  options.delete_obsolete_files_period_micros =
      FLAGS_delete_obsolete_files_period_micros;
  options.max_log_file_size = FLAGS_max_log_file_size;
  options.log_file_time_to_roll = FLAGS_log_file_time_to_roll;
  options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
  options.advise_random_on_open = FLAGS_advise_random_on_open;
  // TODO (hx235): test the functionality of `WAL_ttl_seconds`,
  // `WAL_size_limit_MB`, i.e., `GetUpdatesSince()`
  options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds;
  options.WAL_size_limit_MB = FLAGS_WAL_size_limit_MB;
  options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
  options.strict_bytes_per_sync = FLAGS_strict_bytes_per_sync;
  options.avoid_flush_during_shutdown = FLAGS_avoid_flush_during_shutdown;
  options.dump_malloc_stats = FLAGS_dump_malloc_stats;
  options.stats_history_buffer_size = FLAGS_stats_history_buffer_size;
  options.skip_stats_update_on_db_open = FLAGS_skip_stats_update_on_db_open;
  options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;
  options.sample_for_compression = FLAGS_sample_for_compression;
  options.report_bg_io_stats = FLAGS_report_bg_io_stats;
  options.manifest_preallocation_size = FLAGS_manifest_preallocation_size;
  if (FLAGS_enable_checksum_handoff) {
    options.checksum_handoff_file_types = {FileTypeSet::All()};
  } else {
    options.checksum_handoff_file_types = {};
  }
  options.max_total_wal_size = FLAGS_max_total_wal_size;
  options.soft_pending_compaction_bytes_limit =
      FLAGS_soft_pending_compaction_bytes_limit;
  options.hard_pending_compaction_bytes_limit =
      FLAGS_hard_pending_compaction_bytes_limit;
  options.max_sequential_skip_in_iterations =
      FLAGS_max_sequential_skip_in_iterations;
  if (FLAGS_enable_sst_partitioner_factory) {
    options.sst_partitioner_factory = std::shared_ptr<SstPartitionerFactory>(
        NewSstPartitionerFixedPrefixFactory(1));
  }
  options.lowest_used_cache_tier =
      static_cast<CacheTier>(FLAGS_lowest_used_cache_tier);
  options.inplace_update_support = FLAGS_inplace_update_support;
  options.uncache_aggressiveness = FLAGS_uncache_aggressiveness;
  options.memtable_op_scan_flush_trigger =
      FLAGS_memtable_op_scan_flush_trigger;
  options.compaction_options_universal.reduce_file_locking =
      FLAGS_universal_reduce_file_locking;
}
void InitializeOptionsGeneral(
    const std::shared_ptr<Cache>& cache,
    const std::shared_ptr<const FilterPolicy>& filter_policy,
    const std::shared_ptr<SstQueryFilterConfigsManager::Factory>& sqfc_factory,
    Options& options) {
  options.create_missing_column_families = true;
  options.create_if_missing = true;
  if (FLAGS_statistics) {
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  }
  if (options.env == Options().env) {
    options.env = db_stress_env;
  }
  assert(options.table_factory);
  auto table_options =
      options.table_factory->GetOptions<BlockBasedTableOptions>();
  if (table_options) {
    if (FLAGS_cache_size > 0) {
      table_options->block_cache = cache;
    }
    if (!table_options->filter_policy) {
      table_options->filter_policy = filter_policy;
    }
  }
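  // InitializeOptionsGeneral() runs after options came from either the file
  // or the flags, so the shared cache and filter policy are only filled in
  // when an options file did not already configure them.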
  // TODO: row_cache, thread-pool IO priority, CPU priority.
  if (!options.rate_limiter) {
    if (FLAGS_rate_limiter_bytes_per_sec > 0) {
      options.rate_limiter.reset(NewGenericRateLimiter(
          FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
          10 /* fairness */,
          FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                    : RateLimiter::Mode::kWritesOnly));
    }
  }
  if (!options.file_checksum_gen_factory) {
    options.file_checksum_gen_factory =
        GetFileChecksumImpl(FLAGS_file_checksum_impl);
  }
  if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
      FLAGS_sst_file_manager_bytes_per_truncate > 0) {
    Status status;
    options.sst_file_manager.reset(NewSstFileManager(
        db_stress_env, options.info_log, "" /* trash_dir */,
        static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
        true /* delete_existing_trash */, &status,
        0.25 /* max_trash_db_ratio */,
        FLAGS_sst_file_manager_bytes_per_truncate));
    if (!status.ok()) {
      fprintf(stderr, "SstFileManager creation failed: %s\n",
              status.ToString().c_str());
      exit(1);
    }
  }
  if (FLAGS_preserve_unverified_changes) {
    if (!options.avoid_flush_during_recovery) {
      fprintf(stderr,
              "WARNING: flipping `avoid_flush_during_recovery` to true for "
              "`preserve_unverified_changes` to keep all files\n");
      options.avoid_flush_during_recovery = true;
    }
    // Together with `avoid_flush_during_recovery == true`, this will prevent
    // live files from becoming obsolete and deleted between `DB::Open()` and
    // `DisableFileDeletions()` due to flush or compaction. We do not need to
    // warn the user since we will reenable compaction soon.
    options.disable_auto_compactions = true;
  }
  options.table_properties_collector_factories.clear();
  options.table_properties_collector_factories.emplace_back(
      std::make_shared<DbStressTablePropertiesCollectorFactory>());
  if (sqfc_factory && !sqfc_factory->GetConfigs().IsEmptyNotFound()) {
    options.table_properties_collector_factories.emplace_back(sqfc_factory);
  }
  // Add CompactOnDeletionCollectorFactory if enabled
  if (FLAGS_enable_compaction_on_deletion_trigger) {
    options.table_properties_collector_factories.emplace_back(
        ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory(
            FLAGS_compaction_on_deletion_window_size,
            FLAGS_compaction_on_deletion_trigger_count,
            FLAGS_compaction_on_deletion_ratio,
            FLAGS_compaction_on_deletion_min_file_size));
  }
}
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS