version_set_test.cc 163 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
7427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/version_set.h"
  10. #include <algorithm>
  11. #include "db/blob/blob_log_writer.h"
  12. #include "db/db_impl/db_impl.h"
  13. #include "db/db_test_util.h"
  14. #include "db/log_writer.h"
  15. #include "db/manifest_ops.h"
  16. #include "db/version_edit.h"
  17. #include "rocksdb/advanced_options.h"
  18. #include "rocksdb/convenience.h"
  19. #include "rocksdb/file_system.h"
  20. #include "table/block_based/block_based_table_factory.h"
  21. #include "table/mock_table.h"
  22. #include "table/unique_id_impl.h"
  23. #include "test_util/mock_time_env.h"
  24. #include "test_util/testharness.h"
  25. #include "test_util/testutil.h"
  26. #include "util/defer.h"
  27. #include "util/string_util.h"
  28. namespace ROCKSDB_NAMESPACE {
  29. class GenerateLevelFilesBriefTest : public testing::Test {
  30. public:
  31. std::vector<FileMetaData*> files_;
  32. LevelFilesBrief file_level_;
  33. Arena arena_;
  34. GenerateLevelFilesBriefTest() = default;
  35. ~GenerateLevelFilesBriefTest() override {
  36. for (size_t i = 0; i < files_.size(); i++) {
  37. delete files_[i];
  38. }
  39. }
  40. void Add(const char* smallest, const char* largest,
  41. SequenceNumber smallest_seq = 100,
  42. SequenceNumber largest_seq = 100) {
  43. FileMetaData* f = new FileMetaData(
  44. files_.size() + 1, 0, 0,
  45. InternalKey(smallest, smallest_seq, kTypeValue),
  46. InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
  47. largest_seq, /* marked_for_compact */ false, Temperature::kUnknown,
  48. kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
  49. kUnknownFileCreationTime, kUnknownEpochNumber, kUnknownFileChecksum,
  50. kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0,
  51. /* user_defined_timestamps_persisted */ true);
  52. files_.push_back(f);
  53. }
  54. int Compare() {
  55. int diff = 0;
  56. for (size_t i = 0; i < files_.size(); i++) {
  57. if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {
  58. diff++;
  59. }
  60. }
  61. return diff;
  62. }
  63. };
  64. TEST_F(GenerateLevelFilesBriefTest, Empty) {
  65. DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  66. ASSERT_EQ(0u, file_level_.num_files);
  67. ASSERT_EQ(0, Compare());
  68. }
  69. TEST_F(GenerateLevelFilesBriefTest, Single) {
  70. Add("p", "q");
  71. DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  72. ASSERT_EQ(1u, file_level_.num_files);
  73. ASSERT_EQ(0, Compare());
  74. }
  75. TEST_F(GenerateLevelFilesBriefTest, Multiple) {
  76. Add("150", "200");
  77. Add("200", "250");
  78. Add("300", "350");
  79. Add("400", "450");
  80. DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  81. ASSERT_EQ(4u, file_level_.num_files);
  82. ASSERT_EQ(0, Compare());
  83. }
  84. class CountingLogger : public Logger {
  85. public:
  86. CountingLogger() : log_count(0) {}
  87. using Logger::Logv;
  88. void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }
  89. int log_count;
  90. };
  91. Options GetOptionsWithNumLevels(int num_levels,
  92. std::shared_ptr<CountingLogger> logger) {
  93. Options opt;
  94. opt.num_levels = num_levels;
  95. opt.info_log = logger;
  96. return opt;
  97. }
  98. class VersionStorageInfoTestBase : public testing::Test {
  99. public:
  100. const Comparator* ucmp_;
  101. InternalKeyComparator icmp_;
  102. std::shared_ptr<CountingLogger> logger_;
  103. Options options_;
  104. ImmutableOptions ioptions_;
  105. MutableCFOptions mutable_cf_options_;
  106. VersionStorageInfo vstorage_;
  107. InternalKey GetInternalKey(const char* ukey,
  108. SequenceNumber smallest_seq = 100) {
  109. return InternalKey(ukey, smallest_seq, kTypeValue);
  110. }
  111. explicit VersionStorageInfoTestBase(const Comparator* ucmp)
  112. : ucmp_(ucmp),
  113. icmp_(ucmp_),
  114. logger_(new CountingLogger()),
  115. options_(GetOptionsWithNumLevels(6, logger_)),
  116. ioptions_(options_),
  117. mutable_cf_options_(options_),
  118. vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel,
  119. /*src_vstorage=*/nullptr,
  120. /*_force_consistency_checks=*/false,
  121. EpochNumberRequirement::kMustPresent, ioptions_.clock,
  122. mutable_cf_options_.bottommost_file_compaction_delay,
  123. OffpeakTimeOption()) {}
  124. ~VersionStorageInfoTestBase() override {
  125. for (int i = 0; i < vstorage_.num_levels(); ++i) {
  126. for (auto* f : vstorage_.LevelFiles(i)) {
  127. if (--f->refs == 0) {
  128. delete f;
  129. }
  130. }
  131. }
  132. }
  133. void Add(int level, uint32_t file_number, const char* smallest,
  134. const char* largest, uint64_t file_size = 0,
  135. uint64_t oldest_blob_file_number = kInvalidBlobFileNumber,
  136. uint64_t compensated_range_deletion_size = 0) {
  137. constexpr SequenceNumber dummy_seq = 0;
  138. Add(level, file_number, GetInternalKey(smallest, dummy_seq),
  139. GetInternalKey(largest, dummy_seq), file_size, oldest_blob_file_number,
  140. compensated_range_deletion_size);
  141. }
  142. void Add(int level, uint32_t file_number, const InternalKey& smallest,
  143. const InternalKey& largest, uint64_t file_size = 0,
  144. uint64_t oldest_blob_file_number = kInvalidBlobFileNumber,
  145. uint64_t compensated_range_deletion_size = 0) {
  146. assert(level < vstorage_.num_levels());
  147. FileMetaData* f = new FileMetaData(
  148. file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
  149. /* largest_seq */ 0, /* marked_for_compact */ false,
  150. Temperature::kUnknown, oldest_blob_file_number,
  151. kUnknownOldestAncesterTime, kUnknownFileCreationTime,
  152. kUnknownEpochNumber, kUnknownFileChecksum, kUnknownFileChecksumFuncName,
  153. kNullUniqueId64x2, compensated_range_deletion_size, 0,
  154. /* user_defined_timestamps_persisted */ true);
  155. vstorage_.AddFile(level, f);
  156. }
  157. void AddBlob(uint64_t blob_file_number, uint64_t total_blob_count,
  158. uint64_t total_blob_bytes,
  159. BlobFileMetaData::LinkedSsts linked_ssts,
  160. uint64_t garbage_blob_count, uint64_t garbage_blob_bytes) {
  161. auto shared_meta = SharedBlobFileMetaData::Create(
  162. blob_file_number, total_blob_count, total_blob_bytes,
  163. /* checksum_method */ std::string(),
  164. /* checksum_value */ std::string());
  165. auto meta =
  166. BlobFileMetaData::Create(std::move(shared_meta), std::move(linked_ssts),
  167. garbage_blob_count, garbage_blob_bytes);
  168. vstorage_.AddBlobFile(std::move(meta));
  169. }
  170. void UpdateVersionStorageInfo() {
  171. vstorage_.PrepareForVersionAppend(ioptions_, mutable_cf_options_);
  172. vstorage_.SetFinalized();
  173. }
  174. std::string GetOverlappingFiles(int level, const InternalKey& begin,
  175. const InternalKey& end) {
  176. std::vector<FileMetaData*> inputs;
  177. vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);
  178. std::string result;
  179. for (size_t i = 0; i < inputs.size(); ++i) {
  180. if (i > 0) {
  181. result += ",";
  182. }
  183. AppendNumberTo(&result, inputs[i]->fd.GetNumber());
  184. }
  185. return result;
  186. }
  187. };
  188. class VersionStorageInfoTest : public VersionStorageInfoTestBase {
  189. public:
  190. VersionStorageInfoTest() : VersionStorageInfoTestBase(BytewiseComparator()) {}
  191. ~VersionStorageInfoTest() override = default;
  192. };
  193. TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
  194. ioptions_.level_compaction_dynamic_level_bytes = false;
  195. mutable_cf_options_.max_bytes_for_level_base = 10;
  196. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  197. Add(4, 100U, "1", "2", 100U);
  198. Add(5, 101U, "1", "2", 100U);
  199. UpdateVersionStorageInfo();
  200. ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U);
  201. ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U);
  202. ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U);
  203. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U);
  204. ASSERT_EQ(0, logger_->log_count);
  205. }
  206. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_1) {
  207. ioptions_.level_compaction_dynamic_level_bytes = true;
  208. mutable_cf_options_.max_bytes_for_level_base = 1000;
  209. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  210. Add(5, 1U, "1", "2", 500U);
  211. UpdateVersionStorageInfo();
  212. ASSERT_EQ(0, logger_->log_count);
  213. ASSERT_EQ(vstorage_.base_level(), 5);
  214. }
  215. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_2) {
  216. ioptions_.level_compaction_dynamic_level_bytes = true;
  217. mutable_cf_options_.max_bytes_for_level_base = 1000;
  218. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  219. Add(5, 1U, "1", "2", 500U);
  220. Add(5, 2U, "3", "4", 550U);
  221. UpdateVersionStorageInfo();
  222. ASSERT_EQ(0, logger_->log_count);
  223. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  224. ASSERT_EQ(vstorage_.base_level(), 4);
  225. }
  226. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_3) {
  227. ioptions_.level_compaction_dynamic_level_bytes = true;
  228. mutable_cf_options_.max_bytes_for_level_base = 1000;
  229. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  230. Add(5, 1U, "1", "2", 500U);
  231. Add(5, 2U, "3", "4", 550U);
  232. Add(4, 3U, "3", "4", 550U);
  233. UpdateVersionStorageInfo();
  234. ASSERT_EQ(0, logger_->log_count);
  235. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  236. ASSERT_EQ(vstorage_.base_level(), 4);
  237. }
  238. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_4) {
  239. ioptions_.level_compaction_dynamic_level_bytes = true;
  240. mutable_cf_options_.max_bytes_for_level_base = 1000;
  241. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  242. Add(5, 1U, "1", "2", 500U);
  243. Add(5, 2U, "3", "4", 550U);
  244. Add(4, 3U, "3", "4", 550U);
  245. Add(3, 4U, "3", "4", 250U);
  246. Add(3, 5U, "5", "7", 300U);
  247. UpdateVersionStorageInfo();
  248. ASSERT_EQ(1, logger_->log_count);
  249. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);
  250. ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U);
  251. ASSERT_EQ(vstorage_.base_level(), 3);
  252. }
  253. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_5) {
  254. ioptions_.level_compaction_dynamic_level_bytes = true;
  255. mutable_cf_options_.max_bytes_for_level_base = 1000;
  256. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  257. Add(5, 1U, "1", "2", 500U);
  258. Add(5, 2U, "3", "4", 550U);
  259. Add(4, 3U, "3", "4", 550U);
  260. Add(3, 4U, "3", "4", 250U);
  261. Add(3, 5U, "5", "7", 300U);
  262. Add(1, 6U, "3", "4", 5U);
  263. Add(1, 7U, "8", "9", 5U);
  264. UpdateVersionStorageInfo();
  265. ASSERT_EQ(1, logger_->log_count);
  266. ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
  267. ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
  268. ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);
  269. ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U);
  270. ASSERT_EQ(vstorage_.base_level(), 1);
  271. }
  272. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
  273. ioptions_.level_compaction_dynamic_level_bytes = true;
  274. mutable_cf_options_.max_bytes_for_level_base = 100;
  275. mutable_cf_options_.max_bytes_for_level_multiplier = 2;
  276. Add(0, 1U, "1", "2", 50U);
  277. Add(1, 2U, "1", "2", 50U);
  278. Add(2, 3U, "1", "2", 500U);
  279. Add(3, 4U, "1", "2", 500U);
  280. Add(4, 5U, "1", "2", 1700U);
  281. Add(5, 6U, "1", "2", 500U);
  282. UpdateVersionStorageInfo();
  283. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U);
  284. ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U);
  285. ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U);
  286. ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U);
  287. ASSERT_EQ(vstorage_.base_level(), 1);
  288. ASSERT_EQ(0, logger_->log_count);
  289. }
  290. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {
  291. uint64_t kOneGB = 1000U * 1000U * 1000U;
  292. ioptions_.level_compaction_dynamic_level_bytes = true;
  293. mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;
  294. mutable_cf_options_.max_bytes_for_level_multiplier = 10;
  295. Add(0, 1U, "1", "2", 50U);
  296. Add(3, 4U, "1", "2", 32U * kOneGB);
  297. Add(4, 5U, "1", "2", 500U * kOneGB);
  298. Add(5, 6U, "1", "2", 3000U * kOneGB);
  299. UpdateVersionStorageInfo();
  300. ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB);
  301. ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB);
  302. ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB);
  303. ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB);
  304. ASSERT_EQ(vstorage_.base_level(), 2);
  305. ASSERT_EQ(0, logger_->log_count);
  306. }
  307. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {
  308. ioptions_.level_compaction_dynamic_level_bytes = true;
  309. mutable_cf_options_.max_bytes_for_level_base = 40000;
  310. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  311. mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  312. Add(0, 1U, "1", "2", 10000U);
  313. Add(0, 2U, "1", "2", 10000U);
  314. Add(0, 3U, "1", "2", 10000U);
  315. Add(5, 4U, "1", "2", 1286250U);
  316. Add(4, 5U, "1", "2", 200000U);
  317. Add(3, 6U, "1", "2", 40000U);
  318. Add(2, 7U, "1", "2", 8000U);
  319. UpdateVersionStorageInfo();
  320. ASSERT_EQ(0, logger_->log_count);
  321. ASSERT_EQ(2, vstorage_.base_level());
  322. // level multiplier should be 3.5
  323. ASSERT_EQ(vstorage_.level_multiplier(), 5.0);
  324. ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));
  325. ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
  326. ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
  327. vstorage_.ComputeCompactionScore(ioptions_, mutable_cf_options_);
  328. // Only L0 hits compaction.
  329. ASSERT_EQ(vstorage_.CompactionScoreLevel(0), 0);
  330. }
  331. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {
  332. ioptions_.level_compaction_dynamic_level_bytes = true;
  333. mutable_cf_options_.max_bytes_for_level_base = 10000;
  334. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  335. mutable_cf_options_.level0_file_num_compaction_trigger = 4;
  336. Add(0, 11U, "1", "2", 10000U);
  337. Add(0, 12U, "1", "2", 10000U);
  338. Add(0, 13U, "1", "2", 10000U);
  339. // Level size should be around 10,000, 10,290, 51,450, 257,250
  340. Add(5, 4U, "1", "2", 1286250U);
  341. Add(4, 5U, "1", "2", 258000U); // unadjusted score 1.003
  342. Add(3, 6U, "1", "2", 53000U); // unadjusted score 1.03
  343. Add(2, 7U, "1", "2", 20000U); // unadjusted score 1.94
  344. UpdateVersionStorageInfo();
  345. ASSERT_EQ(0, logger_->log_count);
  346. ASSERT_EQ(1, vstorage_.base_level());
  347. ASSERT_EQ(10000U, vstorage_.MaxBytesForLevel(1));
  348. ASSERT_EQ(10290U, vstorage_.MaxBytesForLevel(2));
  349. ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
  350. ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
  351. vstorage_.ComputeCompactionScore(ioptions_, mutable_cf_options_);
  352. // Although L2 and l3 have higher unadjusted compaction score, considering
  353. // a relatively large L0 being compacted down soon, L4 is picked up for
  354. // compaction.
  355. // L0 is still picked up for oversizing.
  356. ASSERT_EQ(0, vstorage_.CompactionScoreLevel(0));
  357. ASSERT_EQ(4, vstorage_.CompactionScoreLevel(1));
  358. }
  359. TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
  360. ioptions_.level_compaction_dynamic_level_bytes = true;
  361. mutable_cf_options_.max_bytes_for_level_base = 20000;
  362. mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  363. mutable_cf_options_.level0_file_num_compaction_trigger = 5;
  364. Add(0, 11U, "1", "2", 2500U);
  365. Add(0, 12U, "1", "2", 2500U);
  366. Add(0, 13U, "1", "2", 2500U);
  367. Add(0, 14U, "1", "2", 2500U);
  368. // Level size should be around 20,000, 53000, 258000
  369. Add(5, 4U, "1", "2", 1286250U);
  370. Add(4, 5U, "1", "2", 260000U); // Unadjusted score 1.01, adjusted about 4.3
  371. Add(3, 6U, "1", "2", 85000U); // Unadjusted score 1.42, adjusted about 11.6
  372. Add(2, 7U, "1", "2", 30000); // Unadjusted score 1.5, adjusted about 10.0
  373. UpdateVersionStorageInfo();
  374. ASSERT_EQ(0, logger_->log_count);
  375. ASSERT_EQ(2, vstorage_.base_level());
  376. ASSERT_EQ(20000U, vstorage_.MaxBytesForLevel(2));
  377. vstorage_.ComputeCompactionScore(ioptions_, mutable_cf_options_);
  378. // Although L2 has higher unadjusted compaction score, considering
  379. // a relatively large L0 being compacted down soon, L3 is picked up for
  380. // compaction.
  381. ASSERT_EQ(3, vstorage_.CompactionScoreLevel(0));
  382. ASSERT_EQ(2, vstorage_.CompactionScoreLevel(1));
  383. ASSERT_EQ(4, vstorage_.CompactionScoreLevel(2));
  384. }
  385. TEST_F(VersionStorageInfoTest, DrainUnnecessaryLevel) {
  386. ioptions_.level_compaction_dynamic_level_bytes = true;
  387. mutable_cf_options_.max_bytes_for_level_base = 1000;
  388. mutable_cf_options_.max_bytes_for_level_multiplier = 10;
  389. // Create a few unnecessary levels.
  390. // See if score is calculated correctly.
  391. Add(5, 1U, "1", "2", 2000U); // target size 1010000
  392. Add(4, 2U, "1", "2", 200U); // target size 101000
  393. // Unnecessary levels
  394. Add(3, 3U, "1", "2", 100U); // target size 10100
  395. // Level 2: target size 1010
  396. Add(1, 4U, "1", "2",
  397. 10U); // target size 1000 = max(base_bytes_min + 1, base_bytes_max)
  398. UpdateVersionStorageInfo();
  399. ASSERT_EQ(1, vstorage_.base_level());
  400. ASSERT_EQ(1000, vstorage_.MaxBytesForLevel(1));
  401. ASSERT_EQ(10100, vstorage_.MaxBytesForLevel(3));
  402. vstorage_.ComputeCompactionScore(ioptions_, mutable_cf_options_);
  403. // Tests that levels 1 and 3 are eligible for compaction.
  404. // Levels 1 and 3 are much smaller than target size,
  405. // so size does not contribute to a high compaction score.
  406. ASSERT_EQ(1, vstorage_.CompactionScoreLevel(0));
  407. ASSERT_GT(vstorage_.CompactionScore(0), 10);
  408. ASSERT_EQ(3, vstorage_.CompactionScoreLevel(1));
  409. ASSERT_GT(vstorage_.CompactionScore(1), 10);
  410. }
  411. TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
  412. // Test whether the overlaps are detected as expected
  413. Add(1, 1U, "4", "7", 1U); // Perfect overlap with last level
  414. Add(2, 2U, "3", "5", 1U); // Partial overlap with last level
  415. Add(2, 3U, "6", "8", 1U); // Partial overlap with last level
  416. Add(3, 4U, "1", "9", 1U); // Contains range of last level
  417. Add(4, 5U, "4", "5", 1U); // Inside range of last level
  418. Add(4, 6U, "6", "7", 1U); // Inside range of last level
  419. Add(5, 7U, "4", "7", 10U);
  420. UpdateVersionStorageInfo();
  421. ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());
  422. }
  423. TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
  424. Add(0, 1U, "9", "9", 1U); // Level 0 is not ordered
  425. Add(0, 2U, "5", "6", 1U); // Ignored because of [5,6] in l1
  426. Add(1, 3U, "1", "2", 1U); // Ignored because of [2,3] in l2
  427. Add(1, 4U, "3", "4", 1U); // Ignored because of [2,3] in l2
  428. Add(1, 5U, "5", "6", 1U);
  429. Add(2, 6U, "2", "3", 1U);
  430. Add(3, 7U, "7", "8", 1U);
  431. UpdateVersionStorageInfo();
  432. ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());
  433. }
  434. TEST_F(VersionStorageInfoTest, SingleLevelBottommostData) {
  435. // In case of a single level, the oldest L0 file is bottommost. This could be
  436. // improved in case the L0 files cover disjoint key-ranges.
  437. Add(0 /* level */, 1U /* file_number */, "A" /* smallest */,
  438. "Z" /* largest */, 1U /* file_size */);
  439. Add(0 /* level */, 2U /* file_number */, "A" /* smallest */,
  440. "Z" /* largest */, 1U /* file_size */);
  441. Add(0 /* level */, 3U /* file_number */, "0" /* smallest */,
  442. "9" /* largest */, 1U /* file_size */);
  443. UpdateVersionStorageInfo();
  444. ASSERT_EQ(1, vstorage_.BottommostFiles().size());
  445. ASSERT_EQ(0, vstorage_.BottommostFiles()[0].first);
  446. ASSERT_EQ(3U, vstorage_.BottommostFiles()[0].second->fd.GetNumber());
  447. }
  448. TEST_F(VersionStorageInfoTest, MultiLevelBottommostData) {
  449. // In case of multiple levels, the oldest file for a key-range from each L1+
  450. // level is bottommost. This could be improved in case an L0 file contains the
  451. // oldest data for some range of keys.
  452. Add(0 /* level */, 1U /* file_number */, "A" /* smallest */,
  453. "Z" /* largest */, 1U /* file_size */);
  454. Add(0 /* level */, 2U /* file_number */, "0" /* smallest */,
  455. "9" /* largest */, 1U /* file_size */);
  456. Add(1 /* level */, 3U /* file_number */, "A" /* smallest */,
  457. "D" /* largest */, 1U /* file_size */);
  458. Add(2 /* level */, 4U /* file_number */, "E" /* smallest */,
  459. "H" /* largest */, 1U /* file_size */);
  460. Add(2 /* level */, 5U /* file_number */, "I" /* smallest */,
  461. "L" /* largest */, 1U /* file_size */);
  462. UpdateVersionStorageInfo();
  463. autovector<std::pair<int, FileMetaData*>> bottommost_files =
  464. vstorage_.BottommostFiles();
  465. std::sort(bottommost_files.begin(), bottommost_files.end(),
  466. [](const std::pair<int, FileMetaData*>& lhs,
  467. const std::pair<int, FileMetaData*>& rhs) {
  468. assert(lhs.second);
  469. assert(rhs.second);
  470. return lhs.second->fd.GetNumber() < rhs.second->fd.GetNumber();
  471. });
  472. ASSERT_EQ(3, bottommost_files.size());
  473. ASSERT_EQ(3U, bottommost_files[0].second->fd.GetNumber());
  474. ASSERT_EQ(4U, bottommost_files[1].second->fd.GetNumber());
  475. ASSERT_EQ(5U, bottommost_files[2].second->fd.GetNumber());
  476. }
  477. TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
  478. // Two files that overlap at the range deletion tombstone sentinel.
  479. Add(1, 1U, {"a", 0, kTypeValue},
  480. {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1);
  481. Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1);
  482. // Two files that overlap at the same user key.
  483. Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1);
  484. Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1);
  485. // Two files that do not overlap.
  486. Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1);
  487. Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1);
  488. UpdateVersionStorageInfo();
  489. ASSERT_EQ("1,2",
  490. GetOverlappingFiles(1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue}));
  491. ASSERT_EQ("1",
  492. GetOverlappingFiles(1, {"a", 0, kTypeValue},
  493. {"b", kMaxSequenceNumber, kTypeRangeDeletion}));
  494. ASSERT_EQ("2", GetOverlappingFiles(1, {"b", kMaxSequenceNumber, kTypeValue},
  495. {"c", 0, kTypeValue}));
  496. ASSERT_EQ("3,4",
  497. GetOverlappingFiles(1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue}));
  498. ASSERT_EQ("3",
  499. GetOverlappingFiles(1, {"d", 0, kTypeValue},
  500. {"e", kMaxSequenceNumber, kTypeRangeDeletion}));
  501. ASSERT_EQ("3,4", GetOverlappingFiles(1, {"e", kMaxSequenceNumber, kTypeValue},
  502. {"f", 0, kTypeValue}));
  503. ASSERT_EQ("3,4",
  504. GetOverlappingFiles(1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}));
  505. ASSERT_EQ("5",
  506. GetOverlappingFiles(1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}));
  507. ASSERT_EQ("6",
  508. GetOverlappingFiles(1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));
  509. }
  510. TEST_F(VersionStorageInfoTest, FileLocationAndMetaDataByNumber) {
  511. Add(0, 11U, "1", "2", 5000U);
  512. Add(0, 12U, "1", "2", 5000U);
  513. Add(2, 7U, "1", "2", 8000U);
  514. UpdateVersionStorageInfo();
  515. ASSERT_EQ(vstorage_.GetFileLocation(11U),
  516. VersionStorageInfo::FileLocation(0, 0));
  517. ASSERT_NE(vstorage_.GetFileMetaDataByNumber(11U), nullptr);
  518. ASSERT_EQ(vstorage_.GetFileLocation(12U),
  519. VersionStorageInfo::FileLocation(0, 1));
  520. ASSERT_NE(vstorage_.GetFileMetaDataByNumber(12U), nullptr);
  521. ASSERT_EQ(vstorage_.GetFileLocation(7U),
  522. VersionStorageInfo::FileLocation(2, 0));
  523. ASSERT_NE(vstorage_.GetFileMetaDataByNumber(7U), nullptr);
  524. ASSERT_FALSE(vstorage_.GetFileLocation(999U).IsValid());
  525. ASSERT_EQ(vstorage_.GetFileMetaDataByNumber(999U), nullptr);
  526. }
  527. TEST_F(VersionStorageInfoTest, ForcedBlobGCEmpty) {
  528. // No SST or blob files in VersionStorageInfo
  529. UpdateVersionStorageInfo();
  530. constexpr double age_cutoff = 0.5;
  531. constexpr double force_threshold = 0.75;
  532. vstorage_.ComputeFilesMarkedForForcedBlobGC(
  533. age_cutoff, force_threshold, /*enable_blob_garbage_collection=*/true);
  534. ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty());
  535. }
  536. TEST_F(VersionStorageInfoTest, ForcedBlobGCSingleBatch) {
  537. // Test the edge case when all blob files are part of the oldest batch.
  538. // We have one L0 SST file #1, and four blob files #10, #11, #12, and #13.
  539. // The oldest blob file used by SST #1 is blob file #10.
  540. constexpr int level = 0;
  541. constexpr uint64_t sst = 1;
  542. constexpr uint64_t first_blob = 10;
  543. constexpr uint64_t second_blob = 11;
  544. constexpr uint64_t third_blob = 12;
  545. constexpr uint64_t fourth_blob = 13;
  546. {
  547. constexpr char smallest[] = "bar1";
  548. constexpr char largest[] = "foo1";
  549. constexpr uint64_t file_size = 1000;
  550. Add(level, sst, smallest, largest, file_size, first_blob);
  551. }
  552. {
  553. constexpr uint64_t total_blob_count = 10;
  554. constexpr uint64_t total_blob_bytes = 100000;
  555. constexpr uint64_t garbage_blob_count = 2;
  556. constexpr uint64_t garbage_blob_bytes = 15000;
  557. AddBlob(first_blob, total_blob_count, total_blob_bytes,
  558. BlobFileMetaData::LinkedSsts{sst}, garbage_blob_count,
  559. garbage_blob_bytes);
  560. }
  561. {
  562. constexpr uint64_t total_blob_count = 4;
  563. constexpr uint64_t total_blob_bytes = 400000;
  564. constexpr uint64_t garbage_blob_count = 3;
  565. constexpr uint64_t garbage_blob_bytes = 235000;
  566. AddBlob(second_blob, total_blob_count, total_blob_bytes,
  567. BlobFileMetaData::LinkedSsts{}, garbage_blob_count,
  568. garbage_blob_bytes);
  569. }
  570. {
  571. constexpr uint64_t total_blob_count = 20;
  572. constexpr uint64_t total_blob_bytes = 1000000;
  573. constexpr uint64_t garbage_blob_count = 8;
  574. constexpr uint64_t garbage_blob_bytes = 400000;
  575. AddBlob(third_blob, total_blob_count, total_blob_bytes,
  576. BlobFileMetaData::LinkedSsts{}, garbage_blob_count,
  577. garbage_blob_bytes);
  578. }
  579. {
  580. constexpr uint64_t total_blob_count = 128;
  581. constexpr uint64_t total_blob_bytes = 1000000;
  582. constexpr uint64_t garbage_blob_count = 67;
  583. constexpr uint64_t garbage_blob_bytes = 600000;
  584. AddBlob(fourth_blob, total_blob_count, total_blob_bytes,
  585. BlobFileMetaData::LinkedSsts{}, garbage_blob_count,
  586. garbage_blob_bytes);
  587. }
  588. UpdateVersionStorageInfo();
  589. assert(vstorage_.num_levels() > 0);
  590. const auto& level_files = vstorage_.LevelFiles(level);
  591. assert(level_files.size() == 1);
  592. assert(level_files[0] && level_files[0]->fd.GetNumber() == sst);
  593. // No blob files eligible for GC due to the age cutoff
  594. {
  595. constexpr double age_cutoff = 0.1;
  596. constexpr double force_threshold = 0.0;
  597. vstorage_.ComputeFilesMarkedForForcedBlobGC(
  598. age_cutoff, force_threshold, /*enable_blob_garbage_collection=*/true);
  599. ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty());
  600. }
  601. // Overall garbage ratio of eligible files is below threshold
  602. {
  603. constexpr double age_cutoff = 1.0;
  604. constexpr double force_threshold = 0.6;
  605. vstorage_.ComputeFilesMarkedForForcedBlobGC(
  606. age_cutoff, force_threshold, /*enable_blob_garbage_collection=*/true);
  607. ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty());
  608. }
  609. // Overall garbage ratio of eligible files meets threshold
  610. {
  611. constexpr double age_cutoff = 1.0;
  612. constexpr double force_threshold = 0.5;
  613. vstorage_.ComputeFilesMarkedForForcedBlobGC(
  614. age_cutoff, force_threshold, /*enable_blob_garbage_collection=*/true);
  615. auto ssts_to_be_compacted = vstorage_.FilesMarkedForForcedBlobGC();
  616. ASSERT_EQ(ssts_to_be_compacted.size(), 1);
  617. const autovector<std::pair<int, FileMetaData*>>
  618. expected_ssts_to_be_compacted{{level, level_files[0]}};
  619. ASSERT_EQ(ssts_to_be_compacted[0], expected_ssts_to_be_compacted[0]);
  620. }
  621. }
  622. TEST_F(VersionStorageInfoTest, ForcedBlobGCMultipleBatches) {
  623. // Add three L0 SSTs (1, 2, and 3) and four blob files (10, 11, 12, and 13).
  624. // The first two SSTs have the same oldest blob file, namely, the very oldest
  625. // one (10), while the third SST's oldest blob file reference points to the
  626. // third blob file (12). Thus, the oldest batch of blob files contains the
  627. // first two blob files 10 and 11, and assuming they are eligible for GC based
  628. // on the age cutoff, compacting away the SSTs 1 and 2 will eliminate them.
  629. constexpr int level = 0;
  630. constexpr uint64_t first_sst = 1;
  631. constexpr uint64_t second_sst = 2;
  632. constexpr uint64_t third_sst = 3;
  633. constexpr uint64_t first_blob = 10;
  634. constexpr uint64_t second_blob = 11;
  635. constexpr uint64_t third_blob = 12;
  636. constexpr uint64_t fourth_blob = 13;
  637. {
  638. constexpr char smallest[] = "bar1";
  639. constexpr char largest[] = "foo1";
  640. constexpr uint64_t file_size = 1000;
  641. Add(level, first_sst, smallest, largest, file_size, first_blob);
  642. }
  643. {
  644. constexpr char smallest[] = "bar2";
  645. constexpr char largest[] = "foo2";
  646. constexpr uint64_t file_size = 2000;
  647. Add(level, second_sst, smallest, largest, file_size, first_blob);
  648. }
  649. {
  650. constexpr char smallest[] = "bar3";
  651. constexpr char largest[] = "foo3";
  652. constexpr uint64_t file_size = 3000;
  653. Add(level, third_sst, smallest, largest, file_size, third_blob);
  654. }
  655. {
  656. constexpr uint64_t total_blob_count = 10;
  657. constexpr uint64_t total_blob_bytes = 100000;
  658. constexpr uint64_t garbage_blob_count = 2;
  659. constexpr uint64_t garbage_blob_bytes = 15000;
  660. AddBlob(first_blob, total_blob_count, total_blob_bytes,
  661. BlobFileMetaData::LinkedSsts{first_sst, second_sst},
  662. garbage_blob_count, garbage_blob_bytes);
  663. }
  664. {
  665. constexpr uint64_t total_blob_count = 4;
  666. constexpr uint64_t total_blob_bytes = 400000;
  667. constexpr uint64_t garbage_blob_count = 3;
  668. constexpr uint64_t garbage_blob_bytes = 235000;
  669. AddBlob(second_blob, total_blob_count, total_blob_bytes,
  670. BlobFileMetaData::LinkedSsts{}, garbage_blob_count,
  671. garbage_blob_bytes);
  672. }
  673. {
  674. constexpr uint64_t total_blob_count = 20;
  675. constexpr uint64_t total_blob_bytes = 1000000;
  676. constexpr uint64_t garbage_blob_count = 8;
  677. constexpr uint64_t garbage_blob_bytes = 123456;
  678. AddBlob(third_blob, total_blob_count, total_blob_bytes,
  679. BlobFileMetaData::LinkedSsts{third_sst}, garbage_blob_count,
  680. garbage_blob_bytes);
  681. }
  682. {
  683. constexpr uint64_t total_blob_count = 128;
  684. constexpr uint64_t total_blob_bytes = 789012345;
  685. constexpr uint64_t garbage_blob_count = 67;
  686. constexpr uint64_t garbage_blob_bytes = 88888888;
  687. AddBlob(fourth_blob, total_blob_count, total_blob_bytes,
  688. BlobFileMetaData::LinkedSsts{}, garbage_blob_count,
  689. garbage_blob_bytes);
  690. }
  691. UpdateVersionStorageInfo();
  692. assert(vstorage_.num_levels() > 0);
  693. const auto& level_files = vstorage_.LevelFiles(level);
  694. assert(level_files.size() == 3);
  695. assert(level_files[0] && level_files[0]->fd.GetNumber() == first_sst);
  696. assert(level_files[1] && level_files[1]->fd.GetNumber() == second_sst);
  697. assert(level_files[2] && level_files[2]->fd.GetNumber() == third_sst);
  698. // No blob files eligible for GC due to the age cutoff
  699. {
  700. constexpr double age_cutoff = 0.1;
  701. constexpr double force_threshold = 0.0;
  702. vstorage_.ComputeFilesMarkedForForcedBlobGC(
  703. age_cutoff, force_threshold, /*enable_blob_garbage_collection=*/true);
  704. ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty());
  705. }
  706. // Overall garbage ratio of eligible files is below threshold
  707. {
  708. constexpr double age_cutoff = 0.5;
  709. constexpr double force_threshold = 0.6;
  710. vstorage_.ComputeFilesMarkedForForcedBlobGC(
  711. age_cutoff, force_threshold, /*enable_blob_garbage_collection=*/true);
  712. ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty());
  713. }
  714. // Overall garbage ratio of eligible files meets threshold
  715. {
  716. constexpr double age_cutoff = 0.5;
  717. constexpr double force_threshold = 0.5;
  718. vstorage_.ComputeFilesMarkedForForcedBlobGC(
  719. age_cutoff, force_threshold, /*enable_blob_garbage_collection=*/true);
  720. auto ssts_to_be_compacted = vstorage_.FilesMarkedForForcedBlobGC();
  721. ASSERT_EQ(ssts_to_be_compacted.size(), 2);
  722. std::sort(ssts_to_be_compacted.begin(), ssts_to_be_compacted.end(),
  723. [](const std::pair<int, FileMetaData*>& lhs,
  724. const std::pair<int, FileMetaData*>& rhs) {
  725. assert(lhs.second);
  726. assert(rhs.second);
  727. return lhs.second->fd.GetNumber() < rhs.second->fd.GetNumber();
  728. });
  729. const autovector<std::pair<int, FileMetaData*>>
  730. expected_ssts_to_be_compacted{{level, level_files[0]},
  731. {level, level_files[1]}};
  732. ASSERT_EQ(ssts_to_be_compacted[0], expected_ssts_to_be_compacted[0]);
  733. ASSERT_EQ(ssts_to_be_compacted[1], expected_ssts_to_be_compacted[1]);
  734. }
  735. }
  736. class VersionStorageInfoTimestampTest : public VersionStorageInfoTestBase {
  737. public:
  738. VersionStorageInfoTimestampTest()
  739. : VersionStorageInfoTestBase(test::BytewiseComparatorWithU64TsWrapper()) {
  740. }
  741. ~VersionStorageInfoTimestampTest() override = default;
  742. std::string Timestamp(uint64_t ts) const {
  743. std::string ret;
  744. PutFixed64(&ret, ts);
  745. return ret;
  746. }
  747. std::string PackUserKeyAndTimestamp(const Slice& ukey, uint64_t ts) const {
  748. std::string ret;
  749. ret.assign(ukey.data(), ukey.size());
  750. PutFixed64(&ret, ts);
  751. return ret;
  752. }
  753. };
  754. TEST_F(VersionStorageInfoTimestampTest, GetOverlappingInputs) {
  755. Add(/*level=*/1, /*file_number=*/1, /*smallest=*/
  756. {PackUserKeyAndTimestamp("a", /*ts=*/9), /*s=*/0, kTypeValue},
  757. /*largest=*/
  758. {PackUserKeyAndTimestamp("a", /*ts=*/8), /*s=*/0, kTypeValue},
  759. /*file_size=*/100);
  760. Add(/*level=*/1, /*file_number=*/2, /*smallest=*/
  761. {PackUserKeyAndTimestamp("a", /*ts=*/5), /*s=*/0, kTypeValue},
  762. /*largest=*/
  763. {PackUserKeyAndTimestamp("b", /*ts=*/10), /*s=*/0, kTypeValue},
  764. /*file_size=*/100);
  765. Add(/*level=*/1, /*file_number=*/3, /*smallest=*/
  766. {PackUserKeyAndTimestamp("c", /*ts=*/12), /*s=*/0, kTypeValue},
  767. /*largest=*/
  768. {PackUserKeyAndTimestamp("d", /*ts=*/1), /*s=*/0, kTypeValue},
  769. /*file_size=*/100);
  770. UpdateVersionStorageInfo();
  771. ASSERT_EQ(
  772. "1,2",
  773. GetOverlappingFiles(
  774. /*level=*/1,
  775. {PackUserKeyAndTimestamp("a", /*ts=*/12), /*s=*/0, kTypeValue},
  776. {PackUserKeyAndTimestamp("a", /*ts=*/11), /*s=*/0, kTypeValue}));
  777. ASSERT_EQ("3",
  778. GetOverlappingFiles(
  779. /*level=*/1,
  780. {PackUserKeyAndTimestamp("c", /*ts=*/15), /*s=*/0, kTypeValue},
  781. {PackUserKeyAndTimestamp("c", /*ts=*/2), /*s=*/0, kTypeValue}));
  782. }
  783. class FindLevelFileTest : public testing::Test {
  784. public:
  785. LevelFilesBrief file_level_;
  786. bool disjoint_sorted_files_;
  787. Arena arena_;
  788. FindLevelFileTest() : disjoint_sorted_files_(true) {}
  789. ~FindLevelFileTest() override = default;
  790. void LevelFileInit(size_t num = 0) {
  791. char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));
  792. file_level_.files = new (mem) FdWithKeyRange[num];
  793. file_level_.num_files = 0;
  794. }
  795. void Add(const char* smallest, const char* largest,
  796. SequenceNumber smallest_seq = 100,
  797. SequenceNumber largest_seq = 100) {
  798. InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);
  799. InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);
  800. Slice smallest_slice = smallest_key.Encode();
  801. Slice largest_slice = largest_key.Encode();
  802. char* mem =
  803. arena_.AllocateAligned(smallest_slice.size() + largest_slice.size());
  804. memcpy(mem, smallest_slice.data(), smallest_slice.size());
  805. memcpy(mem + smallest_slice.size(), largest_slice.data(),
  806. largest_slice.size());
  807. // add to file_level_
  808. size_t num = file_level_.num_files;
  809. auto& file = file_level_.files[num];
  810. file.fd = FileDescriptor(num + 1, 0, 0);
  811. file.smallest_key = Slice(mem, smallest_slice.size());
  812. file.largest_key = Slice(mem + smallest_slice.size(), largest_slice.size());
  813. file_level_.num_files++;
  814. }
  815. int Find(const char* key) {
  816. InternalKey target(key, 100, kTypeValue);
  817. InternalKeyComparator cmp(BytewiseComparator());
  818. return FindFile(cmp, file_level_, target.Encode());
  819. }
  820. bool Overlaps(const char* smallest, const char* largest) {
  821. InternalKeyComparator cmp(BytewiseComparator());
  822. Slice s(smallest != nullptr ? smallest : "");
  823. Slice l(largest != nullptr ? largest : "");
  824. return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,
  825. (smallest != nullptr ? &s : nullptr),
  826. (largest != nullptr ? &l : nullptr));
  827. }
  828. };
  829. TEST_F(FindLevelFileTest, LevelEmpty) {
  830. LevelFileInit(0);
  831. ASSERT_EQ(0, Find("foo"));
  832. ASSERT_TRUE(!Overlaps("a", "z"));
  833. ASSERT_TRUE(!Overlaps(nullptr, "z"));
  834. ASSERT_TRUE(!Overlaps("a", nullptr));
  835. ASSERT_TRUE(!Overlaps(nullptr, nullptr));
  836. }
  837. TEST_F(FindLevelFileTest, LevelSingle) {
  838. LevelFileInit(1);
  839. Add("p", "q");
  840. ASSERT_EQ(0, Find("a"));
  841. ASSERT_EQ(0, Find("p"));
  842. ASSERT_EQ(0, Find("p1"));
  843. ASSERT_EQ(0, Find("q"));
  844. ASSERT_EQ(1, Find("q1"));
  845. ASSERT_EQ(1, Find("z"));
  846. ASSERT_TRUE(!Overlaps("a", "b"));
  847. ASSERT_TRUE(!Overlaps("z1", "z2"));
  848. ASSERT_TRUE(Overlaps("a", "p"));
  849. ASSERT_TRUE(Overlaps("a", "q"));
  850. ASSERT_TRUE(Overlaps("a", "z"));
  851. ASSERT_TRUE(Overlaps("p", "p1"));
  852. ASSERT_TRUE(Overlaps("p", "q"));
  853. ASSERT_TRUE(Overlaps("p", "z"));
  854. ASSERT_TRUE(Overlaps("p1", "p2"));
  855. ASSERT_TRUE(Overlaps("p1", "z"));
  856. ASSERT_TRUE(Overlaps("q", "q"));
  857. ASSERT_TRUE(Overlaps("q", "q1"));
  858. ASSERT_TRUE(!Overlaps(nullptr, "j"));
  859. ASSERT_TRUE(!Overlaps("r", nullptr));
  860. ASSERT_TRUE(Overlaps(nullptr, "p"));
  861. ASSERT_TRUE(Overlaps(nullptr, "p1"));
  862. ASSERT_TRUE(Overlaps("q", nullptr));
  863. ASSERT_TRUE(Overlaps(nullptr, nullptr));
  864. }
  865. TEST_F(FindLevelFileTest, LevelMultiple) {
  866. LevelFileInit(4);
  867. Add("150", "200");
  868. Add("200", "250");
  869. Add("300", "350");
  870. Add("400", "450");
  871. ASSERT_EQ(0, Find("100"));
  872. ASSERT_EQ(0, Find("150"));
  873. ASSERT_EQ(0, Find("151"));
  874. ASSERT_EQ(0, Find("199"));
  875. ASSERT_EQ(0, Find("200"));
  876. ASSERT_EQ(1, Find("201"));
  877. ASSERT_EQ(1, Find("249"));
  878. ASSERT_EQ(1, Find("250"));
  879. ASSERT_EQ(2, Find("251"));
  880. ASSERT_EQ(2, Find("299"));
  881. ASSERT_EQ(2, Find("300"));
  882. ASSERT_EQ(2, Find("349"));
  883. ASSERT_EQ(2, Find("350"));
  884. ASSERT_EQ(3, Find("351"));
  885. ASSERT_EQ(3, Find("400"));
  886. ASSERT_EQ(3, Find("450"));
  887. ASSERT_EQ(4, Find("451"));
  888. ASSERT_TRUE(!Overlaps("100", "149"));
  889. ASSERT_TRUE(!Overlaps("251", "299"));
  890. ASSERT_TRUE(!Overlaps("451", "500"));
  891. ASSERT_TRUE(!Overlaps("351", "399"));
  892. ASSERT_TRUE(Overlaps("100", "150"));
  893. ASSERT_TRUE(Overlaps("100", "200"));
  894. ASSERT_TRUE(Overlaps("100", "300"));
  895. ASSERT_TRUE(Overlaps("100", "400"));
  896. ASSERT_TRUE(Overlaps("100", "500"));
  897. ASSERT_TRUE(Overlaps("375", "400"));
  898. ASSERT_TRUE(Overlaps("450", "450"));
  899. ASSERT_TRUE(Overlaps("450", "500"));
  900. }
  901. TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {
  902. LevelFileInit(4);
  903. Add("150", "200");
  904. Add("200", "250");
  905. Add("300", "350");
  906. Add("400", "450");
  907. ASSERT_TRUE(!Overlaps(nullptr, "149"));
  908. ASSERT_TRUE(!Overlaps("451", nullptr));
  909. ASSERT_TRUE(Overlaps(nullptr, nullptr));
  910. ASSERT_TRUE(Overlaps(nullptr, "150"));
  911. ASSERT_TRUE(Overlaps(nullptr, "199"));
  912. ASSERT_TRUE(Overlaps(nullptr, "200"));
  913. ASSERT_TRUE(Overlaps(nullptr, "201"));
  914. ASSERT_TRUE(Overlaps(nullptr, "400"));
  915. ASSERT_TRUE(Overlaps(nullptr, "800"));
  916. ASSERT_TRUE(Overlaps("100", nullptr));
  917. ASSERT_TRUE(Overlaps("200", nullptr));
  918. ASSERT_TRUE(Overlaps("449", nullptr));
  919. ASSERT_TRUE(Overlaps("450", nullptr));
  920. }
  921. TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {
  922. LevelFileInit(1);
  923. Add("200", "200", 5000, 3000);
  924. ASSERT_TRUE(!Overlaps("199", "199"));
  925. ASSERT_TRUE(!Overlaps("201", "300"));
  926. ASSERT_TRUE(Overlaps("200", "200"));
  927. ASSERT_TRUE(Overlaps("190", "200"));
  928. ASSERT_TRUE(Overlaps("200", "210"));
  929. }
  930. TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
  931. LevelFileInit(2);
  932. Add("150", "600");
  933. Add("400", "500");
  934. disjoint_sorted_files_ = false;
  935. ASSERT_TRUE(!Overlaps("100", "149"));
  936. ASSERT_TRUE(!Overlaps("601", "700"));
  937. ASSERT_TRUE(Overlaps("100", "150"));
  938. ASSERT_TRUE(Overlaps("100", "200"));
  939. ASSERT_TRUE(Overlaps("100", "300"));
  940. ASSERT_TRUE(Overlaps("100", "400"));
  941. ASSERT_TRUE(Overlaps("100", "500"));
  942. ASSERT_TRUE(Overlaps("375", "400"));
  943. ASSERT_TRUE(Overlaps("450", "450"));
  944. ASSERT_TRUE(Overlaps("450", "500"));
  945. ASSERT_TRUE(Overlaps("450", "700"));
  946. ASSERT_TRUE(Overlaps("600", "700"));
  947. }
  948. class VersionSetTestBase {
  949. public:
  950. const static std::string kColumnFamilyName1;
  951. const static std::string kColumnFamilyName2;
  952. const static std::string kColumnFamilyName3;
  953. const static int kNumColumnFamilies = 4;
  954. int num_initial_edits_;
  // Prepares the Env, FileSystem and option structs shared by the version-set
  // tests, creates the database directory dbname_, and constructs versions_
  // (VersionSet) and reactive_versions_ (ReactiveVersionSet) for that path.
  // `name` is passed to test::PerThreadDBPath() to form dbname_.
  explicit VersionSetTestBase(const std::string& name)
      : env_(nullptr),
        dbname_(test::PerThreadDBPath(name)),
        options_(),
        db_options_(options_),
        cf_options_(options_),
        immutable_options_(db_options_, cf_options_),
        mutable_cf_options_(cf_options_),
        table_cache_(NewLRUCache(50000, 16)),
        write_buffer_manager_(db_options_.db_write_buffer_size),
        shutting_down_(false),
        table_factory_(std::make_shared<mock::MockTableFactory>()) {
    EXPECT_OK(test::CreateEnvFromSystem(ConfigOptions(), &env_, &env_guard_));
    // When the system Env is Env::Default(), setting the MEM_ENV environment
    // variable switches the test to an in-memory Env wrapping Env::Default().
    if (env_ == Env::Default() && getenv("MEM_ENV")) {
      env_guard_.reset(NewMemEnv(Env::Default()));
      env_ = env_guard_.get();
    }
    EXPECT_NE(nullptr, env_);
    fs_ = env_->GetFileSystem();
    // Make sure the database directory exists.
    EXPECT_OK(fs_->CreateDirIfMissing(dbname_, IOOptions(), nullptr));
    // Propagate the chosen Env, its FileSystem and its SystemClock into
    // options_, db_options_ and immutable_options_.
    options_.env = env_;
    db_options_.env = env_;
    db_options_.fs = fs_;
    immutable_options_.env = env_;
    immutable_options_.fs = fs_;
    immutable_options_.clock = env_->GetSystemClock().get();
    // Install the mock table factory in the column-family options.
    cf_options_.table_factory = table_factory_;
    mutable_cf_options_.table_factory = table_factory_;
    versions_.reset(new VersionSet(
        dbname_, &db_options_, env_options_, table_cache_.get(),
        &write_buffer_manager_, &write_controller_,
        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
        /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
        /*error_handler=*/nullptr, /*read_only=*/false));
    reactive_versions_ = std::make_shared<ReactiveVersionSet>(
        dbname_, &db_options_, env_options_, table_cache_.get(),
        &write_buffer_manager_, &write_controller_, nullptr);
    // Register dbname_ as a DB path with the maximum possible target size.
    db_options_.db_paths.emplace_back(dbname_,
                                      std::numeric_limits<uint64_t>::max());
  }
  995. virtual ~VersionSetTestBase() {
  996. if (getenv("KEEP_DB")) {
  997. fprintf(stdout, "DB is still at %s\n", dbname_.c_str());
  998. } else {
  999. Options options;
  1000. options.env = env_;
  1001. EXPECT_OK(DestroyDB(dbname_, options));
  1002. }
  1003. }
  1004. protected:
  1005. virtual void PrepareManifest(
  1006. std::vector<ColumnFamilyDescriptor>* column_families,
  1007. SequenceNumber* last_seqno, std::unique_ptr<log::Writer>* log_writer) {
  1008. assert(column_families != nullptr);
  1009. assert(last_seqno != nullptr);
  1010. assert(log_writer != nullptr);
  1011. ASSERT_OK(
  1012. SetIdentityFile(WriteOptions(), env_, dbname_, Temperature::kUnknown));
  1013. VersionEdit new_db;
  1014. if (db_options_.write_dbid_to_manifest) {
  1015. DBOptions tmp_db_options;
  1016. tmp_db_options.env = env_;
  1017. std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
  1018. std::string db_id;
  1019. ASSERT_OK(impl->GetDbIdentityFromIdentityFile(IOOptions(), &db_id));
  1020. new_db.SetDBId(db_id);
  1021. }
  1022. new_db.SetLogNumber(0);
  1023. new_db.SetNextFile(2);
  1024. new_db.SetLastSequence(0);
  1025. const std::vector<std::string> cf_names = {
  1026. kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
  1027. kColumnFamilyName3};
  1028. const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
  1029. autovector<VersionEdit> new_cfs;
  1030. uint64_t last_seq = 1;
  1031. uint32_t cf_id = 1;
  1032. for (int i = 1; i != kInitialNumOfCfs; ++i) {
  1033. VersionEdit new_cf;
  1034. new_cf.AddColumnFamily(cf_names[i]);
  1035. new_cf.SetColumnFamily(cf_id++);
  1036. new_cf.SetLogNumber(0);
  1037. new_cf.SetNextFile(2);
  1038. new_cf.SetLastSequence(last_seq++);
  1039. new_cfs.emplace_back(new_cf);
  1040. }
  1041. *last_seqno = last_seq;
  1042. num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
  1043. std::unique_ptr<WritableFileWriter> file_writer;
  1044. const std::string manifest = DescriptorFileName(dbname_, 1);
  1045. const auto& fs = env_->GetFileSystem();
  1046. Status s = WritableFileWriter::Create(
  1047. fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
  1048. nullptr);
  1049. ASSERT_OK(s);
  1050. {
  1051. log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
  1052. std::string record;
  1053. new_db.EncodeTo(&record);
  1054. s = (*log_writer)->AddRecord(WriteOptions(), record);
  1055. for (const auto& e : new_cfs) {
  1056. record.clear();
  1057. e.EncodeTo(&record);
  1058. s = (*log_writer)->AddRecord(WriteOptions(), record);
  1059. ASSERT_OK(s);
  1060. }
  1061. }
  1062. ASSERT_OK(s);
  1063. cf_options_.table_factory = table_factory_;
  1064. for (const auto& cf_name : cf_names) {
  1065. column_families->emplace_back(cf_name, cf_options_);
  1066. }
  1067. }
  1068. struct SstInfo {
  1069. uint64_t file_number;
  1070. std::string column_family;
  1071. std::string key; // the only key
  1072. int level = 0;
  1073. uint64_t epoch_number;
  1074. bool file_missing = false;
  1075. uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
  1076. SstInfo(uint64_t file_num, const std::string& cf_name,
  1077. const std::string& _key,
  1078. uint64_t _epoch_number = kUnknownEpochNumber,
  1079. bool _file_missing = false,
  1080. uint64_t _oldest_blob_file_number = kInvalidBlobFileNumber)
  1081. : SstInfo(file_num, cf_name, _key, 0, _epoch_number, _file_missing,
  1082. _oldest_blob_file_number) {}
  1083. SstInfo(uint64_t file_num, const std::string& cf_name,
  1084. const std::string& _key, int lvl,
  1085. uint64_t _epoch_number = kUnknownEpochNumber,
  1086. bool _file_missing = false,
  1087. uint64_t _oldest_blob_file_number = kInvalidBlobFileNumber)
  1088. : file_number(file_num),
  1089. column_family(cf_name),
  1090. key(_key),
  1091. level(lvl),
  1092. epoch_number(_epoch_number),
  1093. file_missing(_file_missing),
  1094. oldest_blob_file_number(_oldest_blob_file_number) {}
  1095. };
  1096. // Create dummy sst, return their metadata. Note that only file name and size
  1097. // are used.
  1098. void CreateDummyTableFiles(const std::vector<SstInfo>& file_infos,
  1099. std::vector<FileMetaData>* file_metas) {
  1100. assert(file_metas != nullptr);
  1101. for (const auto& info : file_infos) {
  1102. uint64_t file_num = info.file_number;
  1103. std::string fname = MakeTableFileName(dbname_, file_num);
  1104. std::unique_ptr<FSWritableFile> file;
  1105. Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr);
  1106. ASSERT_OK(s);
  1107. std::unique_ptr<WritableFileWriter> fwriter(new WritableFileWriter(
  1108. std::move(file), fname, FileOptions(), env_->GetSystemClock().get()));
  1109. InternalTblPropCollFactories internal_tbl_prop_coll_factories;
  1110. const ReadOptions read_options;
  1111. const WriteOptions write_options;
  1112. std::unique_ptr<TableBuilder> builder(table_factory_->NewTableBuilder(
  1113. TableBuilderOptions(
  1114. immutable_options_, mutable_cf_options_, read_options,
  1115. write_options, InternalKeyComparator(options_.comparator),
  1116. &internal_tbl_prop_coll_factories, kNoCompression,
  1117. CompressionOptions(),
  1118. TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
  1119. info.column_family, info.level, kUnknownNewestKeyTime),
  1120. fwriter.get()));
  1121. InternalKey ikey(info.key, 0, ValueType::kTypeValue);
  1122. builder->Add(ikey.Encode(), "value");
  1123. ASSERT_OK(builder->Finish());
  1124. ASSERT_OK(fwriter->Flush(IOOptions()));
  1125. uint64_t file_size = 0;
  1126. s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr);
  1127. ASSERT_OK(s);
  1128. ASSERT_NE(0, file_size);
  1129. file_metas->emplace_back(
  1130. file_num, /*file_path_id=*/0, file_size, ikey, ikey, 0, 0, false,
  1131. Temperature::kUnknown, info.oldest_blob_file_number, 0, 0,
  1132. info.epoch_number, kUnknownFileChecksum, kUnknownFileChecksumFuncName,
  1133. kNullUniqueId64x2, 0, 0,
  1134. /* user_defined_timestamps_persisted */ true);
  1135. if (info.file_missing) {
  1136. ASSERT_OK(fs_->DeleteFile(fname, IOOptions(), nullptr));
  1137. }
  1138. }
  1139. }
  1140. void CreateCurrentFile() {
  1141. // Make "CURRENT" file point to the new manifest file.
  1142. ASSERT_OK(SetCurrentFile(WriteOptions(), fs_.get(), dbname_, 1,
  1143. Temperature::kUnknown,
  1144. /* dir_contains_current_file */ nullptr));
  1145. }
  1146. // Create DB with 3 column families.
  1147. void NewDB() {
  1148. SequenceNumber last_seqno;
  1149. std::unique_ptr<log::Writer> log_writer;
  1150. PrepareManifest(&column_families_, &last_seqno, &log_writer);
  1151. log_writer.reset();
  1152. CreateCurrentFile();
  1153. EXPECT_OK(versions_->Recover(column_families_, false));
  1154. EXPECT_EQ(column_families_.size(),
  1155. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  1156. }
  1157. void CloseDB() {
  1158. mutex_.Lock();
  1159. versions_->Close(nullptr, &mutex_).PermitUncheckedError();
  1160. versions_.reset();
  1161. mutex_.Unlock();
  1162. }
  1163. void ReopenDB() {
  1164. versions_.reset(new VersionSet(
  1165. dbname_, &db_options_, env_options_, table_cache_.get(),
  1166. &write_buffer_manager_, &write_controller_,
  1167. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  1168. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  1169. /*error_handler=*/nullptr, /*read_only=*/false));
  1170. EXPECT_OK(versions_->Recover(column_families_, false));
  1171. }
  1172. void GetManifestPath(std::string* manifest_path) const {
  1173. assert(manifest_path != nullptr);
  1174. uint64_t manifest_file_number = 0;
  1175. Status s = GetCurrentManifestPath(dbname_, fs_.get(), /*is_retry=*/false,
  1176. manifest_path, &manifest_file_number);
  1177. ASSERT_OK(s);
  1178. }
  1179. void VerifyManifest(std::string* manifest_path) const {
  1180. assert(manifest_path != nullptr);
  1181. uint64_t manifest_file_number = 0;
  1182. Status s = GetCurrentManifestPath(dbname_, fs_.get(), /*is_retry=*/false,
  1183. manifest_path, &manifest_file_number);
  1184. ASSERT_OK(s);
  1185. ASSERT_EQ(1, manifest_file_number);
  1186. }
  1187. Status LogAndApplyToDefaultCF(VersionEdit& edit) {
  1188. mutex_.Lock();
  1189. Status s = versions_->LogAndApply(
  1190. versions_->GetColumnFamilySet()->GetDefault(), read_options_,
  1191. write_options_, &edit, &mutex_, nullptr);
  1192. mutex_.Unlock();
  1193. return s;
  1194. }
  1195. Status LogAndApplyToDefaultCF(
  1196. const autovector<std::unique_ptr<VersionEdit>>& edits) {
  1197. autovector<VersionEdit*> vedits;
  1198. for (auto& e : edits) {
  1199. vedits.push_back(e.get());
  1200. }
  1201. mutex_.Lock();
  1202. Status s = versions_->LogAndApply(
  1203. versions_->GetColumnFamilySet()->GetDefault(), read_options_,
  1204. write_options_, vedits, &mutex_, nullptr);
  1205. mutex_.Unlock();
  1206. return s;
  1207. }
  1208. void CreateNewManifest() {
  1209. constexpr FSDirectory* db_directory = nullptr;
  1210. constexpr bool new_descriptor_log = true;
  1211. mutex_.Lock();
  1212. VersionEdit dummy;
  1213. ASSERT_OK(versions_->LogAndApply(
  1214. versions_->GetColumnFamilySet()->GetDefault(), read_options_,
  1215. write_options_, &dummy, &mutex_, db_directory, new_descriptor_log));
  1216. mutex_.Unlock();
  1217. }
  1218. ColumnFamilyData* CreateColumnFamily(const std::string& cf_name,
  1219. const ColumnFamilyOptions& cf_options) {
  1220. VersionEdit new_cf;
  1221. new_cf.AddColumnFamily(cf_name);
  1222. uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
  1223. new_cf.SetColumnFamily(new_id);
  1224. new_cf.SetLogNumber(0);
  1225. new_cf.SetComparatorName(cf_options.comparator->Name());
  1226. new_cf.SetPersistUserDefinedTimestamps(
  1227. cf_options.persist_user_defined_timestamps);
  1228. Status s;
  1229. mutex_.Lock();
  1230. s = versions_->LogAndApply(/*column_family_data=*/nullptr, read_options_,
  1231. write_options_, &new_cf, &mutex_,
  1232. /*db_directory=*/nullptr,
  1233. /*new_descriptor_log=*/false, &cf_options);
  1234. mutex_.Unlock();
  1235. EXPECT_OK(s);
  1236. ColumnFamilyData* cfd =
  1237. versions_->GetColumnFamilySet()->GetColumnFamily(cf_name);
  1238. EXPECT_NE(nullptr, cfd);
  1239. return cfd;
  1240. }
  1241. Env* mem_env_;
  1242. Env* env_;
  1243. std::shared_ptr<Env> env_guard_;
  1244. std::shared_ptr<FileSystem> fs_;
  1245. const std::string dbname_;
  1246. EnvOptions env_options_;
  1247. Options options_;
  1248. ImmutableDBOptions db_options_;
  1249. ColumnFamilyOptions cf_options_;
  1250. ImmutableOptions immutable_options_;
  1251. MutableCFOptions mutable_cf_options_;
  1252. const ReadOptions read_options_;
  1253. const WriteOptions write_options_;
  1254. std::shared_ptr<Cache> table_cache_;
  1255. WriteController write_controller_;
  1256. WriteBufferManager write_buffer_manager_;
  1257. std::shared_ptr<VersionSet> versions_;
  1258. std::shared_ptr<ReactiveVersionSet> reactive_versions_;
  1259. InstrumentedMutex mutex_;
  1260. std::atomic<bool> shutting_down_;
  1261. std::shared_ptr<TableFactory> table_factory_;
  1262. std::vector<ColumnFamilyDescriptor> column_families_;
  1263. };
  1264. const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
  1265. const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
  1266. const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
  1267. class VersionSetTest : public VersionSetTestBase, public testing::Test {
  1268. public:
  1269. VersionSetTest() : VersionSetTestBase("version_set_test") {}
  1270. };
  1271. TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
  1272. NewDB();
  1273. const int kGroupSize = 5;
  1274. const ReadOptions read_options;
  1275. const WriteOptions write_options;
  1276. autovector<VersionEdit> edits;
  1277. for (int i = 0; i != kGroupSize; ++i) {
  1278. edits.emplace_back(VersionEdit());
  1279. }
  1280. autovector<ColumnFamilyData*> cfds;
  1281. autovector<autovector<VersionEdit*>> edit_lists;
  1282. for (int i = 0; i != kGroupSize; ++i) {
  1283. cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
  1284. autovector<VersionEdit*> edit_list;
  1285. edit_list.emplace_back(&edits[i]);
  1286. edit_lists.emplace_back(edit_list);
  1287. }
  1288. SyncPoint::GetInstance()->DisableProcessing();
  1289. SyncPoint::GetInstance()->ClearAllCallBacks();
  1290. int count = 0;
  1291. SyncPoint::GetInstance()->SetCallBack(
  1292. "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
  1293. uint32_t* cf_id = static_cast<uint32_t*>(arg);
  1294. EXPECT_EQ(0u, *cf_id);
  1295. ++count;
  1296. });
  1297. SyncPoint::GetInstance()->EnableProcessing();
  1298. mutex_.Lock();
  1299. Status s = versions_->LogAndApply(cfds, read_options, write_options,
  1300. edit_lists, &mutex_, nullptr);
  1301. mutex_.Unlock();
  1302. EXPECT_OK(s);
  1303. EXPECT_EQ(kGroupSize - 1, count);
  1304. }
  1305. TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) {
  1306. // Initialize the database and add a couple of blob files, one with some
  1307. // garbage in it, and one without any garbage.
  1308. NewDB();
  1309. assert(versions_);
  1310. assert(versions_->GetColumnFamilySet());
  1311. ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
  1312. assert(cfd);
  1313. Version* const version = cfd->current();
  1314. assert(version);
  1315. VersionStorageInfo* const storage_info = version->storage_info();
  1316. assert(storage_info);
  1317. {
  1318. constexpr uint64_t blob_file_number = 123;
  1319. constexpr uint64_t total_blob_count = 456;
  1320. constexpr uint64_t total_blob_bytes = 77777777;
  1321. constexpr char checksum_method[] = "SHA1";
  1322. constexpr char checksum_value[] =
  1323. "\xbd\xb7\xf3\x4a\x59\xdf\xa1\x59\x2c\xe7\xf5\x2e\x99\xf9\x8c\x57\x0c"
  1324. "\x52\x5c\xbd";
  1325. auto shared_meta = SharedBlobFileMetaData::Create(
  1326. blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
  1327. checksum_value);
  1328. constexpr uint64_t garbage_blob_count = 89;
  1329. constexpr uint64_t garbage_blob_bytes = 1000000;
  1330. auto meta = BlobFileMetaData::Create(
  1331. std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
  1332. garbage_blob_count, garbage_blob_bytes);
  1333. storage_info->AddBlobFile(std::move(meta));
  1334. }
  1335. {
  1336. constexpr uint64_t blob_file_number = 234;
  1337. constexpr uint64_t total_blob_count = 555;
  1338. constexpr uint64_t total_blob_bytes = 66666;
  1339. constexpr char checksum_method[] = "CRC32";
  1340. constexpr char checksum_value[] = "\x3d\x87\xff\x57";
  1341. auto shared_meta = SharedBlobFileMetaData::Create(
  1342. blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
  1343. checksum_value);
  1344. constexpr uint64_t garbage_blob_count = 0;
  1345. constexpr uint64_t garbage_blob_bytes = 0;
  1346. auto meta = BlobFileMetaData::Create(
  1347. std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
  1348. garbage_blob_count, garbage_blob_bytes);
  1349. storage_info->AddBlobFile(std::move(meta));
  1350. }
  1351. // Force the creation of a new manifest file and make sure metadata for
  1352. // the blob files is re-persisted.
  1353. size_t addition_encoded = 0;
  1354. SyncPoint::GetInstance()->SetCallBack(
  1355. "BlobFileAddition::EncodeTo::CustomFields",
  1356. [&](void* /* arg */) { ++addition_encoded; });
  1357. size_t garbage_encoded = 0;
  1358. SyncPoint::GetInstance()->SetCallBack(
  1359. "BlobFileGarbage::EncodeTo::CustomFields",
  1360. [&](void* /* arg */) { ++garbage_encoded; });
  1361. SyncPoint::GetInstance()->EnableProcessing();
  1362. CreateNewManifest();
  1363. ASSERT_EQ(addition_encoded, 2);
  1364. ASSERT_EQ(garbage_encoded, 1);
  1365. SyncPoint::GetInstance()->DisableProcessing();
  1366. SyncPoint::GetInstance()->ClearAllCallBacks();
  1367. }
  1368. TEST_F(VersionSetTest, AddLiveBlobFiles) {
  1369. // Initialize the database and add a blob file.
  1370. NewDB();
  1371. assert(versions_);
  1372. assert(versions_->GetColumnFamilySet());
  1373. ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
  1374. assert(cfd);
  1375. Version* const first_version = cfd->current();
  1376. assert(first_version);
  1377. VersionStorageInfo* const first_storage_info = first_version->storage_info();
  1378. assert(first_storage_info);
  1379. constexpr uint64_t first_blob_file_number = 234;
  1380. constexpr uint64_t first_total_blob_count = 555;
  1381. constexpr uint64_t first_total_blob_bytes = 66666;
  1382. constexpr char first_checksum_method[] = "CRC32";
  1383. constexpr char first_checksum_value[] = "\x3d\x87\xff\x57";
  1384. auto first_shared_meta = SharedBlobFileMetaData::Create(
  1385. first_blob_file_number, first_total_blob_count, first_total_blob_bytes,
  1386. first_checksum_method, first_checksum_value);
  1387. constexpr uint64_t garbage_blob_count = 0;
  1388. constexpr uint64_t garbage_blob_bytes = 0;
  1389. auto first_meta = BlobFileMetaData::Create(
  1390. std::move(first_shared_meta), BlobFileMetaData::LinkedSsts(),
  1391. garbage_blob_count, garbage_blob_bytes);
  1392. first_storage_info->AddBlobFile(first_meta);
  1393. // Reference the version so it stays alive even after the following version
  1394. // edit.
  1395. first_version->Ref();
  1396. // Get live files directly from version.
  1397. std::vector<uint64_t> version_table_files;
  1398. std::vector<uint64_t> version_blob_files;
  1399. first_version->AddLiveFiles(&version_table_files, &version_blob_files);
  1400. ASSERT_EQ(version_blob_files.size(), 1);
  1401. ASSERT_EQ(version_blob_files[0], first_blob_file_number);
  1402. // Create a new version containing an additional blob file.
  1403. versions_->TEST_CreateAndAppendVersion(cfd);
  1404. Version* const second_version = cfd->current();
  1405. assert(second_version);
  1406. assert(second_version != first_version);
  1407. VersionStorageInfo* const second_storage_info =
  1408. second_version->storage_info();
  1409. assert(second_storage_info);
  1410. constexpr uint64_t second_blob_file_number = 456;
  1411. constexpr uint64_t second_total_blob_count = 100;
  1412. constexpr uint64_t second_total_blob_bytes = 2000000;
  1413. constexpr char second_checksum_method[] = "CRC32B";
  1414. constexpr char second_checksum_value[] = "\x6d\xbd\xf2\x3a";
  1415. auto second_shared_meta = SharedBlobFileMetaData::Create(
  1416. second_blob_file_number, second_total_blob_count, second_total_blob_bytes,
  1417. second_checksum_method, second_checksum_value);
  1418. auto second_meta = BlobFileMetaData::Create(
  1419. std::move(second_shared_meta), BlobFileMetaData::LinkedSsts(),
  1420. garbage_blob_count, garbage_blob_bytes);
  1421. second_storage_info->AddBlobFile(std::move(first_meta));
  1422. second_storage_info->AddBlobFile(std::move(second_meta));
  1423. // Get all live files from version set. Note that the result contains
  1424. // duplicates.
  1425. std::vector<uint64_t> all_table_files;
  1426. std::vector<uint64_t> all_blob_files;
  1427. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  1428. ASSERT_EQ(all_blob_files.size(), 3);
  1429. ASSERT_EQ(all_blob_files[0], first_blob_file_number);
  1430. ASSERT_EQ(all_blob_files[1], first_blob_file_number);
  1431. ASSERT_EQ(all_blob_files[2], second_blob_file_number);
  1432. // Clean up previous version.
  1433. first_version->Unref();
  1434. }
  1435. TEST_F(VersionSetTest, ObsoleteBlobFile) {
  1436. // Initialize the database and add a blob file that is entirely garbage
  1437. // and thus can immediately be marked obsolete.
  1438. NewDB();
  1439. VersionEdit edit;
  1440. constexpr uint64_t blob_file_number = 234;
  1441. constexpr uint64_t total_blob_count = 555;
  1442. constexpr uint64_t total_blob_bytes = 66666;
  1443. constexpr char checksum_method[] = "CRC32";
  1444. constexpr char checksum_value[] = "\x3d\x87\xff\x57";
  1445. edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
  1446. checksum_method, checksum_value);
  1447. edit.AddBlobFileGarbage(blob_file_number, total_blob_count, total_blob_bytes);
  1448. mutex_.Lock();
  1449. Status s = versions_->LogAndApply(
  1450. versions_->GetColumnFamilySet()->GetDefault(), read_options_,
  1451. write_options_, &edit, &mutex_, nullptr);
  1452. mutex_.Unlock();
  1453. ASSERT_OK(s);
  1454. // Make sure blob files from the pending number range are not returned
  1455. // as obsolete.
  1456. {
  1457. std::vector<ObsoleteFileInfo> table_files;
  1458. std::vector<ObsoleteBlobFileInfo> blob_files;
  1459. std::vector<std::string> manifest_files;
  1460. constexpr uint64_t min_pending_output = blob_file_number;
  1461. versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
  1462. min_pending_output);
  1463. ASSERT_TRUE(blob_files.empty());
  1464. }
  1465. // Make sure the blob file is returned as obsolete if it's not in the pending
  1466. // range.
  1467. {
  1468. std::vector<ObsoleteFileInfo> table_files;
  1469. std::vector<ObsoleteBlobFileInfo> blob_files;
  1470. std::vector<std::string> manifest_files;
  1471. constexpr uint64_t min_pending_output = blob_file_number + 1;
  1472. versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
  1473. min_pending_output);
  1474. ASSERT_EQ(blob_files.size(), 1);
  1475. ASSERT_EQ(blob_files[0].GetBlobFileNumber(), blob_file_number);
  1476. }
  1477. // Make sure it's not returned a second time.
  1478. {
  1479. std::vector<ObsoleteFileInfo> table_files;
  1480. std::vector<ObsoleteBlobFileInfo> blob_files;
  1481. std::vector<std::string> manifest_files;
  1482. constexpr uint64_t min_pending_output = blob_file_number + 1;
  1483. versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
  1484. min_pending_output);
  1485. ASSERT_TRUE(blob_files.empty());
  1486. }
  1487. }
  1488. TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) {
  1489. NewDB();
  1490. constexpr uint64_t kNumWals = 5;
  1491. autovector<std::unique_ptr<VersionEdit>> edits;
  1492. // Add some WALs.
  1493. for (uint64_t i = 1; i <= kNumWals; i++) {
  1494. edits.emplace_back(new VersionEdit);
  1495. // WAL's size equals its log number.
  1496. edits.back()->AddWal(i, WalMetadata(i));
  1497. }
  1498. // Delete the first half of the WALs.
  1499. edits.emplace_back(new VersionEdit);
  1500. edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
  1501. autovector<Version*> versions;
  1502. SyncPoint::GetInstance()->SetCallBack(
  1503. "VersionSet::ProcessManifestWrites:NewVersion",
  1504. [&](void* arg) { versions.push_back(static_cast<Version*>(arg)); });
  1505. SyncPoint::GetInstance()->EnableProcessing();
  1506. ASSERT_OK(LogAndApplyToDefaultCF(edits));
  1507. SyncPoint::GetInstance()->DisableProcessing();
  1508. SyncPoint::GetInstance()->ClearAllCallBacks();
  1509. // Since the edits are all WAL edits, no version should be created.
  1510. ASSERT_EQ(versions.size(), 1);
  1511. ASSERT_EQ(versions[0], nullptr);
  1512. }
  1513. // Similar to WalEditsNotAppliedToVersion, but contains a non-WAL edit.
  1514. TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) {
  1515. NewDB();
  1516. const std::string kDBId = "db_db";
  1517. constexpr uint64_t kNumWals = 5;
  1518. autovector<std::unique_ptr<VersionEdit>> edits;
  1519. // Add some WALs.
  1520. for (uint64_t i = 1; i <= kNumWals; i++) {
  1521. edits.emplace_back(new VersionEdit);
  1522. // WAL's size equals its log number.
  1523. edits.back()->AddWal(i, WalMetadata(i));
  1524. }
  1525. // Delete the first half of the WALs.
  1526. edits.emplace_back(new VersionEdit);
  1527. edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
  1528. edits.emplace_back(new VersionEdit);
  1529. edits.back()->SetDBId(kDBId);
  1530. autovector<Version*> versions;
  1531. SyncPoint::GetInstance()->SetCallBack(
  1532. "VersionSet::ProcessManifestWrites:NewVersion",
  1533. [&](void* arg) { versions.push_back(static_cast<Version*>(arg)); });
  1534. SyncPoint::GetInstance()->EnableProcessing();
  1535. ASSERT_OK(LogAndApplyToDefaultCF(edits));
  1536. SyncPoint::GetInstance()->DisableProcessing();
  1537. SyncPoint::GetInstance()->ClearAllCallBacks();
  1538. // Since the edits are all WAL edits, no version should be created.
  1539. ASSERT_EQ(versions.size(), 1);
  1540. ASSERT_NE(versions[0], nullptr);
  1541. }
  1542. TEST_F(VersionSetTest, WalAddition) {
  1543. NewDB();
  1544. constexpr WalNumber kLogNumber = 10;
  1545. constexpr uint64_t kSizeInBytes = 111;
  1546. // A WAL is just created.
  1547. {
  1548. VersionEdit edit;
  1549. edit.AddWal(kLogNumber);
  1550. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1551. const auto& wals = versions_->GetWalSet().GetWals();
  1552. ASSERT_EQ(wals.size(), 1);
  1553. ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
  1554. ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize());
  1555. }
  1556. // The WAL is synced for several times before closing.
  1557. {
  1558. for (uint64_t size_delta = 100; size_delta > 0; size_delta /= 2) {
  1559. uint64_t size = kSizeInBytes - size_delta;
  1560. WalMetadata wal(size);
  1561. VersionEdit edit;
  1562. edit.AddWal(kLogNumber, wal);
  1563. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1564. const auto& wals = versions_->GetWalSet().GetWals();
  1565. ASSERT_EQ(wals.size(), 1);
  1566. ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
  1567. ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
  1568. ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), size);
  1569. }
  1570. }
  1571. // The WAL is closed.
  1572. {
  1573. WalMetadata wal(kSizeInBytes);
  1574. VersionEdit edit;
  1575. edit.AddWal(kLogNumber, wal);
  1576. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1577. const auto& wals = versions_->GetWalSet().GetWals();
  1578. ASSERT_EQ(wals.size(), 1);
  1579. ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
  1580. ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
  1581. ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
  1582. }
  1583. // Recover a new VersionSet.
  1584. {
  1585. std::unique_ptr<VersionSet> new_versions(new VersionSet(
  1586. dbname_, &db_options_, env_options_, table_cache_.get(),
  1587. &write_buffer_manager_, &write_controller_,
  1588. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  1589. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  1590. /*error_handler=*/nullptr, /*unchanging=*/false));
  1591. ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false));
  1592. const auto& wals = new_versions->GetWalSet().GetWals();
  1593. ASSERT_EQ(wals.size(), 1);
  1594. ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
  1595. ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
  1596. ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
  1597. }
  1598. }
  1599. TEST_F(VersionSetTest, WalCloseWithoutSync) {
  1600. NewDB();
  1601. constexpr WalNumber kLogNumber = 10;
  1602. constexpr uint64_t kSizeInBytes = 111;
  1603. constexpr uint64_t kSyncedSizeInBytes = kSizeInBytes / 2;
  1604. // A WAL is just created.
  1605. {
  1606. VersionEdit edit;
  1607. edit.AddWal(kLogNumber);
  1608. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1609. const auto& wals = versions_->GetWalSet().GetWals();
  1610. ASSERT_EQ(wals.size(), 1);
  1611. ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
  1612. ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize());
  1613. }
  1614. // The WAL is synced before closing.
  1615. {
  1616. WalMetadata wal(kSyncedSizeInBytes);
  1617. VersionEdit edit;
  1618. edit.AddWal(kLogNumber, wal);
  1619. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1620. const auto& wals = versions_->GetWalSet().GetWals();
  1621. ASSERT_EQ(wals.size(), 1);
  1622. ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
  1623. ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
  1624. ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
  1625. }
  1626. // A new WAL with larger log number is created,
  1627. // implicitly marking the current WAL closed.
  1628. {
  1629. VersionEdit edit;
  1630. edit.AddWal(kLogNumber + 1);
  1631. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1632. const auto& wals = versions_->GetWalSet().GetWals();
  1633. ASSERT_EQ(wals.size(), 2);
  1634. ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
  1635. ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
  1636. ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
  1637. ASSERT_TRUE(wals.find(kLogNumber + 1) != wals.end());
  1638. ASSERT_FALSE(wals.at(kLogNumber + 1).HasSyncedSize());
  1639. }
  1640. // Recover a new VersionSet.
  1641. {
  1642. std::unique_ptr<VersionSet> new_versions(new VersionSet(
  1643. dbname_, &db_options_, env_options_, table_cache_.get(),
  1644. &write_buffer_manager_, &write_controller_,
  1645. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  1646. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  1647. /*error_handler=*/nullptr, /*unchanging=*/false));
  1648. ASSERT_OK(new_versions->Recover(column_families_, false));
  1649. const auto& wals = new_versions->GetWalSet().GetWals();
  1650. ASSERT_EQ(wals.size(), 2);
  1651. ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
  1652. ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
  1653. ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
  1654. }
  1655. }
  1656. TEST_F(VersionSetTest, WalDeletion) {
  1657. NewDB();
  1658. constexpr WalNumber kClosedLogNumber = 10;
  1659. constexpr WalNumber kNonClosedLogNumber = 20;
  1660. constexpr uint64_t kSizeInBytes = 111;
  1661. // Add a non-closed and a closed WAL.
  1662. {
  1663. VersionEdit edit;
  1664. edit.AddWal(kClosedLogNumber, WalMetadata(kSizeInBytes));
  1665. edit.AddWal(kNonClosedLogNumber);
  1666. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1667. const auto& wals = versions_->GetWalSet().GetWals();
  1668. ASSERT_EQ(wals.size(), 2);
  1669. ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
  1670. ASSERT_TRUE(wals.find(kClosedLogNumber) != wals.end());
  1671. ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
  1672. ASSERT_TRUE(wals.at(kClosedLogNumber).HasSyncedSize());
  1673. ASSERT_EQ(wals.at(kClosedLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
  1674. }
  1675. // Delete the closed WAL.
  1676. {
  1677. VersionEdit edit;
  1678. edit.DeleteWalsBefore(kNonClosedLogNumber);
  1679. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1680. const auto& wals = versions_->GetWalSet().GetWals();
  1681. ASSERT_EQ(wals.size(), 1);
  1682. ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
  1683. ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
  1684. }
  1685. // Recover a new VersionSet, only the non-closed WAL should show up.
  1686. {
  1687. std::unique_ptr<VersionSet> new_versions(new VersionSet(
  1688. dbname_, &db_options_, env_options_, table_cache_.get(),
  1689. &write_buffer_manager_, &write_controller_,
  1690. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  1691. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  1692. /*error_handler=*/nullptr, /*unchanging=*/false));
  1693. ASSERT_OK(new_versions->Recover(column_families_, false));
  1694. const auto& wals = new_versions->GetWalSet().GetWals();
  1695. ASSERT_EQ(wals.size(), 1);
  1696. ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
  1697. ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
  1698. }
  1699. // Force the creation of a new MANIFEST file,
  1700. // only the non-closed WAL should be written to the new MANIFEST.
  1701. {
  1702. std::vector<WalAddition> wal_additions;
  1703. SyncPoint::GetInstance()->SetCallBack(
  1704. "VersionSet::WriteCurrentStateToManifest:SaveWal", [&](void* arg) {
  1705. VersionEdit* edit = static_cast<VersionEdit*>(arg);
  1706. ASSERT_TRUE(edit->IsWalAddition());
  1707. for (auto& addition : edit->GetWalAdditions()) {
  1708. wal_additions.push_back(addition);
  1709. }
  1710. });
  1711. SyncPoint::GetInstance()->EnableProcessing();
  1712. CreateNewManifest();
  1713. SyncPoint::GetInstance()->DisableProcessing();
  1714. SyncPoint::GetInstance()->ClearAllCallBacks();
  1715. ASSERT_EQ(wal_additions.size(), 1);
  1716. ASSERT_EQ(wal_additions[0].GetLogNumber(), kNonClosedLogNumber);
  1717. ASSERT_FALSE(wal_additions[0].GetMetadata().HasSyncedSize());
  1718. }
  1719. // Recover from the new MANIFEST, only the non-closed WAL should show up.
  1720. {
  1721. std::unique_ptr<VersionSet> new_versions(new VersionSet(
  1722. dbname_, &db_options_, env_options_, table_cache_.get(),
  1723. &write_buffer_manager_, &write_controller_,
  1724. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  1725. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  1726. /*error_handler=*/nullptr, /*unchanging=*/false));
  1727. ASSERT_OK(new_versions->Recover(column_families_, false));
  1728. const auto& wals = new_versions->GetWalSet().GetWals();
  1729. ASSERT_EQ(wals.size(), 1);
  1730. ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
  1731. ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
  1732. }
  1733. }
  1734. TEST_F(VersionSetTest, WalCreateTwice) {
  1735. NewDB();
  1736. constexpr WalNumber kLogNumber = 10;
  1737. VersionEdit edit;
  1738. edit.AddWal(kLogNumber);
  1739. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1740. Status s = LogAndApplyToDefaultCF(edit);
  1741. ASSERT_TRUE(s.IsCorruption());
  1742. ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") !=
  1743. std::string::npos)
  1744. << s.ToString();
  1745. }
  1746. TEST_F(VersionSetTest, WalCreateAfterClose) {
  1747. NewDB();
  1748. constexpr WalNumber kLogNumber = 10;
  1749. constexpr uint64_t kSizeInBytes = 111;
  1750. {
  1751. // Add a closed WAL.
  1752. VersionEdit edit;
  1753. edit.AddWal(kLogNumber);
  1754. WalMetadata wal(kSizeInBytes);
  1755. edit.AddWal(kLogNumber, wal);
  1756. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1757. }
  1758. {
  1759. // Create the same WAL again.
  1760. VersionEdit edit;
  1761. edit.AddWal(kLogNumber);
  1762. Status s = LogAndApplyToDefaultCF(edit);
  1763. ASSERT_TRUE(s.IsCorruption());
  1764. ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") !=
  1765. std::string::npos)
  1766. << s.ToString();
  1767. }
  1768. }
  1769. TEST_F(VersionSetTest, AddWalWithSmallerSize) {
  1770. NewDB();
  1771. assert(versions_);
  1772. constexpr WalNumber kLogNumber = 10;
  1773. constexpr uint64_t kSizeInBytes = 111;
  1774. {
  1775. // Add a closed WAL.
  1776. VersionEdit edit;
  1777. WalMetadata wal(kSizeInBytes);
  1778. edit.AddWal(kLogNumber, wal);
  1779. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1780. }
  1781. // Copy for future comparison.
  1782. const std::map<WalNumber, WalMetadata> wals1 =
  1783. versions_->GetWalSet().GetWals();
  1784. {
  1785. // Add the same WAL with smaller synced size.
  1786. VersionEdit edit;
  1787. WalMetadata wal(kSizeInBytes / 2);
  1788. edit.AddWal(kLogNumber, wal);
  1789. Status s = LogAndApplyToDefaultCF(edit);
  1790. ASSERT_OK(s);
  1791. }
  1792. const std::map<WalNumber, WalMetadata> wals2 =
  1793. versions_->GetWalSet().GetWals();
  1794. ASSERT_EQ(wals1, wals2);
  1795. }
  1796. TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
  1797. NewDB();
  1798. constexpr WalNumber kLogNumber0 = 10;
  1799. constexpr WalNumber kLogNumber1 = 20;
  1800. constexpr WalNumber kNonExistingNumber = 15;
  1801. constexpr uint64_t kSizeInBytes = 111;
  1802. {
  1803. // Add closed WALs.
  1804. VersionEdit edit;
  1805. WalMetadata wal(kSizeInBytes);
  1806. edit.AddWal(kLogNumber0, wal);
  1807. edit.AddWal(kLogNumber1, wal);
  1808. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1809. }
  1810. {
  1811. // Delete WALs before a non-existing WAL.
  1812. VersionEdit edit;
  1813. edit.DeleteWalsBefore(kNonExistingNumber);
  1814. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1815. }
  1816. // Recover a new VersionSet, WAL0 is deleted, WAL1 is not.
  1817. {
  1818. std::unique_ptr<VersionSet> new_versions(new VersionSet(
  1819. dbname_, &db_options_, env_options_, table_cache_.get(),
  1820. &write_buffer_manager_, &write_controller_,
  1821. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  1822. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  1823. /*error_handler=*/nullptr, /*unchanging=*/false));
  1824. ASSERT_OK(new_versions->Recover(column_families_, false));
  1825. const auto& wals = new_versions->GetWalSet().GetWals();
  1826. ASSERT_EQ(wals.size(), 1);
  1827. ASSERT_TRUE(wals.find(kLogNumber1) != wals.end());
  1828. }
  1829. }
  1830. TEST_F(VersionSetTest, DeleteAllWals) {
  1831. NewDB();
  1832. constexpr WalNumber kMaxLogNumber = 10;
  1833. constexpr uint64_t kSizeInBytes = 111;
  1834. {
  1835. // Add a closed WAL.
  1836. VersionEdit edit;
  1837. WalMetadata wal(kSizeInBytes);
  1838. edit.AddWal(kMaxLogNumber, wal);
  1839. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1840. }
  1841. {
  1842. VersionEdit edit;
  1843. edit.DeleteWalsBefore(kMaxLogNumber + 10);
  1844. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1845. }
  1846. // Recover a new VersionSet, all WALs are deleted.
  1847. {
  1848. std::unique_ptr<VersionSet> new_versions(new VersionSet(
  1849. dbname_, &db_options_, env_options_, table_cache_.get(),
  1850. &write_buffer_manager_, &write_controller_,
  1851. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  1852. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  1853. /*error_handler=*/nullptr, /*unchanging=*/false));
  1854. ASSERT_OK(new_versions->Recover(column_families_, false));
  1855. const auto& wals = new_versions->GetWalSet().GetWals();
  1856. ASSERT_EQ(wals.size(), 0);
  1857. }
  1858. }
  1859. TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
  1860. NewDB();
  1861. constexpr int kAtomicGroupSize = 7;
  1862. constexpr uint64_t kNumWals = 5;
  1863. const std::string kDBId = "db_db";
  1864. int remaining = kAtomicGroupSize;
  1865. autovector<std::unique_ptr<VersionEdit>> edits;
  1866. // Add 5 WALs.
  1867. for (uint64_t i = 1; i <= kNumWals; i++) {
  1868. edits.emplace_back(new VersionEdit);
  1869. // WAL's size equals its log number.
  1870. edits.back()->AddWal(i, WalMetadata(i));
  1871. edits.back()->MarkAtomicGroup(--remaining);
  1872. }
  1873. // One edit with the min log number set.
  1874. edits.emplace_back(new VersionEdit);
  1875. edits.back()->SetDBId(kDBId);
  1876. edits.back()->MarkAtomicGroup(--remaining);
  1877. // Delete the first added 4 WALs.
  1878. edits.emplace_back(new VersionEdit);
  1879. edits.back()->DeleteWalsBefore(kNumWals);
  1880. edits.back()->MarkAtomicGroup(--remaining);
  1881. ASSERT_EQ(remaining, 0);
  1882. ASSERT_OK(LogAndApplyToDefaultCF(edits));
  1883. // Recover a new VersionSet, the min log number and the last WAL should be
  1884. // kept.
  1885. {
  1886. std::unique_ptr<VersionSet> new_versions(new VersionSet(
  1887. dbname_, &db_options_, env_options_, table_cache_.get(),
  1888. &write_buffer_manager_, &write_controller_,
  1889. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  1890. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  1891. /*error_handler=*/nullptr, /*unchanging=*/false));
  1892. std::string db_id;
  1893. ASSERT_OK(
  1894. new_versions->Recover(column_families_, /*read_only=*/false, &db_id));
  1895. ASSERT_EQ(db_id, kDBId);
  1896. const auto& wals = new_versions->GetWalSet().GetWals();
  1897. ASSERT_EQ(wals.size(), 1);
  1898. ASSERT_TRUE(wals.find(kNumWals) != wals.end());
  1899. ASSERT_TRUE(wals.at(kNumWals).HasSyncedSize());
  1900. ASSERT_EQ(wals.at(kNumWals).GetSyncedSizeInBytes(), kNumWals);
  1901. }
  1902. }
  1903. TEST_F(VersionSetTest, OffpeakTimeInfoTest) {
  1904. Random rnd(test::RandomSeed());
  1905. // Sets off-peak time from 11:30PM to 4:30AM next day.
  1906. // Starting at 1:30PM, use mock sleep to make time pass
  1907. // and see if IsNowOffpeak() returns correctly per time changes
  1908. int now_hour = 13;
  1909. int now_minute = 30;
  1910. versions_->ChangeOffpeakTimeOption("23:30-04:30");
  1911. auto mock_clock = std::make_shared<MockSystemClock>(env_->GetSystemClock());
  1912. // Add some extra random days to current time
  1913. int days = rnd.Uniform(100);
  1914. mock_clock->SetCurrentTime(days * 86400 + now_hour * 3600 + now_minute * 60);
  1915. int64_t now;
  1916. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1917. // Starting at 1:30PM. It's not off-peak
  1918. ASSERT_FALSE(
  1919. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1920. // Now it's at 4:30PM. Still not off-peak
  1921. mock_clock->MockSleepForSeconds(3 * 3600);
  1922. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1923. ASSERT_FALSE(
  1924. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1925. // Now it's at 11:30PM. It's off-peak
  1926. mock_clock->MockSleepForSeconds(7 * 3600);
  1927. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1928. ASSERT_TRUE(
  1929. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1930. // Now it's at 2:30AM next day. It's still off-peak
  1931. mock_clock->MockSleepForSeconds(3 * 3600);
  1932. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1933. ASSERT_TRUE(
  1934. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1935. // Now it's at 4:30AM. It's still off-peak
  1936. mock_clock->MockSleepForSeconds(2 * 3600);
  1937. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1938. ASSERT_TRUE(
  1939. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1940. // Sleep for one more minute. It's at 4:31AM It's no longer off-peak
  1941. mock_clock->MockSleepForSeconds(60);
  1942. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1943. ASSERT_FALSE(
  1944. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1945. // Entire day offpeak
  1946. versions_->ChangeOffpeakTimeOption("00:00-23:59");
  1947. // It doesn't matter what time it is. It should be just offpeak.
  1948. ASSERT_TRUE(
  1949. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1950. // Mock Sleep for 3 hours. It's still off-peak
  1951. mock_clock->MockSleepForSeconds(3 * 3600);
  1952. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1953. ASSERT_TRUE(
  1954. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1955. // Mock Sleep for 20 hours. It's still off-peak
  1956. mock_clock->MockSleepForSeconds(20 * 3600);
  1957. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1958. ASSERT_TRUE(
  1959. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1960. // Mock Sleep for 59 minutes. It's still off-peak
  1961. mock_clock->MockSleepForSeconds(59 * 60);
  1962. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1963. ASSERT_TRUE(
  1964. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1965. // Mock Sleep for 59 seconds. It's still off-peak
  1966. mock_clock->MockSleepForSeconds(59);
  1967. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1968. ASSERT_TRUE(
  1969. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1970. // Mock Sleep for 1 second (exactly 24h passed). It's still off-peak
  1971. mock_clock->MockSleepForSeconds(1);
  1972. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1973. ASSERT_TRUE(
  1974. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1975. // Another second for sanity check
  1976. mock_clock->MockSleepForSeconds(1);
  1977. ASSERT_OK(mock_clock.get()->GetCurrentTime(&now));
  1978. ASSERT_TRUE(
  1979. versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak);
  1980. }
  1981. TEST_F(VersionSetTest, ManifestTruncateAfterClose) {
  1982. std::string manifest_path;
  1983. VersionEdit edit;
  1984. NewDB();
  1985. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  1986. SyncPoint::GetInstance()->SetCallBack(
  1987. "VersionSet::Close:AfterClose", [&](void*) {
  1988. GetManifestPath(&manifest_path);
  1989. std::unique_ptr<WritableFile> manifest_file;
  1990. EXPECT_OK(env_->ReopenWritableFile(manifest_path, &manifest_file,
  1991. EnvOptions()));
  1992. EXPECT_OK(manifest_file->Truncate(0));
  1993. EXPECT_OK(manifest_file->Close());
  1994. });
  1995. SyncPoint::GetInstance()->EnableProcessing();
  1996. CloseDB();
  1997. SyncPoint::GetInstance()->DisableProcessing();
  1998. ReopenDB();
  1999. }
  2000. TEST_F(VersionStorageInfoTest, AddRangeDeletionCompensatedFileSize) {
  2001. // Tests that compensated range deletion size is added to compensated file
  2002. // size.
  2003. Add(4, 100U, "1", "2", 100U, kInvalidBlobFileNumber, 1000U);
  2004. UpdateVersionStorageInfo();
  2005. auto meta = vstorage_.GetFileMetaDataByNumber(100U);
  2006. ASSERT_EQ(meta->compensated_file_size, 100U + 1000U);
  2007. }
  2008. class VersionSetWithTimestampTest : public VersionSetTest {
  2009. public:
  2010. static const std::string kNewCfName;
  2011. explicit VersionSetWithTimestampTest() : VersionSetTest() {}
  2012. void SetUp() override {
  2013. NewDB();
  2014. Options options;
  2015. options.comparator = test::BytewiseComparatorWithU64TsWrapper();
  2016. cfd_ = CreateColumnFamily(kNewCfName, options);
  2017. EXPECT_NE(nullptr, cfd_);
  2018. column_families_.emplace_back(kNewCfName, options);
  2019. }
  2020. void TearDown() override {
  2021. for (auto* e : edits_) {
  2022. delete e;
  2023. }
  2024. edits_.clear();
  2025. }
  2026. void GenVersionEditsToSetFullHistoryTsLow(
  2027. const std::vector<uint64_t>& ts_lbs) {
  2028. for (const auto ts_lb : ts_lbs) {
  2029. VersionEdit* edit = new VersionEdit;
  2030. edit->SetColumnFamily(cfd_->GetID());
  2031. std::string ts_str = test::EncodeInt(ts_lb);
  2032. edit->SetFullHistoryTsLow(ts_str);
  2033. edits_.emplace_back(edit);
  2034. }
  2035. }
  2036. void VerifyFullHistoryTsLow(uint64_t expected_ts_low) {
  2037. std::unique_ptr<VersionSet> vset(new VersionSet(
  2038. dbname_, &db_options_, env_options_, table_cache_.get(),
  2039. &write_buffer_manager_, &write_controller_,
  2040. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  2041. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  2042. /*error_handler=*/nullptr, /*unchanging=*/false));
  2043. ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false,
  2044. /*db_id=*/nullptr));
  2045. for (auto* cfd : *(vset->GetColumnFamilySet())) {
  2046. ASSERT_NE(nullptr, cfd);
  2047. if (cfd->GetName() == kNewCfName) {
  2048. ASSERT_EQ(test::EncodeInt(expected_ts_low), cfd->GetFullHistoryTsLow());
  2049. } else {
  2050. ASSERT_TRUE(cfd->GetFullHistoryTsLow().empty());
  2051. }
  2052. }
  2053. }
  2054. void DoTest(const std::vector<uint64_t>& ts_lbs) {
  2055. if (ts_lbs.empty()) {
  2056. return;
  2057. }
  2058. GenVersionEditsToSetFullHistoryTsLow(ts_lbs);
  2059. Status s;
  2060. mutex_.Lock();
  2061. s = versions_->LogAndApply(cfd_, read_options_, write_options_, edits_,
  2062. &mutex_, nullptr);
  2063. mutex_.Unlock();
  2064. ASSERT_OK(s);
  2065. VerifyFullHistoryTsLow(*std::max_element(ts_lbs.begin(), ts_lbs.end()));
  2066. }
  2067. protected:
  2068. ColumnFamilyData* cfd_{nullptr};
  2069. // edits_ must contain and own pointers to heap-alloc VersionEdit objects.
  2070. autovector<VersionEdit*> edits_;
  2071. private:
  2072. const ReadOptions read_options_;
  2073. };
  2074. const std::string VersionSetWithTimestampTest::kNewCfName("new_cf");
  2075. TEST_F(VersionSetWithTimestampTest, SetFullHistoryTsLbOnce) {
  2076. constexpr uint64_t kTsLow = 100;
  2077. DoTest({kTsLow});
  2078. }
  2079. // Simulate the application increasing full_history_ts_low.
  2080. TEST_F(VersionSetWithTimestampTest, IncreaseFullHistoryTsLb) {
  2081. const std::vector<uint64_t> ts_lbs = {100, 101, 102, 103};
  2082. DoTest(ts_lbs);
  2083. }
  2084. // Simulate the application trying to decrease full_history_ts_low
  2085. // unsuccessfully. If the application calls public API sequentially to
  2086. // decrease the lower bound ts, RocksDB will return an InvalidArgument
  2087. // status before involving VersionSet. Only when multiple threads trying
  2088. // to decrease the lower bound concurrently will this case ever happen. Even
  2089. // so, the lower bound cannot be decreased. The application will be notified
  2090. // via return value of the API.
  2091. TEST_F(VersionSetWithTimestampTest, TryDecreaseFullHistoryTsLb) {
  2092. const std::vector<uint64_t> ts_lbs = {103, 102, 101, 100};
  2093. DoTest(ts_lbs);
  2094. }
  2095. class VersionSetAtomicGroupTest : public VersionSetTestBase,
  2096. public testing::Test {
  2097. public:
  2098. VersionSetAtomicGroupTest()
  2099. : VersionSetTestBase("version_set_atomic_group_test") {}
  2100. explicit VersionSetAtomicGroupTest(const std::string& name)
  2101. : VersionSetTestBase(name) {}
  2102. void SetUp() override {
  2103. PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
  2104. SetupTestSyncPoints();
  2105. }
  2106. void SetupValidAtomicGroup(int atomic_group_size) {
  2107. edits_.resize(atomic_group_size);
  2108. int remaining = atomic_group_size;
  2109. for (size_t i = 0; i != edits_.size(); ++i) {
  2110. edits_[i].SetLogNumber(0);
  2111. edits_[i].SetNextFile(2);
  2112. edits_[i].MarkAtomicGroup(--remaining);
  2113. edits_[i].SetLastSequence(last_seqno_++);
  2114. }
  2115. CreateCurrentFile();
  2116. }
  2117. void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
  2118. edits_.resize(atomic_group_size);
  2119. int remaining = atomic_group_size;
  2120. for (size_t i = 0; i != edits_.size(); ++i) {
  2121. edits_[i].SetLogNumber(0);
  2122. edits_[i].SetNextFile(2);
  2123. edits_[i].MarkAtomicGroup(--remaining);
  2124. edits_[i].SetLastSequence(last_seqno_++);
  2125. }
  2126. CreateCurrentFile();
  2127. }
  2128. void SetupCorruptedAtomicGroup(int atomic_group_size) {
  2129. edits_.resize(atomic_group_size);
  2130. int remaining = atomic_group_size;
  2131. for (size_t i = 0; i != edits_.size(); ++i) {
  2132. edits_[i].SetLogNumber(0);
  2133. edits_[i].SetNextFile(2);
  2134. if (i != ((size_t)atomic_group_size / 2)) {
  2135. edits_[i].MarkAtomicGroup(--remaining);
  2136. }
  2137. edits_[i].SetLastSequence(last_seqno_++);
  2138. }
  2139. CreateCurrentFile();
  2140. }
  2141. void SetupIncorrectAtomicGroup(int atomic_group_size) {
  2142. edits_.resize(atomic_group_size);
  2143. int remaining = atomic_group_size;
  2144. for (size_t i = 0; i != edits_.size(); ++i) {
  2145. edits_[i].SetLogNumber(0);
  2146. edits_[i].SetNextFile(2);
  2147. if (i != 1) {
  2148. edits_[i].MarkAtomicGroup(--remaining);
  2149. } else {
  2150. edits_[i].MarkAtomicGroup(remaining--);
  2151. }
  2152. edits_[i].SetLastSequence(last_seqno_++);
  2153. }
  2154. CreateCurrentFile();
  2155. }
  2156. void SetupTestSyncPoints() {
  2157. SyncPoint::GetInstance()->DisableProcessing();
  2158. SyncPoint::GetInstance()->ClearAllCallBacks();
  2159. SyncPoint::GetInstance()->SetCallBack(
  2160. "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
  2161. VersionEdit* e = static_cast<VersionEdit*>(arg);
  2162. EXPECT_EQ(edits_.front().DebugString(),
  2163. e->DebugString()); // compare based on value
  2164. first_in_atomic_group_ = true;
  2165. });
  2166. SyncPoint::GetInstance()->SetCallBack(
  2167. "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
  2168. VersionEdit* e = static_cast<VersionEdit*>(arg);
  2169. EXPECT_EQ(edits_.back().DebugString(),
  2170. e->DebugString()); // compare based on value
  2171. EXPECT_TRUE(first_in_atomic_group_);
  2172. last_in_atomic_group_ = true;
  2173. });
  2174. SyncPoint::GetInstance()->SetCallBack(
  2175. "VersionEditHandlerBase::Iterate:Finish",
  2176. [&](void* arg) { num_recovered_edits_ = *static_cast<size_t*>(arg); });
  2177. SyncPoint::GetInstance()->SetCallBack(
  2178. "AtomicGroupReadBuffer::AddEdit:AtomicGroup",
  2179. [&](void* /* arg */) { ++num_edits_in_atomic_group_; });
  2180. SyncPoint::GetInstance()->SetCallBack(
  2181. "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
  2182. [&](void* arg) { corrupted_edit_ = *static_cast<VersionEdit*>(arg); });
  2183. SyncPoint::GetInstance()->SetCallBack(
  2184. "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
  2185. [&](void* arg) {
  2186. edit_with_incorrect_group_size_ = *static_cast<VersionEdit*>(arg);
  2187. });
  2188. SyncPoint::GetInstance()->EnableProcessing();
  2189. }
  2190. void AddNewEditsToLog(int num_edits) {
  2191. for (int i = 0; i < num_edits; i++) {
  2192. std::string record;
  2193. edits_[i].EncodeTo(&record, 0 /* ts_sz */);
  2194. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2195. }
  2196. }
  2197. void TearDown() override {
  2198. SyncPoint::GetInstance()->DisableProcessing();
  2199. SyncPoint::GetInstance()->ClearAllCallBacks();
  2200. log_writer_.reset();
  2201. }
  2202. protected:
  2203. std::vector<ColumnFamilyDescriptor> column_families_;
  2204. SequenceNumber last_seqno_;
  2205. std::vector<VersionEdit> edits_;
  2206. bool first_in_atomic_group_ = false;
  2207. bool last_in_atomic_group_ = false;
  2208. int num_edits_in_atomic_group_ = 0;
  2209. size_t num_recovered_edits_ = 0;
  2210. VersionEdit corrupted_edit_;
  2211. VersionEdit edit_with_incorrect_group_size_;
  2212. std::unique_ptr<log::Writer> log_writer_;
  2213. };
  2214. TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
  2215. const int kAtomicGroupSize = 3;
  2216. SetupValidAtomicGroup(kAtomicGroupSize);
  2217. AddNewEditsToLog(kAtomicGroupSize);
  2218. EXPECT_OK(versions_->Recover(column_families_, false));
  2219. EXPECT_EQ(column_families_.size(),
  2220. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2221. EXPECT_TRUE(first_in_atomic_group_);
  2222. EXPECT_TRUE(last_in_atomic_group_);
  2223. EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
  2224. }
  2225. TEST_F(VersionSetAtomicGroupTest,
  2226. HandleValidAtomicGroupWithReactiveVersionSetRecover) {
  2227. const int kAtomicGroupSize = 3;
  2228. SetupValidAtomicGroup(kAtomicGroupSize);
  2229. AddNewEditsToLog(kAtomicGroupSize);
  2230. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  2231. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  2232. std::unique_ptr<Status> manifest_reader_status;
  2233. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  2234. &manifest_reporter,
  2235. &manifest_reader_status));
  2236. EXPECT_EQ(column_families_.size(),
  2237. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2238. EXPECT_TRUE(first_in_atomic_group_);
  2239. EXPECT_TRUE(last_in_atomic_group_);
  2240. // The recover should clean up the replay buffer.
  2241. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  2242. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  2243. EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
  2244. }
  2245. TEST_F(VersionSetAtomicGroupTest,
  2246. HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
  2247. const int kAtomicGroupSize = 3;
  2248. SetupValidAtomicGroup(kAtomicGroupSize);
  2249. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  2250. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  2251. std::unique_ptr<Status> manifest_reader_status;
  2252. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  2253. &manifest_reporter,
  2254. &manifest_reader_status));
  2255. EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  2256. AddNewEditsToLog(kAtomicGroupSize);
  2257. InstrumentedMutex mu;
  2258. std::unordered_set<ColumnFamilyData*> cfds_changed;
  2259. mu.Lock();
  2260. EXPECT_OK(reactive_versions_->ReadAndApply(
  2261. &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
  2262. /*files_to_delete=*/nullptr));
  2263. mu.Unlock();
  2264. EXPECT_TRUE(first_in_atomic_group_);
  2265. EXPECT_TRUE(last_in_atomic_group_);
  2266. // The recover should clean up the replay buffer.
  2267. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  2268. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  2269. EXPECT_EQ(kAtomicGroupSize, num_recovered_edits_);
  2270. }
  2271. TEST_F(VersionSetAtomicGroupTest,
  2272. HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
  2273. const int kAtomicGroupSize = 4;
  2274. const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  2275. SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  2276. AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  2277. EXPECT_OK(versions_->Recover(column_families_, false));
  2278. EXPECT_EQ(column_families_.size(),
  2279. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2280. EXPECT_TRUE(first_in_atomic_group_);
  2281. EXPECT_FALSE(last_in_atomic_group_);
  2282. EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  2283. EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  2284. }
  2285. TEST_F(VersionSetAtomicGroupTest,
  2286. HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
  2287. const int kAtomicGroupSize = 4;
  2288. const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  2289. SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  2290. AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  2291. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  2292. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  2293. std::unique_ptr<Status> manifest_reader_status;
  2294. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  2295. &manifest_reporter,
  2296. &manifest_reader_status));
  2297. EXPECT_EQ(column_families_.size(),
  2298. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2299. EXPECT_TRUE(first_in_atomic_group_);
  2300. EXPECT_FALSE(last_in_atomic_group_);
  2301. EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  2302. // Reactive version set should store the edits in the replay buffer.
  2303. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
  2304. kNumberOfPersistedVersionEdits);
  2305. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
  2306. // Write the last record. The reactive version set should now apply all
  2307. // edits.
  2308. std::string last_record;
  2309. edits_[kAtomicGroupSize - 1].EncodeTo(&last_record);
  2310. EXPECT_OK(log_writer_->AddRecord(WriteOptions(), last_record));
  2311. InstrumentedMutex mu;
  2312. std::unordered_set<ColumnFamilyData*> cfds_changed;
  2313. mu.Lock();
  2314. EXPECT_OK(reactive_versions_->ReadAndApply(
  2315. &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
  2316. /*files_to_delete=*/nullptr));
  2317. mu.Unlock();
  2318. // Reactive version set should be empty now.
  2319. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  2320. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  2321. EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  2322. }
  2323. TEST_F(VersionSetAtomicGroupTest,
  2324. HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
  2325. const int kAtomicGroupSize = 4;
  2326. const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  2327. SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  2328. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  2329. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  2330. std::unique_ptr<Status> manifest_reader_status;
  2331. // No edits in an atomic group.
  2332. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  2333. &manifest_reporter,
  2334. &manifest_reader_status));
  2335. EXPECT_EQ(column_families_.size(),
  2336. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2337. EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  2338. // Write a few edits in an atomic group.
  2339. AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  2340. InstrumentedMutex mu;
  2341. std::unordered_set<ColumnFamilyData*> cfds_changed;
  2342. mu.Lock();
  2343. EXPECT_OK(reactive_versions_->ReadAndApply(
  2344. &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
  2345. /*files_to_delete=*/nullptr));
  2346. mu.Unlock();
  2347. EXPECT_TRUE(first_in_atomic_group_);
  2348. EXPECT_FALSE(last_in_atomic_group_);
  2349. EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  2350. // Reactive version set should store the edits in the replay buffer.
  2351. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
  2352. kNumberOfPersistedVersionEdits);
  2353. EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
  2354. }
  2355. TEST_F(VersionSetAtomicGroupTest,
  2356. HandleCorruptedAtomicGroupWithVersionSetRecover) {
  2357. const int kAtomicGroupSize = 4;
  2358. SetupCorruptedAtomicGroup(kAtomicGroupSize);
  2359. AddNewEditsToLog(kAtomicGroupSize);
  2360. EXPECT_NOK(versions_->Recover(column_families_, false));
  2361. EXPECT_EQ(column_families_.size(),
  2362. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2363. EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
  2364. corrupted_edit_.DebugString());
  2365. }
  2366. TEST_F(VersionSetAtomicGroupTest,
  2367. HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
  2368. const int kAtomicGroupSize = 4;
  2369. SetupCorruptedAtomicGroup(kAtomicGroupSize);
  2370. AddNewEditsToLog(kAtomicGroupSize);
  2371. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  2372. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  2373. std::unique_ptr<Status> manifest_reader_status;
  2374. EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,
  2375. &manifest_reporter,
  2376. &manifest_reader_status));
  2377. EXPECT_EQ(column_families_.size(),
  2378. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2379. EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
  2380. corrupted_edit_.DebugString());
  2381. }
  2382. TEST_F(VersionSetAtomicGroupTest,
  2383. HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
  2384. const int kAtomicGroupSize = 4;
  2385. SetupCorruptedAtomicGroup(kAtomicGroupSize);
  2386. InstrumentedMutex mu;
  2387. std::unordered_set<ColumnFamilyData*> cfds_changed;
  2388. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  2389. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  2390. std::unique_ptr<Status> manifest_reader_status;
  2391. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  2392. &manifest_reporter,
  2393. &manifest_reader_status));
  2394. // Write the corrupted edits.
  2395. AddNewEditsToLog(kAtomicGroupSize);
  2396. mu.Lock();
  2397. EXPECT_NOK(reactive_versions_->ReadAndApply(
  2398. &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
  2399. /*files_to_delete=*/nullptr));
  2400. mu.Unlock();
  2401. EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
  2402. corrupted_edit_.DebugString());
  2403. }
  2404. TEST_F(VersionSetAtomicGroupTest,
  2405. HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
  2406. const int kAtomicGroupSize = 4;
  2407. SetupIncorrectAtomicGroup(kAtomicGroupSize);
  2408. AddNewEditsToLog(kAtomicGroupSize);
  2409. EXPECT_NOK(versions_->Recover(column_families_, false));
  2410. EXPECT_EQ(column_families_.size(),
  2411. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2412. EXPECT_EQ(edits_[1].DebugString(),
  2413. edit_with_incorrect_group_size_.DebugString());
  2414. }
  2415. TEST_F(VersionSetAtomicGroupTest,
  2416. HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
  2417. const int kAtomicGroupSize = 4;
  2418. SetupIncorrectAtomicGroup(kAtomicGroupSize);
  2419. AddNewEditsToLog(kAtomicGroupSize);
  2420. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  2421. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  2422. std::unique_ptr<Status> manifest_reader_status;
  2423. EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,
  2424. &manifest_reporter,
  2425. &manifest_reader_status));
  2426. EXPECT_EQ(column_families_.size(),
  2427. reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2428. EXPECT_EQ(edits_[1].DebugString(),
  2429. edit_with_incorrect_group_size_.DebugString());
  2430. }
  2431. TEST_F(VersionSetAtomicGroupTest,
  2432. HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
  2433. const int kAtomicGroupSize = 4;
  2434. SetupIncorrectAtomicGroup(kAtomicGroupSize);
  2435. InstrumentedMutex mu;
  2436. std::unordered_set<ColumnFamilyData*> cfds_changed;
  2437. std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  2438. std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  2439. std::unique_ptr<Status> manifest_reader_status;
  2440. EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
  2441. &manifest_reporter,
  2442. &manifest_reader_status));
  2443. AddNewEditsToLog(kAtomicGroupSize);
  2444. mu.Lock();
  2445. EXPECT_NOK(reactive_versions_->ReadAndApply(
  2446. &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
  2447. /*files_to_delete=*/nullptr));
  2448. mu.Unlock();
  2449. EXPECT_EQ(edits_[1].DebugString(),
  2450. edit_with_incorrect_group_size_.DebugString());
  2451. }
  2452. class AtomicGroupBestEffortRecoveryTest : public VersionSetAtomicGroupTest {
  2453. public:
  2454. AtomicGroupBestEffortRecoveryTest()
  2455. : VersionSetAtomicGroupTest("atomic_group_best_effort_recovery_test") {}
  2456. };
  2457. TEST_F(AtomicGroupBestEffortRecoveryTest,
  2458. HandleAtomicGroupUpdatesValidInitially) {
  2459. // One AtomicGroup contains updates that are valid at the outset.
  2460. std::vector<SstInfo> file_infos;
  2461. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2462. int file_number = 10 + cfid;
  2463. file_infos.emplace_back(file_number, column_families_[cfid].name,
  2464. "" /* key */, 0 /* level */,
  2465. file_number /* epoch_number */);
  2466. }
  2467. std::vector<FileMetaData> file_metas;
  2468. CreateDummyTableFiles(file_infos, &file_metas);
  2469. edits_.clear();
  2470. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2471. edits_.emplace_back();
  2472. edits_.back().SetColumnFamily(cfid);
  2473. edits_.back().AddFile(0 /* level */, file_metas[cfid]);
  2474. edits_.back().SetLastSequence(++last_seqno_);
  2475. edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 -
  2476. cfid /* remaining_entries */);
  2477. }
  2478. AddNewEditsToLog(kNumColumnFamilies);
  2479. {
  2480. bool has_missing_table_file;
  2481. ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */,
  2482. {DescriptorFileName(1 /* number */)},
  2483. nullptr /* db_id */,
  2484. &has_missing_table_file));
  2485. ASSERT_FALSE(has_missing_table_file);
  2486. }
  2487. std::vector<uint64_t> all_table_files;
  2488. std::vector<uint64_t> all_blob_files;
  2489. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  2490. ASSERT_EQ(file_metas.size(), all_table_files.size());
  2491. }
  2492. TEST_F(AtomicGroupBestEffortRecoveryTest, HandleAtomicGroupUpdatesValidLater) {
  2493. // One AtomicGroup contains updates that become valid after applying further
  2494. // updates.
  2495. // `SetupTestSyncPoints()` creates sync points that assume there is only one
  2496. // AtomicGroup, which is not the case in this test.
  2497. SyncPoint::GetInstance()->DisableProcessing();
  2498. SyncPoint::GetInstance()->ClearAllCallBacks();
  2499. std::vector<SstInfo> file_infos;
  2500. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2501. int file_number = 10 + cfid;
  2502. file_infos.emplace_back(file_number, column_families_[cfid].name,
  2503. "" /* key */, 0 /* level */,
  2504. file_number /* epoch_number */);
  2505. }
  2506. std::vector<FileMetaData> file_metas;
  2507. CreateDummyTableFiles(file_infos, &file_metas);
  2508. edits_.clear();
  2509. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2510. if (cfid == kNumColumnFamilies - 1) {
  2511. // Corrupt the number of the last file.
  2512. file_metas[cfid].fd.packed_number_and_path_id =
  2513. PackFileNumberAndPathId(20 /* number */, 0 /* path_id */);
  2514. }
  2515. edits_.emplace_back();
  2516. edits_.back().SetColumnFamily(cfid);
  2517. edits_.back().AddFile(0 /* level */, file_metas[cfid]);
  2518. edits_.back().SetLastSequence(++last_seqno_);
  2519. edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 -
  2520. cfid /* remaining_entries */);
  2521. }
  2522. AddNewEditsToLog(kNumColumnFamilies);
  2523. {
  2524. // Delete the file with the corrupted number.
  2525. VersionEdit fixup_edit;
  2526. fixup_edit.SetColumnFamily(kNumColumnFamilies - 1);
  2527. fixup_edit.DeleteFile(0 /* level */, 20 /* number */);
  2528. assert(log_writer_.get() != nullptr);
  2529. std::string record;
  2530. ASSERT_TRUE(fixup_edit.EncodeTo(&record, 0 /* ts_sz */));
  2531. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2532. // Throw in an impossible AtomicGroup afterwards for extra challenge.
  2533. VersionEdit broken_edit;
  2534. broken_edit.SetColumnFamily(0 /* column_family_id */);
  2535. file_metas[0].fd.packed_number_and_path_id =
  2536. PackFileNumberAndPathId(30 /* number */, 0 /* path_id */);
  2537. broken_edit.AddFile(0 /* level */, file_metas[0]);
  2538. broken_edit.SetLastSequence(++last_seqno_);
  2539. broken_edit.MarkAtomicGroup(0 /* remaining_entries */);
  2540. record.clear();
  2541. ASSERT_TRUE(broken_edit.EncodeTo(&record, 0 /* ts_sz */));
  2542. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2543. assert(log_writer_.get() != nullptr);
  2544. }
  2545. {
  2546. bool has_missing_table_file = false;
  2547. ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */,
  2548. {DescriptorFileName(1 /* number */)},
  2549. nullptr /* db_id */,
  2550. &has_missing_table_file));
  2551. ASSERT_TRUE(has_missing_table_file);
  2552. }
  2553. std::vector<uint64_t> all_table_files;
  2554. std::vector<uint64_t> all_blob_files;
  2555. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  2556. ASSERT_EQ(file_metas.size() - 1, all_table_files.size());
  2557. }
  2558. TEST_F(AtomicGroupBestEffortRecoveryTest, HandleAtomicGroupUpdatesInvalid) {
  2559. // One AtomicGroup contains updates that never become valid.
  2560. std::vector<SstInfo> file_infos;
  2561. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2562. int file_number = 10 + cfid;
  2563. file_infos.emplace_back(file_number, column_families_[cfid].name,
  2564. "" /* key */, 0 /* level */,
  2565. file_number /* epoch_number */);
  2566. }
  2567. std::vector<FileMetaData> file_metas;
  2568. CreateDummyTableFiles(file_infos, &file_metas);
  2569. edits_.clear();
  2570. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2571. if (cfid == kNumColumnFamilies - 1) {
  2572. // Corrupt the number of the last file.
  2573. file_metas[cfid].fd.packed_number_and_path_id =
  2574. PackFileNumberAndPathId(20 /* number */, 0 /* path_id */);
  2575. }
  2576. edits_.emplace_back();
  2577. edits_.back().SetColumnFamily(cfid);
  2578. edits_.back().AddFile(0 /* level */, file_metas[cfid]);
  2579. edits_.back().SetLastSequence(++last_seqno_);
  2580. edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 -
  2581. cfid /* remaining_entries */);
  2582. }
  2583. AddNewEditsToLog(kNumColumnFamilies);
  2584. {
  2585. bool has_missing_table_file = false;
  2586. ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */,
  2587. {DescriptorFileName(1 /* number */)},
  2588. nullptr /* db_id */,
  2589. &has_missing_table_file));
  2590. ASSERT_TRUE(has_missing_table_file);
  2591. }
  2592. std::vector<uint64_t> all_table_files;
  2593. std::vector<uint64_t> all_blob_files;
  2594. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  2595. ASSERT_TRUE(all_table_files.empty());
  2596. }
  2597. TEST_F(AtomicGroupBestEffortRecoveryTest,
  2598. HandleAtomicGroupUpdatesValidTooLate) {
  2599. // One AtomicGroup contains updates that become valid after the next
  2600. // AtomicGroup is reached, which is too late.
  2601. // `SetupTestSyncPoints()` creates sync points that assume there is only one
  2602. // AtomicGroup, which is not the case in this test.
  2603. SyncPoint::GetInstance()->DisableProcessing();
  2604. SyncPoint::GetInstance()->ClearAllCallBacks();
  2605. std::vector<SstInfo> file_infos;
  2606. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2607. int file_number = 10 + cfid;
  2608. file_infos.emplace_back(file_number, column_families_[cfid].name,
  2609. "" /* key */, 0 /* level */,
  2610. file_number /* epoch_number */);
  2611. }
  2612. std::vector<FileMetaData> file_metas;
  2613. CreateDummyTableFiles(file_infos, &file_metas);
  2614. edits_.clear();
  2615. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2616. if (cfid == kNumColumnFamilies - 1) {
  2617. // Corrupt the number of the last file.
  2618. file_metas[cfid].fd.packed_number_and_path_id =
  2619. PackFileNumberAndPathId(20 /* number */, 0 /* path_id */);
  2620. }
  2621. edits_.emplace_back();
  2622. edits_.back().SetColumnFamily(cfid);
  2623. edits_.back().AddFile(0 /* level */, file_metas[cfid]);
  2624. edits_.back().SetLastSequence(++last_seqno_);
  2625. edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 -
  2626. cfid /* remaining_entries */);
  2627. }
  2628. AddNewEditsToLog(kNumColumnFamilies);
  2629. {
  2630. // Delete the file with the corrupted number. But bundle it in an
  2631. // AtomicGroup with an update that can never be applied.
  2632. VersionEdit broken_edit;
  2633. broken_edit.SetColumnFamily(0 /* column_family_id */);
  2634. file_metas[0].fd.packed_number_and_path_id =
  2635. PackFileNumberAndPathId(30 /* number */, 0 /* path_id */);
  2636. broken_edit.AddFile(0 /* level */, file_metas[0]);
  2637. broken_edit.SetLastSequence(++last_seqno_);
  2638. broken_edit.MarkAtomicGroup(1 /* remaining_entries */);
  2639. std::string record;
  2640. ASSERT_TRUE(broken_edit.EncodeTo(&record, 0 /* ts_sz */));
  2641. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2642. VersionEdit fixup_edit;
  2643. fixup_edit.SetColumnFamily(kNumColumnFamilies - 1);
  2644. fixup_edit.DeleteFile(0 /* level */, 20 /* number */);
  2645. fixup_edit.MarkAtomicGroup(0 /* remaining_entries */);
  2646. record.clear();
  2647. ASSERT_TRUE(fixup_edit.EncodeTo(&record, 0 /* ts_sz */));
  2648. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2649. assert(log_writer_.get() != nullptr);
  2650. }
  2651. {
  2652. bool has_missing_table_file = false;
  2653. ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */,
  2654. {DescriptorFileName(1 /* number */)},
  2655. nullptr /* db_id */,
  2656. &has_missing_table_file));
  2657. ASSERT_TRUE(has_missing_table_file);
  2658. }
  2659. std::vector<uint64_t> all_table_files;
  2660. std::vector<uint64_t> all_blob_files;
  2661. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  2662. ASSERT_TRUE(all_table_files.empty());
  2663. }
  2664. TEST_F(AtomicGroupBestEffortRecoveryTest,
  2665. HandleAtomicGroupUpdatesInDuplicateInvalid) {
  2666. // One AtomicGroup has multiple updates for the same CF. One of the earlier
  2667. // updates for this CF can lead to a valid state if applied. But the last
  2668. // update for this CF is invalid so the AtomicGroup must not be recovered.
  2669. std::vector<SstInfo> file_infos;
  2670. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2671. int file_number = 10 + cfid;
  2672. file_infos.emplace_back(file_number, column_families_[cfid].name,
  2673. "" /* key */, 0 /* level */,
  2674. file_number /* epoch_number */);
  2675. }
  2676. std::vector<FileMetaData> file_metas;
  2677. CreateDummyTableFiles(file_infos, &file_metas);
  2678. edits_.clear();
  2679. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2680. edits_.emplace_back();
  2681. edits_.back().SetColumnFamily(cfid);
  2682. edits_.back().AddFile(0 /* level */, file_metas[cfid]);
  2683. edits_.back().SetLastSequence(++last_seqno_);
  2684. edits_.back().MarkAtomicGroup(kNumColumnFamilies -
  2685. cfid /* remaining_entries */);
  2686. }
  2687. // Here is the unrecoverable update.
  2688. edits_.emplace_back();
  2689. edits_.back().SetColumnFamily(0 /* column_family_id */);
  2690. file_metas[0].fd.packed_number_and_path_id =
  2691. PackFileNumberAndPathId(20 /* number */, 0 /* path_id */);
  2692. edits_.back().AddFile(0 /* level */, file_metas[0]);
  2693. edits_.back().SetLastSequence(++last_seqno_);
  2694. edits_.back().MarkAtomicGroup(0 /* remaining_entries */);
  2695. AddNewEditsToLog(kNumColumnFamilies + 1);
  2696. {
  2697. bool has_missing_table_file = false;
  2698. ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */,
  2699. {DescriptorFileName(1 /* number */)},
  2700. nullptr /* db_id */,
  2701. &has_missing_table_file));
  2702. ASSERT_TRUE(has_missing_table_file);
  2703. }
  2704. std::vector<uint64_t> all_table_files;
  2705. std::vector<uint64_t> all_blob_files;
  2706. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  2707. ASSERT_TRUE(all_table_files.empty());
  2708. }
  2709. TEST_F(AtomicGroupBestEffortRecoveryTest,
  2710. HandleAtomicGroupMadeWholeByDeletingCf) {
  2711. // One AtomicGroup contains an update that becomes valid when its column
  2712. // family is deleted, making it irrelevant.
  2713. std::vector<SstInfo> file_infos;
  2714. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2715. int file_number = 10 + cfid;
  2716. file_infos.emplace_back(file_number, column_families_[cfid].name,
  2717. "" /* key */, 0 /* level */,
  2718. file_number /* epoch_number */);
  2719. }
  2720. std::vector<FileMetaData> file_metas;
  2721. CreateDummyTableFiles(file_infos, &file_metas);
  2722. edits_.clear();
  2723. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2724. if (cfid == kNumColumnFamilies - 1) {
  2725. // Corrupt the number of the last file.
  2726. file_metas[cfid].fd.packed_number_and_path_id =
  2727. PackFileNumberAndPathId(20 /* number */, 0 /* path_id */);
  2728. }
  2729. edits_.emplace_back();
  2730. edits_.back().SetColumnFamily(cfid);
  2731. edits_.back().AddFile(0 /* level */, file_metas[cfid]);
  2732. edits_.back().SetLastSequence(++last_seqno_);
  2733. edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 -
  2734. cfid /* remaining_entries */);
  2735. }
  2736. AddNewEditsToLog(kNumColumnFamilies);
  2737. {
  2738. // Delete the column family with the corrupted file number.
  2739. VersionEdit fixup_edit;
  2740. fixup_edit.DropColumnFamily();
  2741. fixup_edit.SetColumnFamily(kNumColumnFamilies - 1);
  2742. assert(log_writer_.get() != nullptr);
  2743. std::string record;
  2744. ASSERT_TRUE(fixup_edit.EncodeTo(&record, 0 /* ts_sz */));
  2745. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2746. }
  2747. {
  2748. bool has_missing_table_file = false;
  2749. ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */,
  2750. {DescriptorFileName(1 /* number */)},
  2751. nullptr /* db_id */,
  2752. &has_missing_table_file));
  2753. ASSERT_FALSE(has_missing_table_file);
  2754. }
  2755. std::vector<uint64_t> all_table_files;
  2756. std::vector<uint64_t> all_blob_files;
  2757. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  2758. ASSERT_EQ(file_metas.size() - 1, all_table_files.size());
  2759. }
  2760. TEST_F(AtomicGroupBestEffortRecoveryTest,
  2761. HandleAtomicGroupMadeWholeAfterNewCf) {
  2762. // One AtomicGroup contains updates that become valid after a new column
  2763. // family is added.
  2764. std::vector<SstInfo> file_infos;
  2765. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2766. int file_number = 10 + cfid;
  2767. file_infos.emplace_back(file_number, column_families_[cfid].name,
  2768. "" /* key */, 0 /* level */,
  2769. file_number /* epoch_number */);
  2770. }
  2771. std::vector<FileMetaData> file_metas;
  2772. CreateDummyTableFiles(file_infos, &file_metas);
  2773. edits_.clear();
  2774. for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) {
  2775. if (cfid == kNumColumnFamilies - 1) {
  2776. // Corrupt the number of the last file.
  2777. file_metas[cfid].fd.packed_number_and_path_id =
  2778. PackFileNumberAndPathId(20 /* number */, 0 /* path_id */);
  2779. }
  2780. edits_.emplace_back();
  2781. edits_.back().SetColumnFamily(cfid);
  2782. edits_.back().AddFile(0 /* level */, file_metas[cfid]);
  2783. edits_.back().SetLastSequence(++last_seqno_);
  2784. edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 -
  2785. cfid /* remaining_entries */);
  2786. }
  2787. AddNewEditsToLog(kNumColumnFamilies);
  2788. {
  2789. // Add a new CF.
  2790. VersionEdit add_cf_edit;
  2791. add_cf_edit.AddColumnFamily("extra_cf");
  2792. add_cf_edit.SetColumnFamily(kNumColumnFamilies);
  2793. std::string record;
  2794. ASSERT_TRUE(add_cf_edit.EncodeTo(&record, 0 /* ts_sz */));
  2795. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2796. // Have the new CF refer to a non-existent file for an extra challenge.
  2797. VersionEdit broken_edit;
  2798. broken_edit.SetColumnFamily(kNumColumnFamilies);
  2799. file_metas[0].fd.packed_number_and_path_id =
  2800. PackFileNumberAndPathId(30 /* number */, 0 /* path_id */);
  2801. broken_edit.AddFile(0 /* level */, file_metas[0]);
  2802. broken_edit.SetLastSequence(++last_seqno_);
  2803. record.clear();
  2804. ASSERT_TRUE(broken_edit.EncodeTo(&record, 0 /* ts_sz */));
  2805. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2806. // This fixes up the first of the two non-existent file references.
  2807. VersionEdit fixup_edit;
  2808. fixup_edit.SetColumnFamily(kNumColumnFamilies - 1);
  2809. fixup_edit.DeleteFile(0 /* level */, 20 /* number */);
  2810. record.clear();
  2811. ASSERT_TRUE(fixup_edit.EncodeTo(&record, 0 /* ts_sz */));
  2812. ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record));
  2813. assert(log_writer_.get() != nullptr);
  2814. }
  2815. {
  2816. bool has_missing_table_file = false;
  2817. std::vector<ColumnFamilyDescriptor> column_families = column_families_;
  2818. column_families.emplace_back("extra_cf", cf_options_);
  2819. ASSERT_OK(versions_->TryRecover(column_families, false /* read_only */,
  2820. {DescriptorFileName(1 /* number */)},
  2821. nullptr /* db_id */,
  2822. &has_missing_table_file));
  2823. ASSERT_TRUE(has_missing_table_file);
  2824. }
  2825. std::vector<uint64_t> all_table_files;
  2826. std::vector<uint64_t> all_blob_files;
  2827. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  2828. ASSERT_EQ(file_metas.size() - 1, all_table_files.size());
  2829. }
  2830. class VersionSetTestDropOneCF : public VersionSetTestBase,
  2831. public testing::TestWithParam<std::string> {
  2832. public:
  2833. VersionSetTestDropOneCF()
  2834. : VersionSetTestBase("version_set_test_drop_one_cf") {}
  2835. };
  2836. // This test simulates the following execution sequence
  2837. // Time thread1 bg_flush_thr
  2838. // | Prepare version edits (e1,e2,e3) for atomic
  2839. // | flush cf1, cf2, cf3
  2840. // | Enqueue e to drop cfi
  2841. // | to manifest_writers_
  2842. // | Enqueue (e1,e2,e3) to manifest_writers_
  2843. // |
  2844. // | Apply e,
  2845. // | cfi.IsDropped() is true
  2846. // | Apply (e1,e2,e3),
  2847. // | since cfi.IsDropped() == true, we need to
  2848. // | drop ei and write the rest to MANIFEST.
  2849. // V
  2850. //
  2851. // Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
  2852. // last column family in an atomic group.
  2853. TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
  2854. const ReadOptions read_options;
  2855. const WriteOptions write_options;
  2856. std::vector<ColumnFamilyDescriptor> column_families;
  2857. SequenceNumber last_seqno;
  2858. std::unique_ptr<log::Writer> log_writer;
  2859. PrepareManifest(&column_families, &last_seqno, &log_writer);
  2860. CreateCurrentFile();
  2861. EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
  2862. EXPECT_EQ(column_families.size(),
  2863. versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  2864. const int kAtomicGroupSize = 3;
  2865. const std::vector<std::string> non_default_cf_names = {
  2866. kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
  2867. // Drop one column family
  2868. VersionEdit drop_cf_edit;
  2869. drop_cf_edit.DropColumnFamily();
  2870. const std::string cf_to_drop_name(GetParam());
  2871. auto cfd_to_drop =
  2872. versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
  2873. ASSERT_NE(nullptr, cfd_to_drop);
  2874. // Increase its refcount because cfd_to_drop is used later, and we need to
  2875. // prevent it from being deleted.
  2876. cfd_to_drop->Ref();
  2877. drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
  2878. mutex_.Lock();
  2879. Status s = versions_->LogAndApply(cfd_to_drop, read_options, write_options,
  2880. &drop_cf_edit, &mutex_, nullptr);
  2881. mutex_.Unlock();
  2882. ASSERT_OK(s);
  2883. std::vector<VersionEdit> edits(kAtomicGroupSize);
  2884. uint32_t remaining = kAtomicGroupSize;
  2885. size_t i = 0;
  2886. autovector<ColumnFamilyData*> cfds;
  2887. autovector<autovector<VersionEdit*>> edit_lists;
  2888. for (const auto& cf_name : non_default_cf_names) {
  2889. auto cfd = (cf_name != cf_to_drop_name)
  2890. ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
  2891. : cfd_to_drop;
  2892. ASSERT_NE(nullptr, cfd);
  2893. cfds.push_back(cfd);
  2894. edits[i].SetColumnFamily(cfd->GetID());
  2895. edits[i].SetLogNumber(0);
  2896. edits[i].SetNextFile(2);
  2897. edits[i].MarkAtomicGroup(--remaining);
  2898. edits[i].SetLastSequence(last_seqno++);
  2899. autovector<VersionEdit*> tmp_edits;
  2900. tmp_edits.push_back(&edits[i]);
  2901. edit_lists.emplace_back(tmp_edits);
  2902. ++i;
  2903. }
  2904. int called = 0;
  2905. SyncPoint::GetInstance()->DisableProcessing();
  2906. SyncPoint::GetInstance()->ClearAllCallBacks();
  2907. SyncPoint::GetInstance()->SetCallBack(
  2908. "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
  2909. std::vector<VersionEdit*>* tmp_edits =
  2910. static_cast<std::vector<VersionEdit*>*>(arg);
  2911. EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
  2912. for (const auto e : *tmp_edits) {
  2913. bool found = false;
  2914. for (const auto& e2 : edits) {
  2915. if (&e2 == e) {
  2916. found = true;
  2917. break;
  2918. }
  2919. }
  2920. ASSERT_TRUE(found);
  2921. }
  2922. ++called;
  2923. });
  2924. SyncPoint::GetInstance()->EnableProcessing();
  2925. mutex_.Lock();
  2926. s = versions_->LogAndApply(cfds, read_options, write_options, edit_lists,
  2927. &mutex_, nullptr);
  2928. mutex_.Unlock();
  2929. ASSERT_OK(s);
  2930. ASSERT_EQ(1, called);
  2931. cfd_to_drop->UnrefAndTryDelete();
  2932. }
  2933. INSTANTIATE_TEST_CASE_P(
  2934. AtomicGroup, VersionSetTestDropOneCF,
  2935. testing::Values(VersionSetTestBase::kColumnFamilyName1,
  2936. VersionSetTestBase::kColumnFamilyName2,
  2937. VersionSetTestBase::kColumnFamilyName3));
  2938. class EmptyDefaultCfNewManifest : public VersionSetTestBase,
  2939. public testing::Test {
  2940. public:
  2941. EmptyDefaultCfNewManifest() : VersionSetTestBase("version_set_new_db_test") {}
  2942. // Emulate DBImpl::NewDB()
  2943. void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
  2944. SequenceNumber* /*last_seqno*/,
  2945. std::unique_ptr<log::Writer>* log_writer) override {
  2946. assert(log_writer != nullptr);
  2947. VersionEdit new_db;
  2948. new_db.SetLogNumber(0);
  2949. const std::string manifest_path = DescriptorFileName(dbname_, 1);
  2950. const auto& fs = env_->GetFileSystem();
  2951. std::unique_ptr<WritableFileWriter> file_writer;
  2952. Status s = WritableFileWriter::Create(
  2953. fs, manifest_path, fs->OptimizeForManifestWrite(env_options_),
  2954. &file_writer, nullptr);
  2955. ASSERT_OK(s);
  2956. log_writer->reset(new log::Writer(std::move(file_writer), 0, true));
  2957. std::string record;
  2958. ASSERT_TRUE(new_db.EncodeTo(&record));
  2959. s = (*log_writer)->AddRecord(WriteOptions(), record);
  2960. ASSERT_OK(s);
  2961. // Create new column family
  2962. VersionEdit new_cf;
  2963. new_cf.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
  2964. new_cf.SetColumnFamily(1);
  2965. new_cf.SetLastSequence(2);
  2966. new_cf.SetNextFile(2);
  2967. record.clear();
  2968. ASSERT_TRUE(new_cf.EncodeTo(&record));
  2969. s = (*log_writer)->AddRecord(WriteOptions(), record);
  2970. ASSERT_OK(s);
  2971. }
  2972. protected:
  2973. bool write_dbid_to_manifest_ = false;
  2974. std::unique_ptr<log::Writer> log_writer_;
  2975. };
  2976. // Create db, create column family. Cf creation will switch to a new MANIFEST.
  2977. // Then reopen db, trying to recover.
  2978. TEST_F(EmptyDefaultCfNewManifest, Recover) {
  2979. PrepareManifest(nullptr, nullptr, &log_writer_);
  2980. log_writer_.reset();
  2981. CreateCurrentFile();
  2982. std::string manifest_path;
  2983. VerifyManifest(&manifest_path);
  2984. std::vector<ColumnFamilyDescriptor> column_families;
  2985. column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
  2986. column_families.emplace_back(VersionSetTestBase::kColumnFamilyName1,
  2987. cf_options_);
  2988. std::string db_id;
  2989. bool has_missing_table_file = false;
  2990. Status s = versions_->TryRecoverFromOneManifest(
  2991. manifest_path, column_families, false, &db_id, &has_missing_table_file);
  2992. ASSERT_OK(s);
  2993. ASSERT_FALSE(has_missing_table_file);
  2994. }
  2995. class VersionSetTestEmptyDb
  2996. : public VersionSetTestBase,
  2997. public testing::TestWithParam<
  2998. std::tuple<bool, bool, std::vector<std::string>>> {
  2999. public:
  3000. static const std::string kUnknownColumnFamilyName;
  3001. VersionSetTestEmptyDb() : VersionSetTestBase("version_set_test_empty_db") {}
  3002. protected:
  3003. void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
  3004. SequenceNumber* /*last_seqno*/,
  3005. std::unique_ptr<log::Writer>* log_writer) override {
  3006. assert(nullptr != log_writer);
  3007. VersionEdit new_db;
  3008. if (db_options_.write_dbid_to_manifest) {
  3009. ASSERT_OK(SetIdentityFile(WriteOptions(), env_, dbname_,
  3010. Temperature::kUnknown));
  3011. DBOptions tmp_db_options;
  3012. tmp_db_options.env = env_;
  3013. std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
  3014. std::string db_id;
  3015. ASSERT_OK(impl->GetDbIdentityFromIdentityFile(IOOptions(), &db_id));
  3016. new_db.SetDBId(db_id);
  3017. }
  3018. const std::string manifest_path = DescriptorFileName(dbname_, 1);
  3019. const auto& fs = env_->GetFileSystem();
  3020. std::unique_ptr<WritableFileWriter> file_writer;
  3021. Status s = WritableFileWriter::Create(
  3022. fs, manifest_path, fs->OptimizeForManifestWrite(env_options_),
  3023. &file_writer, nullptr);
  3024. ASSERT_OK(s);
  3025. {
  3026. log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
  3027. std::string record;
  3028. new_db.EncodeTo(&record);
  3029. s = (*log_writer)->AddRecord(WriteOptions(), record);
  3030. ASSERT_OK(s);
  3031. }
  3032. }
  3033. std::unique_ptr<log::Writer> log_writer_;
  3034. };
  3035. const std::string VersionSetTestEmptyDb::kUnknownColumnFamilyName = "unknown";
  3036. TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) {
  3037. db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  3038. PrepareManifest(nullptr, nullptr, &log_writer_);
  3039. log_writer_.reset();
  3040. CreateCurrentFile();
  3041. std::string manifest_path;
  3042. VerifyManifest(&manifest_path);
  3043. bool read_only = std::get<1>(GetParam());
  3044. const std::vector<std::string> cf_names = std::get<2>(GetParam());
  3045. std::vector<ColumnFamilyDescriptor> column_families;
  3046. for (const auto& cf_name : cf_names) {
  3047. column_families.emplace_back(cf_name, cf_options_);
  3048. }
  3049. std::string db_id;
  3050. bool has_missing_table_file = false;
  3051. Status s = versions_->TryRecoverFromOneManifest(
  3052. manifest_path, column_families, read_only, &db_id,
  3053. &has_missing_table_file);
  3054. auto iter =
  3055. std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  3056. if (iter == cf_names.end()) {
  3057. ASSERT_TRUE(s.IsInvalidArgument());
  3058. } else {
  3059. ASSERT_NE(s.ToString().find(manifest_path), std::string::npos);
  3060. ASSERT_TRUE(s.IsCorruption());
  3061. }
  3062. }
  3063. TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) {
  3064. db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  3065. PrepareManifest(nullptr, nullptr, &log_writer_);
  3066. // Only a subset of column families in the MANIFEST.
  3067. VersionEdit new_cf1;
  3068. new_cf1.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
  3069. new_cf1.SetColumnFamily(1);
  3070. Status s;
  3071. {
  3072. std::string record;
  3073. new_cf1.EncodeTo(&record);
  3074. s = log_writer_->AddRecord(WriteOptions(), record);
  3075. ASSERT_OK(s);
  3076. }
  3077. log_writer_.reset();
  3078. CreateCurrentFile();
  3079. std::string manifest_path;
  3080. VerifyManifest(&manifest_path);
  3081. bool read_only = std::get<1>(GetParam());
  3082. const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  3083. std::vector<ColumnFamilyDescriptor> column_families;
  3084. for (const auto& cf_name : cf_names) {
  3085. column_families.emplace_back(cf_name, cf_options_);
  3086. }
  3087. std::string db_id;
  3088. bool has_missing_table_file = false;
  3089. s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
  3090. read_only, &db_id,
  3091. &has_missing_table_file);
  3092. auto iter =
  3093. std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  3094. if (iter == cf_names.end()) {
  3095. ASSERT_TRUE(s.IsInvalidArgument());
  3096. } else {
  3097. ASSERT_NE(s.ToString().find(manifest_path), std::string::npos);
  3098. ASSERT_TRUE(s.IsCorruption());
  3099. }
  3100. }
  3101. TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) {
  3102. db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  3103. PrepareManifest(nullptr, nullptr, &log_writer_);
  3104. // Write all column families but no log_number, next_file_number and
  3105. // last_sequence.
  3106. const std::vector<std::string> all_cf_names = {
  3107. kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
  3108. kColumnFamilyName3};
  3109. uint32_t cf_id = 1;
  3110. Status s;
  3111. for (size_t i = 1; i != all_cf_names.size(); ++i) {
  3112. VersionEdit new_cf;
  3113. new_cf.AddColumnFamily(all_cf_names[i]);
  3114. new_cf.SetColumnFamily(cf_id++);
  3115. std::string record;
  3116. ASSERT_TRUE(new_cf.EncodeTo(&record));
  3117. s = log_writer_->AddRecord(WriteOptions(), record);
  3118. ASSERT_OK(s);
  3119. }
  3120. log_writer_.reset();
  3121. CreateCurrentFile();
  3122. std::string manifest_path;
  3123. VerifyManifest(&manifest_path);
  3124. bool read_only = std::get<1>(GetParam());
  3125. const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  3126. std::vector<ColumnFamilyDescriptor> column_families;
  3127. for (const auto& cf_name : cf_names) {
  3128. column_families.emplace_back(cf_name, cf_options_);
  3129. }
  3130. std::string db_id;
  3131. bool has_missing_table_file = false;
  3132. s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
  3133. read_only, &db_id,
  3134. &has_missing_table_file);
  3135. auto iter =
  3136. std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  3137. if (iter == cf_names.end()) {
  3138. ASSERT_TRUE(s.IsInvalidArgument());
  3139. } else {
  3140. ASSERT_NE(s.ToString().find(manifest_path), std::string::npos);
  3141. ASSERT_TRUE(s.IsCorruption());
  3142. }
  3143. }
  3144. TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) {
  3145. db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  3146. PrepareManifest(nullptr, nullptr, &log_writer_);
  3147. // Write all column families but no log_number, next_file_number and
  3148. // last_sequence.
  3149. const std::vector<std::string> all_cf_names = {
  3150. kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
  3151. kColumnFamilyName3};
  3152. uint32_t cf_id = 1;
  3153. Status s;
  3154. for (size_t i = 1; i != all_cf_names.size(); ++i) {
  3155. VersionEdit new_cf;
  3156. new_cf.AddColumnFamily(all_cf_names[i]);
  3157. new_cf.SetColumnFamily(cf_id++);
  3158. std::string record;
  3159. ASSERT_TRUE(new_cf.EncodeTo(&record));
  3160. s = log_writer_->AddRecord(WriteOptions(), record);
  3161. ASSERT_OK(s);
  3162. }
  3163. {
  3164. VersionEdit tmp_edit;
  3165. tmp_edit.SetColumnFamily(4);
  3166. tmp_edit.SetLogNumber(0);
  3167. tmp_edit.SetNextFile(2);
  3168. tmp_edit.SetLastSequence(0);
  3169. std::string record;
  3170. ASSERT_TRUE(tmp_edit.EncodeTo(&record));
  3171. s = log_writer_->AddRecord(WriteOptions(), record);
  3172. ASSERT_OK(s);
  3173. }
  3174. log_writer_.reset();
  3175. CreateCurrentFile();
  3176. std::string manifest_path;
  3177. VerifyManifest(&manifest_path);
  3178. bool read_only = std::get<1>(GetParam());
  3179. const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  3180. std::vector<ColumnFamilyDescriptor> column_families;
  3181. for (const auto& cf_name : cf_names) {
  3182. column_families.emplace_back(cf_name, cf_options_);
  3183. }
  3184. std::string db_id;
  3185. bool has_missing_table_file = false;
  3186. s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
  3187. read_only, &db_id,
  3188. &has_missing_table_file);
  3189. auto iter =
  3190. std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  3191. if (iter == cf_names.end()) {
  3192. ASSERT_TRUE(s.IsInvalidArgument());
  3193. } else {
  3194. ASSERT_NE(s.ToString().find(manifest_path), std::string::npos);
  3195. ASSERT_TRUE(s.IsCorruption());
  3196. }
  3197. }
  3198. TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) {
  3199. db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  3200. PrepareManifest(nullptr, nullptr, &log_writer_);
  3201. // Write all column families but no log_number, next_file_number and
  3202. // last_sequence.
  3203. const std::vector<std::string> all_cf_names = {
  3204. kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
  3205. kColumnFamilyName3};
  3206. uint32_t cf_id = 1;
  3207. Status s;
  3208. for (size_t i = 1; i != all_cf_names.size(); ++i) {
  3209. VersionEdit new_cf;
  3210. new_cf.AddColumnFamily(all_cf_names[i]);
  3211. new_cf.SetColumnFamily(cf_id++);
  3212. std::string record;
  3213. ASSERT_TRUE(new_cf.EncodeTo(&record));
  3214. s = log_writer_->AddRecord(WriteOptions(), record);
  3215. ASSERT_OK(s);
  3216. }
  3217. {
  3218. VersionEdit tmp_edit;
  3219. tmp_edit.SetLogNumber(0);
  3220. tmp_edit.SetNextFile(2);
  3221. tmp_edit.SetLastSequence(0);
  3222. std::string record;
  3223. ASSERT_TRUE(tmp_edit.EncodeTo(&record));
  3224. s = log_writer_->AddRecord(WriteOptions(), record);
  3225. ASSERT_OK(s);
  3226. }
  3227. log_writer_.reset();
  3228. CreateCurrentFile();
  3229. std::string manifest_path;
  3230. VerifyManifest(&manifest_path);
  3231. bool read_only = std::get<1>(GetParam());
  3232. const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  3233. std::vector<ColumnFamilyDescriptor> column_families;
  3234. for (const auto& cf_name : cf_names) {
  3235. column_families.emplace_back(cf_name, cf_options_);
  3236. }
  3237. std::string db_id;
  3238. bool has_missing_table_file = false;
  3239. SaveAndRestore<bool> override_unchanging(&versions_->TEST_unchanging(),
  3240. read_only);
  3241. s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
  3242. read_only, &db_id,
  3243. &has_missing_table_file);
  3244. auto iter =
  3245. std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  3246. if (iter == cf_names.end()) {
  3247. ASSERT_TRUE(s.IsInvalidArgument());
  3248. } else if (read_only) {
  3249. ASSERT_OK(s);
  3250. ASSERT_FALSE(has_missing_table_file);
  3251. } else if (cf_names.size() == all_cf_names.size()) {
  3252. ASSERT_OK(s);
  3253. ASSERT_FALSE(has_missing_table_file);
  3254. } else if (cf_names.size() < all_cf_names.size()) {
  3255. ASSERT_TRUE(s.IsInvalidArgument());
  3256. } else {
  3257. ASSERT_OK(s);
  3258. ASSERT_FALSE(has_missing_table_file);
  3259. ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(
  3260. kUnknownColumnFamilyName);
  3261. ASSERT_EQ(nullptr, cfd);
  3262. }
  3263. }
  3264. INSTANTIATE_TEST_CASE_P(
  3265. BestEffortRecovery, VersionSetTestEmptyDb,
  3266. testing::Combine(
  3267. /*write_dbid_to_manifest=*/testing::Bool(),
  3268. /*read_only=*/testing::Bool(),
  3269. /*cf_names=*/
  3270. testing::Values(
  3271. std::vector<std::string>(),
  3272. std::vector<std::string>({kDefaultColumnFamilyName}),
  3273. std::vector<std::string>({VersionSetTestBase::kColumnFamilyName1,
  3274. VersionSetTestBase::kColumnFamilyName2,
  3275. VersionSetTestBase::kColumnFamilyName3}),
  3276. std::vector<std::string>({kDefaultColumnFamilyName,
  3277. VersionSetTestBase::kColumnFamilyName1}),
  3278. std::vector<std::string>({kDefaultColumnFamilyName,
  3279. VersionSetTestBase::kColumnFamilyName1,
  3280. VersionSetTestBase::kColumnFamilyName2,
  3281. VersionSetTestBase::kColumnFamilyName3}),
  3282. std::vector<std::string>(
  3283. {kDefaultColumnFamilyName,
  3284. VersionSetTestBase::kColumnFamilyName1,
  3285. VersionSetTestBase::kColumnFamilyName2,
  3286. VersionSetTestBase::kColumnFamilyName3,
  3287. VersionSetTestEmptyDb::kUnknownColumnFamilyName}))));
  3288. class VersionSetTestMissingFiles : public VersionSetTestBase,
  3289. public testing::Test {
  3290. public:
  3291. explicit VersionSetTestMissingFiles(
  3292. const std::string& test_name = "version_set_test_missing_files")
  3293. : VersionSetTestBase(test_name),
  3294. internal_comparator_(
  3295. std::make_shared<InternalKeyComparator>(options_.comparator)) {}
  3296. protected:
  3297. void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
  3298. SequenceNumber* last_seqno,
  3299. std::unique_ptr<log::Writer>* log_writer) override {
  3300. assert(column_families != nullptr);
  3301. assert(last_seqno != nullptr);
  3302. assert(log_writer != nullptr);
  3303. ASSERT_OK(
  3304. SetIdentityFile(WriteOptions(), env_, dbname_, Temperature::kUnknown));
  3305. const std::string manifest = DescriptorFileName(dbname_, 1);
  3306. const auto& fs = env_->GetFileSystem();
  3307. std::unique_ptr<WritableFileWriter> file_writer;
  3308. Status s = WritableFileWriter::Create(
  3309. fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
  3310. nullptr);
  3311. ASSERT_OK(s);
  3312. log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
  3313. VersionEdit new_db;
  3314. if (db_options_.write_dbid_to_manifest) {
  3315. DBOptions tmp_db_options;
  3316. tmp_db_options.env = env_;
  3317. std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
  3318. std::string db_id;
  3319. ASSERT_OK(impl->GetDbIdentityFromIdentityFile(IOOptions(), &db_id));
  3320. new_db.SetDBId(db_id);
  3321. }
  3322. {
  3323. std::string record;
  3324. ASSERT_TRUE(new_db.EncodeTo(&record));
  3325. s = (*log_writer)->AddRecord(WriteOptions(), record);
  3326. ASSERT_OK(s);
  3327. }
  3328. const std::vector<std::string> cf_names = {
  3329. kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
  3330. kColumnFamilyName3};
  3331. uint32_t cf_id = 1; // default cf id is 0
  3332. cf_options_.table_factory = table_factory_;
  3333. for (const auto& cf_name : cf_names) {
  3334. column_families->emplace_back(cf_name, cf_options_);
  3335. if (cf_name == kDefaultColumnFamilyName) {
  3336. continue;
  3337. }
  3338. VersionEdit new_cf;
  3339. new_cf.AddColumnFamily(cf_name);
  3340. new_cf.SetColumnFamily(cf_id);
  3341. std::string record;
  3342. ASSERT_TRUE(new_cf.EncodeTo(&record));
  3343. s = (*log_writer)->AddRecord(WriteOptions(), record);
  3344. ASSERT_OK(s);
  3345. VersionEdit cf_files;
  3346. cf_files.SetColumnFamily(cf_id);
  3347. cf_files.SetLogNumber(0);
  3348. record.clear();
  3349. ASSERT_TRUE(cf_files.EncodeTo(&record));
  3350. s = (*log_writer)->AddRecord(WriteOptions(), record);
  3351. ASSERT_OK(s);
  3352. ++cf_id;
  3353. }
  3354. SequenceNumber seq = 2;
  3355. {
  3356. VersionEdit edit;
  3357. edit.SetNextFile(7);
  3358. edit.SetLastSequence(seq);
  3359. std::string record;
  3360. ASSERT_TRUE(edit.EncodeTo(&record));
  3361. s = (*log_writer)->AddRecord(WriteOptions(), record);
  3362. ASSERT_OK(s);
  3363. }
  3364. *last_seqno = seq + 1;
  3365. }
  3366. // This method updates last_sequence_.
  3367. void WriteFileAdditionAndDeletionToManifest(
  3368. uint32_t cf, const std::vector<std::pair<int, FileMetaData>>& added_files,
  3369. const std::vector<std::pair<int, uint64_t>>& deleted_files,
  3370. const std::vector<BlobFileAddition>& blob_files = {}) {
  3371. VersionEdit edit;
  3372. edit.SetColumnFamily(cf);
  3373. for (const auto& elem : added_files) {
  3374. int level = elem.first;
  3375. edit.AddFile(level, elem.second);
  3376. }
  3377. for (const auto& elem : deleted_files) {
  3378. int level = elem.first;
  3379. edit.DeleteFile(level, elem.second);
  3380. }
  3381. for (const auto& elem : blob_files) {
  3382. edit.AddBlobFile(elem);
  3383. }
  3384. edit.SetLastSequence(last_seqno_);
  3385. ++last_seqno_;
  3386. assert(log_writer_.get() != nullptr);
  3387. std::string record;
  3388. ASSERT_TRUE(edit.EncodeTo(&record, 0 /* ts_sz */));
  3389. Status s = log_writer_->AddRecord(WriteOptions(), record);
  3390. ASSERT_OK(s);
  3391. }
  3392. std::shared_ptr<InternalKeyComparator> internal_comparator_;
  3393. std::vector<ColumnFamilyDescriptor> column_families_;
  3394. SequenceNumber last_seqno_;
  3395. std::unique_ptr<log::Writer> log_writer_;
  3396. };
  3397. TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) {
  3398. std::vector<SstInfo> existing_files = {
  3399. SstInfo(100, kDefaultColumnFamilyName, "a", 100 /* epoch_number */),
  3400. SstInfo(102, kDefaultColumnFamilyName, "b", 102 /* epoch_number */),
  3401. SstInfo(103, kDefaultColumnFamilyName, "c", 103 /* epoch_number */),
  3402. SstInfo(107, kDefaultColumnFamilyName, "d", 107 /* epoch_number */),
  3403. SstInfo(110, kDefaultColumnFamilyName, "e", 110 /* epoch_number */)};
  3404. std::vector<FileMetaData> file_metas;
  3405. CreateDummyTableFiles(existing_files, &file_metas);
  3406. PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
  3407. std::vector<std::pair<int, FileMetaData>> added_files;
  3408. for (uint64_t file_num = 10; file_num < 15; ++file_num) {
  3409. std::string smallest_ukey = "a";
  3410. std::string largest_ukey = "b";
  3411. InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue);
  3412. InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue);
  3413. FileMetaData meta = FileMetaData(
  3414. file_num, /*file_path_id=*/0, /*file_size=*/12, smallest_ikey,
  3415. largest_ikey, 0, 0, false, Temperature::kUnknown, 0, 0, 0,
  3416. file_num /* epoch_number */, kUnknownFileChecksum,
  3417. kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0,
  3418. /* user_defined_timestamps_persisted */ true);
  3419. added_files.emplace_back(0, meta);
  3420. }
  3421. WriteFileAdditionAndDeletionToManifest(
  3422. /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
  3423. std::vector<std::pair<int, uint64_t>> deleted_files;
  3424. deleted_files.emplace_back(0, 10);
  3425. WriteFileAdditionAndDeletionToManifest(
  3426. /*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
  3427. log_writer_.reset();
  3428. CreateCurrentFile();
  3429. std::string manifest_path;
  3430. VerifyManifest(&manifest_path);
  3431. std::string db_id;
  3432. bool has_missing_table_file = false;
  3433. Status s = versions_->TryRecoverFromOneManifest(
  3434. manifest_path, column_families_,
  3435. /*read_only=*/false, &db_id, &has_missing_table_file);
  3436. ASSERT_OK(s);
  3437. ASSERT_TRUE(has_missing_table_file);
  3438. for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
  3439. VersionStorageInfo* vstorage = cfd->current()->storage_info();
  3440. const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
  3441. ASSERT_TRUE(files.empty());
  3442. }
  3443. }
  3444. TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) {
  3445. std::vector<SstInfo> existing_files = {
  3446. SstInfo(100, kDefaultColumnFamilyName, "a", 0 /* level */,
  3447. 100 /* epoch_number */),
  3448. SstInfo(102, kDefaultColumnFamilyName, "b", 0 /* level */,
  3449. 102 /* epoch_number */),
  3450. SstInfo(103, kDefaultColumnFamilyName, "c", 0 /* level */,
  3451. 103 /* epoch_number */),
  3452. SstInfo(107, kDefaultColumnFamilyName, "d", 0 /* level */,
  3453. 107 /* epoch_number */),
  3454. SstInfo(110, kDefaultColumnFamilyName, "e", 0 /* level */,
  3455. 110 /* epoch_number */)};
  3456. std::vector<FileMetaData> file_metas;
  3457. CreateDummyTableFiles(existing_files, &file_metas);
  3458. PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
  3459. std::vector<std::pair<int, FileMetaData>> added_files;
  3460. for (size_t i = 3; i != 5; ++i) {
  3461. added_files.emplace_back(0, file_metas[i]);
  3462. }
  3463. WriteFileAdditionAndDeletionToManifest(
  3464. /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
  3465. added_files.clear();
  3466. for (uint64_t file_num = 120; file_num < 130; ++file_num) {
  3467. std::string smallest_ukey = "a";
  3468. std::string largest_ukey = "b";
  3469. InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue);
  3470. InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue);
  3471. FileMetaData meta = FileMetaData(
  3472. file_num, /*file_path_id=*/0, /*file_size=*/12, smallest_ikey,
  3473. largest_ikey, 0, 0, false, Temperature::kUnknown, 0, 0, 0,
  3474. file_num /* epoch_number */, kUnknownFileChecksum,
  3475. kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0,
  3476. /* user_defined_timestamps_persisted */ true);
  3477. added_files.emplace_back(0, meta);
  3478. }
  3479. WriteFileAdditionAndDeletionToManifest(
  3480. /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
  3481. log_writer_.reset();
  3482. CreateCurrentFile();
  3483. std::string manifest_path;
  3484. VerifyManifest(&manifest_path);
  3485. std::string db_id;
  3486. bool has_missing_table_file = false;
  3487. Status s = versions_->TryRecoverFromOneManifest(
  3488. manifest_path, column_families_,
  3489. /*read_only=*/false, &db_id, &has_missing_table_file);
  3490. ASSERT_OK(s);
  3491. ASSERT_TRUE(has_missing_table_file);
  3492. for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
  3493. VersionStorageInfo* vstorage = cfd->current()->storage_info();
  3494. const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
  3495. if (cfd->GetName() == kDefaultColumnFamilyName) {
  3496. ASSERT_EQ(2, files.size());
  3497. for (const auto* fmeta : files) {
  3498. if (fmeta->fd.GetNumber() != 107 && fmeta->fd.GetNumber() != 110) {
  3499. ASSERT_FALSE(true);
  3500. }
  3501. }
  3502. } else {
  3503. ASSERT_TRUE(files.empty());
  3504. }
  3505. }
  3506. }
  3507. TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
  3508. std::vector<SstInfo> existing_files = {
  3509. SstInfo(100, kDefaultColumnFamilyName, "a", 0 /* level */,
  3510. 100 /* epoch_number */),
  3511. SstInfo(102, kDefaultColumnFamilyName, "b", 0 /* level */,
  3512. 102 /* epoch_number */),
  3513. SstInfo(103, kDefaultColumnFamilyName, "c", 0 /* level */,
  3514. 103 /* epoch_number */),
  3515. SstInfo(107, kDefaultColumnFamilyName, "d", 0 /* level */,
  3516. 107 /* epoch_number */),
  3517. SstInfo(110, kDefaultColumnFamilyName, "e", 0 /* level */,
  3518. 110 /* epoch_number */)};
  3519. std::vector<FileMetaData> file_metas;
  3520. CreateDummyTableFiles(existing_files, &file_metas);
  3521. PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
  3522. std::vector<std::pair<int, FileMetaData>> added_files;
  3523. for (const auto& meta : file_metas) {
  3524. added_files.emplace_back(0, meta);
  3525. }
  3526. WriteFileAdditionAndDeletionToManifest(
  3527. /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
  3528. std::vector<std::pair<int, uint64_t>> deleted_files;
  3529. deleted_files.emplace_back(/*level=*/0, 100);
  3530. WriteFileAdditionAndDeletionToManifest(
  3531. /*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
  3532. log_writer_.reset();
  3533. CreateCurrentFile();
  3534. std::string manifest_path;
  3535. VerifyManifest(&manifest_path);
  3536. std::string db_id;
  3537. bool has_missing_table_file = false;
  3538. Status s = versions_->TryRecoverFromOneManifest(
  3539. manifest_path, column_families_,
  3540. /*read_only=*/false, &db_id, &has_missing_table_file);
  3541. ASSERT_OK(s);
  3542. ASSERT_FALSE(has_missing_table_file);
  3543. for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
  3544. VersionStorageInfo* vstorage = cfd->current()->storage_info();
  3545. const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
  3546. if (cfd->GetName() == kDefaultColumnFamilyName) {
  3547. ASSERT_EQ(existing_files.size() - deleted_files.size(), files.size());
  3548. bool has_deleted_file = false;
  3549. for (const auto* fmeta : files) {
  3550. if (fmeta->fd.GetNumber() == 100) {
  3551. has_deleted_file = true;
  3552. break;
  3553. }
  3554. }
  3555. ASSERT_FALSE(has_deleted_file);
  3556. } else {
  3557. ASSERT_TRUE(files.empty());
  3558. }
  3559. }
  3560. }
  3561. TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
  3562. db_options_.allow_2pc = true;
  3563. NewDB();
  3564. SstInfo sst(100, kDefaultColumnFamilyName, "a", 0 /* level */,
  3565. 100 /* epoch_number */);
  3566. std::vector<FileMetaData> file_metas;
  3567. CreateDummyTableFiles({sst}, &file_metas);
  3568. constexpr WalNumber kMinWalNumberToKeep2PC = 10;
  3569. VersionEdit edit;
  3570. edit.AddFile(0, file_metas[0]);
  3571. edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC);
  3572. ASSERT_OK(LogAndApplyToDefaultCF(edit));
  3573. ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
  3574. for (int i = 0; i < 3; i++) {
  3575. CreateNewManifest();
  3576. ReopenDB();
  3577. ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
  3578. }
  3579. }
  3580. class BestEffortsRecoverIncompleteVersionTest
  3581. : public VersionSetTestMissingFiles {
  3582. public:
  3583. BestEffortsRecoverIncompleteVersionTest()
  3584. : VersionSetTestMissingFiles("best_efforts_recover_incomplete_version") {}
  3585. struct BlobInfo {
  3586. uint64_t file_number;
  3587. bool file_missing;
  3588. std::string key;
  3589. std::string blob;
  3590. BlobInfo(uint64_t _file_number, bool _file_missing, std::string _key,
  3591. std::string _blob)
  3592. : file_number(_file_number),
  3593. file_missing(_file_missing),
  3594. key(_key),
  3595. blob(_blob) {}
  3596. };
  3597. void CreateDummyBlobFiles(const std::vector<BlobInfo>& infos,
  3598. std::vector<BlobFileAddition>* blob_metas) {
  3599. for (const auto& info : infos) {
  3600. if (!info.file_missing) {
  3601. WriteDummyBlobFile(info.file_number, info.key, info.blob);
  3602. }
  3603. blob_metas->emplace_back(
  3604. info.file_number, 1 /*total_blob_count*/,
  3605. info.key.size() + info.blob.size() /*total_blob_bytes*/,
  3606. "" /*checksum_method*/, "" /*check_sum_value*/);
  3607. }
  3608. }
  3609. // Creates a test blob file that is valid so it can pass the
  3610. // `VersionEditHandlerPointInTime::VerifyBlobFile` check.
  3611. void WriteDummyBlobFile(uint64_t blob_file_number, const Slice& key,
  3612. const Slice& blob) {
  3613. ImmutableOptions options;
  3614. std::string blob_file_path = BlobFileName(dbname_, blob_file_number);
  3615. std::unique_ptr<FSWritableFile> file;
  3616. ASSERT_OK(
  3617. fs_->NewWritableFile(blob_file_path, FileOptions(), &file, nullptr));
  3618. std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
  3619. std::move(file), blob_file_path, FileOptions(), options.clock));
  3620. BlobLogWriter blob_log_writer(std::move(file_writer), options.clock,
  3621. /*statistics*/ nullptr, blob_file_number,
  3622. /*use_fsync*/ true,
  3623. /*do_flush*/ false);
  3624. constexpr ExpirationRange expiration_range;
  3625. BlobLogHeader header(/*column_family_id*/ 0, kNoCompression,
  3626. /*has_ttl*/ false, expiration_range);
  3627. ASSERT_OK(blob_log_writer.WriteHeader(WriteOptions(), header));
  3628. std::string compressed_blob;
  3629. uint64_t key_offset = 0;
  3630. uint64_t blob_offset = 0;
  3631. ASSERT_OK(blob_log_writer.AddRecord(WriteOptions(), key, blob, &key_offset,
  3632. &blob_offset));
  3633. BlobLogFooter footer;
  3634. footer.blob_count = 1;
  3635. footer.expiration_range = expiration_range;
  3636. std::string checksum_method;
  3637. std::string checksum_value;
  3638. ASSERT_OK(blob_log_writer.AppendFooter(WriteOptions(), footer,
  3639. &checksum_method, &checksum_value));
  3640. }
  3641. void RecoverFromManifestWithMissingFiles(
  3642. const std::vector<std::pair<int, FileMetaData>>& added_files,
  3643. const std::vector<BlobFileAddition>& blob_files) {
  3644. PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
  3645. WriteFileAdditionAndDeletionToManifest(
  3646. /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>(),
  3647. blob_files);
  3648. log_writer_.reset();
  3649. CreateCurrentFile();
  3650. std::string manifest_path;
  3651. VerifyManifest(&manifest_path);
  3652. std::string db_id;
  3653. bool has_missing_table_file = false;
  3654. Status s = versions_->TryRecoverFromOneManifest(
  3655. manifest_path, column_families_,
  3656. /*read_only=*/false, &db_id, &has_missing_table_file);
  3657. ASSERT_OK(s);
  3658. ASSERT_TRUE(has_missing_table_file);
  3659. }
  3660. };
  3661. TEST_F(BestEffortsRecoverIncompleteVersionTest, NonL0MissingFiles) {
  3662. std::vector<SstInfo> sst_files = {
  3663. SstInfo(100, kDefaultColumnFamilyName, "a", 1 /* level */,
  3664. 100 /* epoch_number */, true /* file_missing */),
  3665. SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
  3666. 101 /* epoch_number */, false /* file_missing */),
  3667. SstInfo(102, kDefaultColumnFamilyName, "a", 0 /* level */,
  3668. 102 /* epoch_number */, false /* file_missing */),
  3669. };
  3670. std::vector<FileMetaData> file_metas;
  3671. CreateDummyTableFiles(sst_files, &file_metas);
  3672. std::vector<std::pair<int, FileMetaData>> added_files;
  3673. for (size_t i = 0; i < sst_files.size(); i++) {
  3674. const auto& info = sst_files[i];
  3675. const auto& meta = file_metas[i];
  3676. added_files.emplace_back(info.level, meta);
  3677. }
  3678. RecoverFromManifestWithMissingFiles(added_files,
  3679. std::vector<BlobFileAddition>());
  3680. std::vector<uint64_t> all_table_files;
  3681. std::vector<uint64_t> all_blob_files;
  3682. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  3683. ASSERT_TRUE(all_table_files.empty());
  3684. }
  3685. TEST_F(BestEffortsRecoverIncompleteVersionTest, MissingNonSuffixL0Files) {
  3686. std::vector<SstInfo> sst_files = {
  3687. SstInfo(100, kDefaultColumnFamilyName, "a", 1 /* level */,
  3688. 100 /* epoch_number */, false /* file_missing */),
  3689. SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
  3690. 101 /* epoch_number */, true /* file_missing */),
  3691. SstInfo(102, kDefaultColumnFamilyName, "a", 0 /* level */,
  3692. 102 /* epoch_number */, false /* file_missing */),
  3693. };
  3694. std::vector<FileMetaData> file_metas;
  3695. CreateDummyTableFiles(sst_files, &file_metas);
  3696. std::vector<std::pair<int, FileMetaData>> added_files;
  3697. for (size_t i = 0; i < sst_files.size(); i++) {
  3698. const auto& info = sst_files[i];
  3699. const auto& meta = file_metas[i];
  3700. added_files.emplace_back(info.level, meta);
  3701. }
  3702. RecoverFromManifestWithMissingFiles(added_files,
  3703. std::vector<BlobFileAddition>());
  3704. std::vector<uint64_t> all_table_files;
  3705. std::vector<uint64_t> all_blob_files;
  3706. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  3707. ASSERT_TRUE(all_table_files.empty());
  3708. }
  3709. TEST_F(BestEffortsRecoverIncompleteVersionTest, MissingBlobFiles) {
  3710. std::vector<SstInfo> sst_files = {
  3711. SstInfo(100, kDefaultColumnFamilyName, "a", 0 /* level */,
  3712. 100 /* epoch_number */, false /* file_missing */,
  3713. 102 /*oldest_blob_file_number*/),
  3714. SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
  3715. 101 /* epoch_number */, false /* file_missing */,
  3716. 103 /*oldest_blob_file_number*/),
  3717. };
  3718. std::vector<FileMetaData> file_metas;
  3719. CreateDummyTableFiles(sst_files, &file_metas);
  3720. std::vector<BlobInfo> blob_files = {
  3721. BlobInfo(102, true /*file_missing*/, "a", "blob1"),
  3722. BlobInfo(103, true /*file_missing*/, "a", "blob2"),
  3723. };
  3724. std::vector<BlobFileAddition> blob_meta;
  3725. CreateDummyBlobFiles(blob_files, &blob_meta);
  3726. std::vector<std::pair<int, FileMetaData>> added_files;
  3727. for (size_t i = 0; i < sst_files.size(); i++) {
  3728. const auto& info = sst_files[i];
  3729. const auto& meta = file_metas[i];
  3730. added_files.emplace_back(info.level, meta);
  3731. }
  3732. RecoverFromManifestWithMissingFiles(added_files, blob_meta);
  3733. std::vector<uint64_t> all_table_files;
  3734. std::vector<uint64_t> all_blob_files;
  3735. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  3736. ASSERT_TRUE(all_table_files.empty());
  3737. }
  3738. TEST_F(BestEffortsRecoverIncompleteVersionTest, MissingL0SuffixOnly) {
  3739. std::vector<SstInfo> sst_files = {
  3740. SstInfo(100, kDefaultColumnFamilyName, "a", 1 /* level */,
  3741. 100 /* epoch_number */, false /* file_missing */),
  3742. SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
  3743. 101 /* epoch_number */, false /* file_missing */),
  3744. SstInfo(102, kDefaultColumnFamilyName, "a", 0 /* level */,
  3745. 102 /* epoch_number */, true /* file_missing */),
  3746. };
  3747. std::vector<FileMetaData> file_metas;
  3748. CreateDummyTableFiles(sst_files, &file_metas);
  3749. std::vector<std::pair<int, FileMetaData>> added_files;
  3750. for (size_t i = 0; i < sst_files.size(); i++) {
  3751. const auto& info = sst_files[i];
  3752. const auto& meta = file_metas[i];
  3753. added_files.emplace_back(info.level, meta);
  3754. }
  3755. RecoverFromManifestWithMissingFiles(added_files,
  3756. std::vector<BlobFileAddition>());
  3757. std::vector<uint64_t> all_table_files;
  3758. std::vector<uint64_t> all_blob_files;
  3759. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  3760. ASSERT_EQ(2, all_table_files.size());
  3761. ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
  3762. VersionStorageInfo* vstorage = cfd->current()->storage_info();
  3763. ASSERT_EQ(1, vstorage->LevelFiles(0).size());
  3764. ASSERT_EQ(1, vstorage->LevelFiles(1).size());
  3765. }
  3766. TEST_F(BestEffortsRecoverIncompleteVersionTest,
  3767. MissingL0SuffixAndTheirBlobFiles) {
  3768. std::vector<SstInfo> sst_files = {
  3769. SstInfo(100, kDefaultColumnFamilyName, "a", 1 /* level */,
  3770. 100 /* epoch_number */, false /* file_missing */),
  3771. SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
  3772. 101 /* epoch_number */, false /* file_missing */,
  3773. 103 /*oldest_blob_file_number*/),
  3774. SstInfo(102, kDefaultColumnFamilyName, "a", 0 /* level */,
  3775. 102 /* epoch_number */, true /* file_missing */,
  3776. 104 /*oldest_blob_file_number*/),
  3777. };
  3778. std::vector<FileMetaData> file_metas;
  3779. CreateDummyTableFiles(sst_files, &file_metas);
  3780. std::vector<BlobInfo> blob_files = {
  3781. BlobInfo(103, false /*file_missing*/, "a", "blob1"),
  3782. BlobInfo(104, true /*file_missing*/, "a", "blob2"),
  3783. };
  3784. std::vector<BlobFileAddition> blob_meta;
  3785. CreateDummyBlobFiles(blob_files, &blob_meta);
  3786. std::vector<std::pair<int, FileMetaData>> added_files;
  3787. for (size_t i = 0; i < sst_files.size(); i++) {
  3788. const auto& info = sst_files[i];
  3789. const auto& meta = file_metas[i];
  3790. added_files.emplace_back(info.level, meta);
  3791. }
  3792. RecoverFromManifestWithMissingFiles(added_files, blob_meta);
  3793. std::vector<uint64_t> all_table_files;
  3794. std::vector<uint64_t> all_blob_files;
  3795. versions_->AddLiveFiles(&all_table_files, &all_blob_files);
  3796. ASSERT_EQ(2, all_table_files.size());
  3797. ASSERT_EQ(1, all_blob_files.size());
  3798. ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
  3799. VersionStorageInfo* vstorage = cfd->current()->storage_info();
  3800. ASSERT_EQ(1, vstorage->LevelFiles(0).size());
  3801. ASSERT_EQ(1, vstorage->LevelFiles(1).size());
  3802. ASSERT_EQ(1, vstorage->GetBlobFiles().size());
  3803. }
  3804. class ChargeFileMetadataTest : public DBTestBase {
  3805. public:
  3806. ChargeFileMetadataTest()
  3807. : DBTestBase("charge_file_metadata_test", /*env_do_fsync=*/true) {}
  3808. };
  3809. class ChargeFileMetadataTestWithParam
  3810. : public ChargeFileMetadataTest,
  3811. public testing::WithParamInterface<CacheEntryRoleOptions::Decision> {
  3812. public:
  3813. ChargeFileMetadataTestWithParam() = default;
  3814. };
  3815. INSTANTIATE_TEST_CASE_P(
  3816. ChargeFileMetadataTestWithParam, ChargeFileMetadataTestWithParam,
  3817. ::testing::Values(CacheEntryRoleOptions::Decision::kEnabled,
  3818. CacheEntryRoleOptions::Decision::kDisabled));
  3819. TEST_P(ChargeFileMetadataTestWithParam, Basic) {
  3820. Options options;
  3821. options.level_compaction_dynamic_level_bytes = false;
  3822. BlockBasedTableOptions table_options;
  3823. CacheEntryRoleOptions::Decision charge_file_metadata = GetParam();
  3824. table_options.cache_usage_options.options_overrides.insert(
  3825. {CacheEntryRole::kFileMetadata, {/*.charged = */ charge_file_metadata}});
  3826. std::shared_ptr<TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>>
  3827. file_metadata_charge_only_cache = std::make_shared<
  3828. TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>>(
  3829. NewLRUCache(
  3830. 4 * CacheReservationManagerImpl<
  3831. CacheEntryRole::kFileMetadata>::GetDummyEntrySize(),
  3832. 0 /* num_shard_bits */, true /* strict_capacity_limit */));
  3833. table_options.block_cache = file_metadata_charge_only_cache;
  3834. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  3835. options.create_if_missing = true;
  3836. options.disable_auto_compactions = true;
  3837. DestroyAndReopen(options);
  3838. // Create 128 file metadata, each of which is roughly 1024 bytes.
  3839. // This results in 1 *
  3840. // CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize()
  3841. // cache reservation for file metadata.
  3842. for (int i = 1; i <= 128; ++i) {
  3843. ASSERT_OK(Put(std::string(1024, 'a'), "va"));
  3844. ASSERT_OK(Put("b", "vb"));
  3845. ASSERT_OK(Flush());
  3846. }
  3847. if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) {
  3848. EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(),
  3849. 1 * CacheReservationManagerImpl<
  3850. CacheEntryRole::kFileMetadata>::GetDummyEntrySize());
  3851. } else {
  3852. EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0);
  3853. }
  3854. // Create another 128 file metadata.
  3855. // This increases the file metadata cache reservation to 2 *
  3856. // CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize().
  3857. for (int i = 1; i <= 128; ++i) {
  3858. ASSERT_OK(Put(std::string(1024, 'a'), "vva"));
  3859. ASSERT_OK(Put("b", "vvb"));
  3860. ASSERT_OK(Flush());
  3861. }
  3862. if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) {
  3863. EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(),
  3864. 2 * CacheReservationManagerImpl<
  3865. CacheEntryRole::kFileMetadata>::GetDummyEntrySize());
  3866. } else {
  3867. EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0);
  3868. }
  3869. // Compaction will create 1 new file metadata, obsolete and delete all 256
  3870. // file metadata above. This results in 1 *
  3871. // CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize()
  3872. // cache reservation for file metadata.
  3873. SyncPoint::GetInstance()->LoadDependency(
  3874. {{"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
  3875. "ChargeFileMetadataTestWithParam::"
  3876. "PreVerifyingCacheReservationRelease"}});
  3877. SyncPoint::GetInstance()->EnableProcessing();
  3878. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  3879. ASSERT_EQ("0,1", FilesPerLevel(0));
  3880. TEST_SYNC_POINT(
  3881. "ChargeFileMetadataTestWithParam::PreVerifyingCacheReservationRelease");
  3882. if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) {
  3883. EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(),
  3884. 1 * CacheReservationManagerImpl<
  3885. CacheEntryRole::kFileMetadata>::GetDummyEntrySize());
  3886. } else {
  3887. EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0);
  3888. }
  3889. SyncPoint::GetInstance()->DisableProcessing();
  3890. // Destroying the db will delete the remaining 1 new file metadata
  3891. // This results in no cache reservation for file metadata.
  3892. Destroy(options);
  3893. EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(),
  3894. 0 * CacheReservationManagerImpl<
  3895. CacheEntryRole::kFileMetadata>::GetDummyEntrySize());
  3896. // Reopen the db with a smaller cache in order to test failure in allocating
  3897. // file metadata due to memory limit based on cache capacity
  3898. file_metadata_charge_only_cache = std::make_shared<
  3899. TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>>(
  3900. NewLRUCache(1 * CacheReservationManagerImpl<
  3901. CacheEntryRole::kFileMetadata>::GetDummyEntrySize(),
  3902. 0 /* num_shard_bits */, true /* strict_capacity_limit */));
  3903. table_options.block_cache = file_metadata_charge_only_cache;
  3904. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  3905. Reopen(options);
  3906. ASSERT_OK(Put(std::string(1024, 'a'), "va"));
  3907. ASSERT_OK(Put("b", "vb"));
  3908. Status s = Flush();
  3909. if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) {
  3910. EXPECT_TRUE(s.IsMemoryLimit());
  3911. EXPECT_TRUE(s.ToString().find(
  3912. kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
  3913. CacheEntryRole::kFileMetadata)]) != std::string::npos);
  3914. EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
  3915. std::string::npos);
  3916. } else {
  3917. EXPECT_TRUE(s.ok());
  3918. }
  3919. }
  3920. } // namespace ROCKSDB_NAMESPACE
  3921. int main(int argc, char** argv) {
  3922. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  3923. ::testing::InitGoogleTest(&argc, argv);
  3924. return RUN_ALL_TESTS();
  3925. }