// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#include <stdio.h>

#include <algorithm>
#include <array>
#include <cinttypes>
#include <list>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "compaction/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/plain/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {

namespace {
// Find the first file in the LevelFilesBrief data structure whose largest
// key is >= `key`, searching only within the index range [left, right).
int FindFileInRange(const InternalKeyComparator& icmp,
                    const LevelFilesBrief& file_level, const Slice& key,
                    uint32_t left, uint32_t right) {
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
  };
  const auto& b = file_level.files;
  return static_cast<int>(std::lower_bound(b + left, b + right, key, cmp) - b);
}
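
// Illustrative example (not part of the original source; sequence numbers
// omitted for brevity): suppose a level holds three files with key ranges
// [a..c], [e..g], and [i..k]. Then FindFileInRange(icmp, level, "f", 0, 3)
// returns 1, since [e..g] is the first file whose largest key is >= "f".
// A key beyond the last file (say "z") yields 3, i.e. num_files, which
// callers must treat as "no candidate file on this level".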
Status OverlapWithIterator(const Comparator* ucmp,
                           const Slice& smallest_user_key,
                           const Slice& largest_user_key,
                           InternalIterator* iter, bool* overlap) {
  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(range_start.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (iter->Valid()) {
    ParsedInternalKey seek_result;
    if (!ParseInternalKey(iter->key(), &seek_result)) {
      return Status::Corruption("DB has corrupted keys");
    }

    if (ucmp->CompareWithoutTimestamp(seek_result.user_key,
                                      largest_user_key) <= 0) {
      *overlap = true;
    }
  }

  return iter->status();
}
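
// Illustrative example (not part of the original source): if `iter` walks
// user keys {b, d} and the query range is [c, e], the seek to
// c@kMaxSequenceNumber lands on d; since d <= e, *overlap is set to true.
// For the range [e, f] the seek runs past the last key, the iterator is
// invalid, and *overlap stays false.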
// Class to help choose the next file to search for a particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
class FilePicker {
 public:
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f =
            &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files and/or fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 current level files
        // If there are only 3 or fewer current level files in the system, we
        // skip the key range filtering. In this case, more likely, the system
        // is highly tuned to minimize the number of tables queried by each
        // query, so it is unlikely that key range filtering is more efficient
        // than querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same file, we are sure key falls in
          // range.
          assert(curr_level_ == 0 ||
                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
                 user_comparator_->CompareWithoutTimestamp(
                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
              user_key_, ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->CompareWithoutTimestamp(
                user_key_, ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                             curr_index_in_curr_level_,
                                             cmp_smallest, cmp_largest,
                                             &search_left_bound_,
                                             &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // On level 0, the current file cannot be newer than the previous
            // one. The compressed data structure (FdWithKeyRange) carries no
            // sequence number, so consult files_ for the ordering check.
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(
                files_[0][curr_index_in_curr_level_],
                files_[0][curr_index_in_curr_level_ - 1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ <= search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          // `search_right_bound_` is an inclusive upper-bound, but since it
          // was determined based on user key, it is still possible the lookup
          // key falls to the right of `search_right_bound_`'s corresponding
          // file. So, pass a limit one higher, which allows us to detect this
          // case.
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
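
// Usage sketch (illustrative, not part of the original source): point
// lookups such as Version::Get() construct a FilePicker and drain it until
// the key is found or no candidate files remain:
//
//   FilePicker fp(...);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     // probe f through the table cache; stop early once the key resolves
//   }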
class FilePickerMultiGet {
 private:
  struct FilePickerContext;

 public:
  FilePickerMultiGet(MultiGetRange* range,
                     autovector<LevelFilesBrief>* file_levels,
                     unsigned int num_levels, FileIndexer* file_indexer,
                     const Comparator* user_comparator,
                     const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        range_(range),
        batch_iter_(range->begin()),
        batch_iter_prev_(range->begin()),
        maybe_repeat_key_(false),
        current_level_range_(*range, range->begin(), range->end()),
        current_file_range_(*range, range->begin(), range->end()),
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
      fp_ctx_array_[iter.index()] =
          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
    }

    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // REVISIT
      // Prefetch Level 0 table data to avoid cache miss if possible.
      // As of now, only PlainTableReader and CuckooTableReader do any
      // prefetching. This may not be necessary anymore once we implement
      // batching in those table readers
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
            r->Prepare(iter->ikey);
          }
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  // Iterates through files in the current level until it finds a file that
  // contains at least one key from the MultiGet batch
  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
                                  size_t* file_index, FdWithKeyRange** fd,
                                  bool* is_last_key_in_file) {
    size_t curr_file_index = *file_index;
    FdWithKeyRange* f = nullptr;
    bool file_hit = false;
    int cmp_largest = -1;
    if (curr_file_index >= curr_file_level_->num_files) {
      // In the unlikely case the next key is a duplicate of the current key,
      // and the current key is the last in the level and the internal key
      // was not found, we need to skip lookup for the remaining keys and
      // reset the search bounds
      if (batch_iter_ != current_level_range_.end()) {
        ++batch_iter_;
        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
          struct FilePickerContext& fp_ctx =
              fp_ctx_array_[batch_iter_.index()];
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
      }
      return false;
    }
    // Loops over keys in the MultiGet batch until it finds a file with
    // at least one of the keys. Then it keeps moving forward until the
    // last key in the batch that falls in that file
    while (batch_iter_ != current_level_range_.end() &&
           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
                curr_file_index ||
            !file_hit)) {
      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
      Slice& user_key = batch_iter_->ukey;

      // Do key range filtering of files and/or fractional cascading if:
      // (1) not all the files are in level 0, or
      // (2) there are more than 3 current level files
      // If there are only 3 or fewer current level files in the system, we
      // skip the key range filtering. In this case, more likely, the system
      // is highly tuned to minimize the number of tables queried by each
      // query, so it is unlikely that key range filtering is more efficient
      // than querying the files.
      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
        // Check if key is within a file's range. If search left bound and
        // right bound point to the same file, we are sure key falls in
        // range.
        assert(curr_level_ == 0 ||
               fp_ctx.curr_index_in_curr_level ==
                   fp_ctx.start_index_in_curr_level ||
               user_comparator_->Compare(user_key,
                                         ExtractUserKey(f->smallest_key)) <=
                   0);

        int cmp_smallest = user_comparator_->Compare(
            user_key, ExtractUserKey(f->smallest_key));
        if (cmp_smallest >= 0) {
          cmp_largest = user_comparator_->Compare(
              user_key, ExtractUserKey(f->largest_key));
        } else {
          cmp_largest = -1;
        }

        // Setup file search bound for the next level based on the
        // comparison results
        if (curr_level_ > 0) {
          file_indexer_->GetNextLevelIndex(
              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
              cmp_largest, &fp_ctx.search_left_bound,
              &fp_ctx.search_right_bound);
        }
        // Key falls out of current file's range
        if (cmp_smallest < 0 || cmp_largest > 0) {
          next_file_range->SkipKey(batch_iter_);
        } else {
          file_hit = true;
        }
      } else {
        file_hit = true;
      }
      if (cmp_largest == 0) {
        // cmp_largest is 0, which means the next key will not be in this
        // file, so stop looking further. Also don't increment batch_iter_
        // as we may have to look for this key in the next file if we don't
        // find it in this one
        break;
      } else {
        if (curr_level_ == 0) {
          // We need to look through all files in level 0
          ++fp_ctx.curr_index_in_curr_level;
        }
        ++batch_iter_;
      }
      if (!file_hit) {
        curr_file_index =
            (batch_iter_ != current_level_range_.end())
                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
                : curr_file_level_->num_files;
      }
    }

    *fd = f;
    *file_index = curr_file_index;
    *is_last_key_in_file = cmp_largest == 0;
    return file_hit;
  }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {
      // Start searching next level.
      if (batch_iter_ == current_level_range_.end()) {
        search_ended_ = !PrepareNextLevel();
        continue;
      } else {
        if (maybe_repeat_key_) {
          maybe_repeat_key_ = false;
          // Check if we found the final value for the last key in the
          // previous lookup range. If we did, then there's no need to look
          // any further for that key, so advance batch_iter_. Else, keep
          // batch_iter_ positioned on that key so we look it up again in
          // the next file
          // For L0, always advance the key because we will look in the next
          // file regardless for all keys not found yet
          if (current_level_range_.CheckKeyDone(batch_iter_) ||
              curr_level_ == 0) {
            ++batch_iter_;
          }
        }
        // batch_iter_prev_ will become the start key for the next file
        // lookup
        batch_iter_prev_ = batch_iter_;
      }

      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
                                    current_level_range_.end());
      size_t curr_file_index =
          (batch_iter_ != current_level_range_.end())
              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
              : curr_file_level_->num_files;
      FdWithKeyRange* f;
      bool is_last_key_in_file;
      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
                                      &is_last_key_in_file)) {
        search_ended_ = !PrepareNextLevel();
      } else {
        MultiGetRange::Iterator upper_key = batch_iter_;
        if (is_last_key_in_file) {
          // Since cmp_largest is 0, batch_iter_ still points to the last key
          // that falls in this file, instead of the next one. Increment
          // upper_key so we can set the range properly for SST MultiGet
          ++upper_key;
          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
          maybe_repeat_key_ = true;
        }
        // Set the range for this file
        current_file_range_ =
            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
        returned_file_level_ = curr_level_;
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_file_index == curr_file_level_->num_files - 1;
        return f;
      }
    }

    // Search ended
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

  const MultiGetRange& CurrentFileRange() { return current_file_range_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;

  struct FilePickerContext {
    int32_t search_left_bound;
    int32_t search_right_bound;
    unsigned int curr_index_in_curr_level;
    unsigned int start_index_in_curr_level;

    FilePickerContext(int32_t left, int32_t right)
        : search_left_bound(left),
          search_right_bound(right),
          curr_index_in_curr_level(0),
          start_index_in_curr_level(0) {}

    FilePickerContext() = default;
  };
  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
  MultiGetRange* range_;
  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
  // at the beginning of each level. Each call to GetNextFile() will position
  // batch_iter_ at or right after the last key that was found in the returned
  // SST file
  MultiGetRange::Iterator batch_iter_;
  // An iterator that records the previous position of batch_iter_, i.e. the
  // last key found in the previous SST file, in order to serve as the start
  // of the batch key range for the next SST file
  MultiGetRange::Iterator batch_iter_prev_;
  bool maybe_repeat_key_;
  MultiGetRange current_level_range_;
  MultiGetRange current_file_range_;
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    if (curr_level_ == 0) {
      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
          curr_file_level_->num_files) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
    }

    curr_level_++;
    // Reset key range to saved value
    while (curr_level_ < num_levels_) {
      bool level_contains_keys = false;
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        for (auto mget_iter = current_level_range_.begin();
             mget_iter != current_level_range_.end(); ++mget_iter) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];

          assert(fp_ctx.search_left_bound == 0);
          assert(fp_ctx.search_right_bound == -1 ||
                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
          // Since current level is empty, it will need to search all files in
          // the next level
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
        // Skip all subsequent empty levels
        do {
          ++curr_level_;
        } while ((curr_level_ < num_levels_) &&
                 (*level_files_brief_)[curr_level_].num_files == 0);
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index = -1;
      current_level_range_ =
          MultiGetRange(*range_, range_->begin(), range_->end());
      for (auto mget_iter = current_level_range_.begin();
           mget_iter != current_level_range_.end(); ++mget_iter) {
        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
        if (curr_level_ == 0) {
          // On Level-0, we read through all files to check for overlap.
          start_index = 0;
          level_contains_keys = true;
        } else {
          // On Level-n (n>=1), files are sorted. Binary search to find the
          // earliest file whose largest key >= ikey. Search left bound and
          // right bound are used to narrow the range.
          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
              fp_ctx.search_right_bound =
                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
            }
            // `search_right_bound_` is an inclusive upper-bound, but since it
            // was determined based on user key, it is still possible the
            // lookup key falls to the right of `search_right_bound_`'s
            // corresponding file. So, pass a limit one higher, which allows
            // us to detect this case.
            Slice& ikey = mget_iter->ikey;
            start_index = FindFileInRange(
                *internal_comparator_, *curr_file_level_, ikey,
                static_cast<uint32_t>(fp_ctx.search_left_bound),
                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
            if (start_index == fp_ctx.search_right_bound + 1) {
              // `ikey_` comes after `search_right_bound_`. The lookup key
              // does not exist on this level, so let's skip this level and
              // do a full binary search on the next level.
              fp_ctx.search_left_bound = 0;
              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
              current_level_range_.SkipKey(mget_iter);
              continue;
            } else {
              level_contains_keys = true;
            }
          } else {
            // search_left_bound > search_right_bound, key does not exist in
            // this level. Since no comparison is done in this level, it will
            // need to search all files in the next level.
            fp_ctx.search_left_bound = 0;
            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
            current_level_range_.SkipKey(mget_iter);
            continue;
          }
        }
        fp_ctx.start_index_in_curr_level = start_index;
        fp_ctx.curr_index_in_curr_level = start_index;
      }
      if (level_contains_keys) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
      curr_level_++;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
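
// Usage sketch (illustrative, not part of the original source): batched
// lookups such as Version::MultiGet() drain the picker the same way as
// FilePicker, but additionally consult CurrentFileRange() to learn which
// subset of the batch should be probed in the returned file:
//
//   FilePickerMultiGet fp(&range, ...);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     const MultiGetRange& file_range = fp.CurrentFileRange();
//     // issue one table-cache MultiGet for the keys in file_range
//   }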

}  // anonymous namespace

VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}

int FindFile(const InternalKeyComparator& icmp,
             const LevelFilesBrief& file_level,
             const Slice& key) {
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
}
void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
                               const std::vector<FileMetaData*>& files,
                               Arena* arena) {
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem) FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.file_metadata = files[i];
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}
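
// Note (editorial observation, not from the original source): both boundary
// keys of each file are packed into a single arena allocation above, so the
// resulting Slices stay valid for the lifetime of the arena backing the
// LevelFilesBrief and sit adjacent in memory during the binary searches
// performed by FindFileInRange().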
static bool AfterFile(const Comparator* ucmp,
                      const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->largest_key)) > 0);
}

static bool BeforeFile(const Comparator* ucmp,
                       const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->smallest_key)) < 0);
}
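
// Illustrative example (not part of the original source): for a file f
// spanning [m..p], AfterFile() is true for user key "q" and BeforeFile() is
// true for user key "a"; a nullptr bound makes both return false, which is
// how open-ended ranges are expressed to SomeFileOverlapsRange() below.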

bool SomeFileOverlapsRange(
    const InternalKeyComparator& icmp,
    bool disjoint_sorted_files,
    const LevelFilesBrief& file_level,
    const Slice* smallest_user_key,
    const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
  if (smallest_user_key != nullptr) {
    // Find the leftmost possible internal key for smallest_user_key
    InternalKey small;
    small.SetMinPossibleForUserKey(*smallest_user_key);
    index = FindFile(icmp, file_level, small.Encode());
  }

  if (index >= file_level.num_files) {
    // beginning of range is after all files, so no overlap.
    return false;
  }

  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}
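
// Illustrative example (not part of the original source): with disjoint
// sorted files [a..c] and [f..h], the query range [d, e] binary-searches to
// the [f..h] file (the first file whose largest key is >= d) and returns
// false because e < f; the range [d, g] lands on the same file and returns
// true.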
namespace {

class LevelIterator final : public InternalIterator {
 public:
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                const FileOptions& file_options,
                const InternalKeyComparator& icomparator,
                const LevelFilesBrief* flevel,
                const SliceTransform* prefix_extractor, bool should_sample,
                HistogramImpl* file_read_hist, TableReaderCaller caller,
                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                const std::vector<AtomicCompactionUnitBoundary>*
                    compaction_boundaries = nullptr)
      : table_cache_(table_cache),
        read_options_(read_options),
        file_options_(file_options),
        icomparator_(icomparator),
        user_comparator_(icomparator.user_comparator()),
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        caller_(caller),
        skip_filters_(skip_filters),
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries) {
    // An empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }

  ~LevelIterator() override { delete file_iter_.Set(nullptr); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override { return file_iter_.Valid(); }
  Slice key() const override {
    assert(Valid());
    return file_iter_.key();
  }

  Slice value() const override {
    assert(Valid());
    return file_iter_.value();
  }

  Status status() const override {
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
  }

  inline bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
  }

  inline bool MayBeOutOfUpperBound() override {
    assert(Valid());
    return file_iter_.MayBeOutOfUpperBound();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_.iter()) {
      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsValuePinned();
  }

 private:
  // Returns true if at least one invalid file is seen and skipped.
  bool SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);

  // Called by both Next() and NextAndGetResult(). Force inline.
  void NextImpl() {
    assert(Valid());
    file_iter_.Next();
    SkipEmptyFileForward();
  }

  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
  }

  bool KeyReachedUpperBound(const Slice& internal_key) {
    return read_options_.iterate_upper_bound != nullptr &&
           user_comparator_.CompareWithoutTimestamp(
               ExtractUserKey(internal_key),
               *read_options_.iterate_upper_bound) >= 0;
  }

  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }

    const InternalKey* smallest_compaction_key = nullptr;
    const InternalKey* largest_compaction_key = nullptr;
    if (compaction_boundaries_ != nullptr) {
      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
    }
    CheckMayBeOutOfLowerBound();
    return table_cache_->NewIterator(
        read_options_, file_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, prefix_extractor_,
        nullptr /* don't need reference to table */, file_read_hist_, caller_,
        /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
        largest_compaction_key);
  }

  // Check whether the current file lies fully within iterate_lower_bound.
  //
  // Note that MyRocks may update the iterate bounds between seeks. To work
  // around this, we need to check and update may_be_out_of_lower_bound_
  // accordingly.
  void CheckMayBeOutOfLowerBound() {
    if (read_options_.iterate_lower_bound != nullptr &&
        file_index_ < flevel_->num_files) {
      may_be_out_of_lower_bound_ =
          user_comparator_.Compare(
              ExtractUserKey(file_smallest_key(file_index_)),
              *read_options_.iterate_lower_bound) < 0;
    }
  }

  TableCache* table_cache_;
  const ReadOptions read_options_;
  const FileOptions& file_options_;
  const InternalKeyComparator& icomparator_;
  const UserComparatorWrapper user_comparator_;
  const LevelFilesBrief* flevel_;
  mutable FileDescriptor current_value_;
  // `prefix_extractor_` may be non-null even for total order seek. Checking
  // this variable is not the right way to identify whether a prefix iterator
  // is used.
  const SliceTransform* prefix_extractor_;

  HistogramImpl* file_read_hist_;
  bool should_sample_;
  TableReaderCaller caller_;
  bool skip_filters_;
  bool may_be_out_of_lower_bound_ = true;
  size_t file_index_;
  int level_;
  RangeDelAggregator* range_del_agg_;
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;

  // To be propagated to RangeDelAggregator in order to safely truncate range
  // tombstones.
  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};
void LevelIterator::Seek(const Slice& target) {
  // Check whether the seek key falls within the current file.
  bool need_to_reseek = true;
  if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
    const FdWithKeyRange& cur_file = flevel_->files[file_index_];
    if (icomparator_.InternalKeyComparator::Compare(
            target, cur_file.largest_key) <= 0 &&
        icomparator_.InternalKeyComparator::Compare(
            target, cur_file.smallest_key) >= 0) {
      need_to_reseek = false;
      assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
             file_index_);
    }
  }
  if (need_to_reseek) {
    TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
    size_t new_file_index = FindFile(icomparator_, *flevel_, target);
    InitFileIterator(new_file_index);
  }

  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
      !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
      file_iter_.iter() != nullptr && file_iter_.Valid()) {
    // We've skipped past the file we initially positioned to. In the prefix
    // seek case, the file was likely skipped because of a prefix bloom
    // filter or hash, where more keys are skipped. We then check the current
    // key and invalidate the iterator if the prefix has already been passed.
    // When doing a prefix iterator seek, once the keys for one prefix have
    // been exhausted, the iterator may jump to any larger key. Here we
    // enforce a stricter contract than that, in order to make it easier for
    // higher layers (the merging and DB iterators) to reason about
    // correctness:
    // 1. Within the prefix, the result should be accurate.
    // 2. If the keys for the prefix are exhausted, the iterator is either
    //    positioned at the next key after the prefix, or made invalid.
    // A side benefit is that the iterator is invalidated earlier, so the
    // upper-level merging iterator can merge fewer child iterators.
    Slice target_user_key = ExtractUserKey(target);
    Slice file_user_key = ExtractUserKey(file_iter_.key());
    if (prefix_extractor_->InDomain(target_user_key) &&
        (!prefix_extractor_->InDomain(file_user_key) ||
         user_comparator_.Compare(
             prefix_extractor_->Transform(target_user_key),
             prefix_extractor_->Transform(file_user_key)) != 0)) {
      SetFileIterator(nullptr);
    }
  }
  CheckMayBeOutOfLowerBound();
}
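// Example of the stricter prefix contract enforced above (hypothetical keys;
// assume the prefix extractor takes the first two bytes): after Seek("ab3"),
// if the initially positioned file is skipped (e.g. by its prefix bloom
// filter) and the next non-empty file starts at "cd1", the prefixes "ab" and
// "cd" differ, so the iterator is invalidated rather than surfacing "cd1".
// Callers doing prefix iteration therefore never observe keys from a
// different prefix.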
void LevelIterator::SeekForPrev(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
  if (new_file_index >= flevel_->num_files) {
    new_file_index = flevel_->num_files - 1;
  }

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekForPrev(target);
    SkipEmptyFileBackward();
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::Next() { NextImpl(); }

bool LevelIterator::NextAndGetResult(IterateResult* result) {
  NextImpl();
  bool is_valid = Valid();
  if (is_valid) {
    result->key = key();
    result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
  }
  return is_valid;
}

void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}
bool LevelIterator::SkipEmptyFileForward() {
  bool seen_empty_file = false;
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          !file_iter_.iter()->IsOutOfBound())) {
    seen_empty_file = true;
    // Move to the next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      break;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      break;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
  return seen_empty_file;
}

void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to the previous file
    if (file_index_ == 0) {
      // Already at the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}

void LevelIterator::SetFileIterator(InternalIterator* iter) {
  if (pinned_iters_mgr_ && iter) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* old_iter = file_iter_.Set(iter);
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(old_iter);
  } else {
    delete old_iter;
  }
}

void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  } else {
    // If the file iterator reports an incomplete status, we try it again on
    // the next seek to the same file, since this time we may reach a
    // different data block that is cached in the block cache.
    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
        new_file_index == file_index_) {
      // file_iter_ is already constructed with this iterator, so
      // no need to change anything
    } else {
      file_index_ = new_file_index;
      InternalIterator* iter = NewFileIterator();
      SetFileIterator(iter);
    }
  }
}
}  // anonymous namespace
// A wrapper around VersionBuilder that references the current version in its
// constructor and unreferences it in its destructor.
// Both the constructor and the destructor must be called while holding the
// DB mutex.
class BaseReferencedVersionBuilder {
 public:
  explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
      : version_builder_(new VersionBuilder(
            cfd->current()->version_set()->file_options(), cfd->table_cache(),
            cfd->current()->storage_info(), cfd->ioptions()->info_log)),
        version_(cfd->current()) {
    version_->Ref();
  }
  ~BaseReferencedVersionBuilder() { version_->Unref(); }
  VersionBuilder* version_builder() { return version_builder_.get(); }

 private:
  std::unique_ptr<VersionBuilder> version_builder_;
  Version* version_;
};
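// Typical usage sketch (illustrative only; `cfd`, `edit`, and `new_version`
// are hypothetical, and the Apply()/SaveTo() call shape is assumed from
// VersionBuilder's interface). The DB mutex must be held across both
// construction and destruction:
//
//   {
//     BaseReferencedVersionBuilder builder_guard(cfd);
//     builder_guard.version_builder()->Apply(edit);
//     builder_guard.version_builder()->SaveTo(new_version->storage_info());
//   }  // the referenced current version is unreferenced here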
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  Status s = table_cache->GetTableProperties(
      file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore the `Incomplete` error type, since it is by design that
  // reading the table is disallowed when it is not present in the table
  // cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. The table is not present in the table cache; read the table
  // properties directly from the properties block in the file.
  std::unique_ptr<FSRandomAccessFile> file;
  std::string file_name;
  if (fname != nullptr) {
    file_name = *fname;
  } else {
    file_name =
        TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
  }
  s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
                                        nullptr);
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
  // the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
          0 /* hist_type */, nullptr /* file_read_hist */,
          nullptr /* rate_limiter */, ioptions->listeners));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}
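// Usage sketch (names hypothetical): the fast path above is served entirely
// from the table cache with no I/O; only on Status::Incomplete does the
// function fall back to opening the file and reading its properties block.
//
//   std::shared_ptr<const TableProperties> tp;
//   Status s = version->GetTableProperties(&tp, file_meta);
//   if (s.ok()) {
//     uint64_t entries = tp->num_entries;
//   }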
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
  Status s;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    s = GetPropertiesOfAllTables(props, level);
    if (!s.ok()) {
      return s;
    }
  }

  return Status::OK();
}
Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
                                            std::string* out_str) {
  if (max_entries_to_print <= 0) {
    return Status::OK();
  }
  int num_entries_left = max_entries_to_print;

  std::stringstream ss;

  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.files_[level]) {
      auto fname =
          TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                        file_meta->fd.GetPathId());

      ss << "=== file : " << fname << " ===\n";

      TableCache* table_cache = cfd_->table_cache();
      std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;

      Status s = table_cache->GetRangeTombstoneIterator(
          ReadOptions(), cfd_->internal_comparator(), *file_meta,
          &tombstone_iter);
      if (!s.ok()) {
        return s;
      }
      if (tombstone_iter) {
        tombstone_iter->SeekToFirst();

        while (tombstone_iter->Valid() && num_entries_left > 0) {
          ss << "start: " << tombstone_iter->start_key().ToString(true)
             << " end: " << tombstone_iter->end_key().ToString(true)
             << " seq: " << tombstone_iter->seq() << '\n';
          tombstone_iter->Next();
          num_entries_left--;
        }
        if (num_entries_left <= 0) {
          break;
        }
      }
    }
    if (num_entries_left <= 0) {
      break;
    }
  }
  assert(num_entries_left >= 0);
  if (num_entries_left <= 0) {
    ss << "(results may not be complete)\n";
  }

  *out_str = ss.str();
  return Status::OK();
}
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                         int level) {
  for (const auto& file_meta : storage_info_.files_[level]) {
    auto fname =
        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
    // 1. If the table is already present in the table cache, load the table
    // properties from there.
    std::shared_ptr<const TableProperties> table_properties;
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
    if (s.ok()) {
      props->insert({fname, table_properties});
    } else {
      return s;
    }
  }

  return Status::OK();
}
Status Version::GetPropertiesOfTablesInRange(
    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    for (decltype(n) i = 0; i < n; i++) {
      // Convert user_key into a corresponding internal key.
      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
      std::vector<FileMetaData*> files;
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
                                         false);
      for (const auto& file_meta : files) {
        auto fname =
            TableFileName(cfd_->ioptions()->cf_paths,
                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
        if (props->count(fname) == 0) {
          // 1. If the table is already present in the table cache, load the
          // table properties from there.
          std::shared_ptr<const TableProperties> table_properties;
          Status s = GetTableProperties(&table_properties, file_meta, &fname);
          if (s.ok()) {
            props->insert({fname, table_properties});
          } else {
            return s;
          }
        }
      }
    }
  }

  return Status::OK();
}
Status Version::GetAggregatedTableProperties(
    std::shared_ptr<const TableProperties>* tp, int level) {
  TablePropertiesCollection props;
  Status s;
  if (level < 0) {
    s = GetPropertiesOfAllTables(&props);
  } else {
    s = GetPropertiesOfAllTables(&props, level);
  }
  if (!s.ok()) {
    return s;
  }

  auto* new_tp = new TableProperties();
  for (const auto& item : props) {
    new_tp->Add(*item.second);
  }
  tp->reset(new_tp);
  return Status::OK();
}
size_t Version::GetMemoryUsageByTableReaders() {
  size_t total_usage = 0;
  for (auto& file_level : storage_info_.level_files_brief_) {
    for (size_t i = 0; i < file_level.num_files; i++) {
      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
          file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
          mutable_cf_options_.prefix_extractor.get());
    }
  }
  return total_usage;
}
void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  auto* ioptions = cfd_->ioptions();
  auto* vstorage = storage_info();

  for (int level = 0; level < cfd_->NumberLevels(); level++) {
    uint64_t level_size = 0;
    cf_meta->file_count += vstorage->LevelFiles(level).size();
    std::vector<SstFileMetaData> files;
    for (const auto& file : vstorage->LevelFiles(level)) {
      uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < ioptions->cf_paths.size()) {
        file_path = ioptions->cf_paths[path_id].path;
      } else {
        assert(!ioptions->cf_paths.empty());
        file_path = ioptions->cf_paths.back().path;
      }
      const uint64_t file_number = file->fd.GetNumber();
      files.emplace_back(SstFileMetaData{
          MakeTableFileName("", file_number), file_number, file_path,
          static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
          file->fd.largest_seqno, file->smallest.user_key().ToString(),
          file->largest.user_key().ToString(),
          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
          file->being_compacted, file->oldest_blob_file_number,
          file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
          file->file_checksum, file->file_checksum_func_name});
      files.back().num_entries = file->num_entries;
      files.back().num_deletions = file->num_deletions;
      level_size += file->fd.GetFileSize();
    }
    cf_meta->levels.emplace_back(level, level_size, std::move(files));
    cf_meta->size += level_size;
  }
}
uint64_t Version::GetSstFilesSize() {
  uint64_t sst_files_size = 0;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.LevelFiles(level)) {
      sst_files_size += file_meta->fd.GetFileSize();
    }
  }
  return sst_files_size;
}
void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
  uint64_t oldest_time = port::kMaxUint64;
  for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
    for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
      assert(meta->fd.table_reader != nullptr);
      uint64_t file_creation_time = meta->TryGetFileCreationTime();
      if (file_creation_time == kUnknownFileCreationTime) {
        *creation_time = 0;
        return;
      }
      if (file_creation_time < oldest_time) {
        oldest_time = file_creation_time;
      }
    }
  }
  *creation_time = oldest_time;
}
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
  // Estimation will be inaccurate when:
  // (1) there exist merge keys
  // (2) keys are directly overwritten
  // (3) deletions target non-existing keys
  // (4) the number of samples is low
  if (current_num_samples_ == 0) {
    return 0;
  }

  if (current_num_non_deletions_ <= current_num_deletions_) {
    return 0;
  }

  uint64_t est = current_num_non_deletions_ - current_num_deletions_;

  uint64_t file_count = 0;
  for (int level = 0; level < num_levels_; ++level) {
    file_count += files_[level].size();
  }

  if (current_num_samples_ < file_count) {
    // casting to avoid overflowing
    return static_cast<uint64_t>(
        (est * static_cast<double>(file_count) / current_num_samples_));
  } else {
    return est;
  }
}
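// Worked example (hypothetical numbers): with
// current_num_non_deletions_ = 1000, current_num_deletions_ = 200,
// file_count = 10, and current_num_samples_ = 5, the raw estimate over the
// sampled files is est = 1000 - 200 = 800, and scaling by the sampled
// fraction gives 800 * 10 / 5 = 1600 estimated active keys.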
double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
    int level) const {
  assert(level < num_levels_);
  uint64_t sum_file_size_bytes = 0;
  uint64_t sum_data_size_bytes = 0;
  for (auto* file_meta : files_[level]) {
    sum_file_size_bytes += file_meta->fd.GetFileSize();
    sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
  }
  if (sum_file_size_bytes == 0) {
    return -1.0;
  }
  return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
}
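// Worked example (hypothetical numbers): a level whose files total 10 MB on
// disk while holding raw_key_size + raw_value_size = 25 MB of data returns
// 25 / 10 = 2.5, i.e. the data is compressed by roughly 2.5x. A return value
// of -1.0 indicates an empty level with nothing to measure.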
void Version::AddIterators(const ReadOptions& read_options,
                           const FileOptions& soptions,
                           MergeIteratorBuilder* merge_iter_builder,
                           RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);

  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
                         range_del_agg);
  }
}
void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                   const FileOptions& soptions,
                                   MergeIteratorBuilder* merge_iter_builder,
                                   int level,
                                   RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);
  if (level >= storage_info_.num_non_empty_levels()) {
    // This is an empty level
    return;
  } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
    // No files in this level
    return;
  }

  bool should_sample = should_sample_file_read();

  auto* arena = merge_iter_builder->GetArena();
  if (level == 0) {
    // Merge all level-zero files together since they may overlap.
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto& file = storage_info_.LevelFilesBrief(0).files[i];
      merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
          read_options, soptions, cfd_->internal_comparator(),
          *file.file_metadata, range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0),
          TableReaderCaller::kUserIterator, arena,
          /*skip_filters=*/false, /*level=*/0,
          /*smallest_compaction_key=*/nullptr,
          /*largest_compaction_key=*/nullptr));
    }
    if (should_sample) {
      // Count one sample for every L0 file. This is done per iterator
      // creation rather than per Seek(), while files in other levels are
      // recorded per seek. If users execute one range query per iterator,
      // there may be some discrepancy here.
      for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
        sample_file_read_inc(meta);
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    // For levels > 0, we can use a concatenating iterator that sequentially
    // walks through the non-overlapping files in the level, opening them
    // lazily.
    auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
    merge_iter_builder->AddIterator(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, soptions,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
        range_del_agg, /*compaction_boundaries=*/nullptr));
  }
}
Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
                                         const FileOptions& file_options,
                                         const Slice& smallest_user_key,
                                         const Slice& largest_user_key,
                                         int level, bool* overlap) {
  assert(storage_info_.finalized_);

  auto icmp = cfd_->internal_comparator();
  auto ucmp = icmp.user_comparator();

  Arena arena;
  Status status;
  ReadRangeDelAggregator range_del_agg(&icmp,
                                       kMaxSequenceNumber /* upper_bound */);

  *overlap = false;

  if (level == 0) {
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto file = &storage_info_.LevelFilesBrief(0).files[i];
      if (AfterFile(ucmp, &smallest_user_key, file) ||
          BeforeFile(ucmp, &largest_user_key, file)) {
        continue;
      }
      ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
          read_options, file_options, cfd_->internal_comparator(),
          *file->file_metadata, &range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0),
          TableReaderCaller::kUserIterator, &arena,
          /*skip_filters=*/false, /*level=*/0,
          /*smallest_compaction_key=*/nullptr,
          /*largest_compaction_key=*/nullptr));
      status = OverlapWithIterator(
          ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
      if (!status.ok() || *overlap) {
        break;
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
    ScopedArenaIterator iter(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, file_options,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
        &range_del_agg));
    status = OverlapWithIterator(
        ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
  }

  if (status.ok() && *overlap == false &&
      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
    *overlap = true;
  }
  return status;
}
VersionStorageInfo::VersionStorageInfo(
    const InternalKeyComparator* internal_comparator,
    const Comparator* user_comparator, int levels,
    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
    bool _force_consistency_checks)
    : internal_comparator_(internal_comparator),
      user_comparator_(user_comparator),
      // cfd is nullptr if Version is dummy
      num_levels_(levels),
      num_non_empty_levels_(0),
      file_indexer_(user_comparator),
      compaction_style_(compaction_style),
      files_(new std::vector<FileMetaData*>[num_levels_]),
      base_level_(num_levels_ == 1 ? -1 : 1),
      level_multiplier_(0.0),
      files_by_compaction_pri_(num_levels_),
      level0_non_overlapping_(false),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      l0_delay_trigger_count_(0),
      accumulated_file_size_(0),
      accumulated_raw_key_size_(0),
      accumulated_raw_value_size_(0),
      accumulated_num_non_deletions_(0),
      accumulated_num_deletions_(0),
      current_num_non_deletions_(0),
      current_num_deletions_(0),
      current_num_samples_(0),
      estimated_compaction_needed_bytes_(0),
      finalized_(false),
      force_consistency_checks_(_force_consistency_checks) {
  if (ref_vstorage != nullptr) {
    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
    accumulated_num_non_deletions_ =
        ref_vstorage->accumulated_num_non_deletions_;
    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
    current_num_deletions_ = ref_vstorage->current_num_deletions_;
    current_num_samples_ = ref_vstorage->current_num_samples_;
    oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
  }
}
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                 const FileOptions& file_opt,
                 const MutableCFOptions mutable_cf_options,
                 uint64_t version_number)
    : env_(vset->env_),
      cfd_(column_family_data),
      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
      db_statistics_((cfd_ == nullptr) ? nullptr
                                       : cfd_->ioptions()->statistics),
      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
      merge_operator_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->merge_operator),
      storage_info_(
          (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
          (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
          cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
          cfd_ == nullptr ? kCompactionStyleLevel
                          : cfd_->ioptions()->compaction_style,
          (cfd_ == nullptr || cfd_->current() == nullptr)
              ? nullptr
              : cfd_->current()->storage_info(),
          cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
      vset_(vset),
      next_(this),
      prev_(this),
      refs_(0),
      file_options_(file_opt),
      mutable_cf_options_(mutable_cf_options),
      version_number_(version_number) {}
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                  PinnableSlice* value, Status* status,
                  MergeContext* merge_context,
                  SequenceNumber* max_covering_tombstone_seq, bool* value_found,
                  bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
                  bool* is_blob, bool do_merge) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();

  assert(status->ok() || status->IsMergeInProgress());

  if (key_exists != nullptr) {
    // will be set to false below if the key is not found
    *key_exists = true;
  }

  PinnedIteratorsManager pinned_iters_mgr;
  uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
  if (vset_ && vset_->block_cache_tracer_ &&
      vset_->block_cache_tracer_->is_tracing_enabled()) {
    tracing_get_id = vset_->block_cache_tracer_->NextGetId();
  }
  GetContext get_context(
      user_comparator(), merge_operator_, info_log_, db_statistics_,
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
      do_merge ? value : nullptr, value_found, merge_context, do_merge,
      max_covering_tombstone_seq, this->env_, seq,
      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
      tracing_get_id);

  // Pin the blocks that we read, to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }

  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    if (*max_covering_tombstone_seq > 0) {
      // The remaining files we would look at can only contain covered keys,
      // so we stop here.
      break;
    }
    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }

    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    *status = table_cache_->Get(
        read_options, *internal_comparator(), *f->file_metadata, ikey,
        &get_context, mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    }
    if (!status->ok()) {
      return;
    }

    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
        break;
      case GetContext::kFound:
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }
        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                  fp.GetHitFileLevel());
        return;
      case GetContext::kDeleted:
        // Use an empty error message for speed
        *status = Status::NotFound();
        return;
      case GetContext::kCorrupt:
        *status = Status::Corruption("corrupted key for ", user_key);
        return;
      case GetContext::kBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        return;
    }
    f = fp.GetNextFile();
  }

  if (db_statistics_ != nullptr) {
    get_context.ReportCounters();
  }
  if (GetContext::kMerge == get_context.State()) {
    if (!do_merge) {
      *status = Status::OK();
      return;
    }
    if (!merge_operator_) {
      *status = Status::InvalidArgument(
          "merge_operator is not properly initialized.");
      return;
    }
    // The merge operands are in the saver and we hit the beginning of the
    // key's history; do a final merge of nullptr and the operands.
    std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
    *status = MergeHelper::TimedFullMerge(
        merge_operator_, user_key, nullptr, merge_context->GetOperands(),
        str_value, info_log_, db_statistics_, env_,
        nullptr /* result_operand */, true);
    if (LIKELY(value != nullptr)) {
      value->PinSelf();
    }
  } else {
    if (key_exists != nullptr) {
      *key_exists = false;
    }
    *status = Status::NotFound();  // Use an empty error message for speed
  }
}
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                       ReadCallback* callback, bool* is_blob) {
  PinnedIteratorsManager pinned_iters_mgr;

  // Pin the blocks that we read, to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }
  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;

  if (vset_ && vset_->block_cache_tracer_ &&
      vset_->block_cache_tracer_->is_tracing_enabled()) {
    tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
  }
  // Even though we know the batch size won't be > MAX_BATCH_SIZE, use
  // autovector in order to avoid unnecessary construction of GetContext
  // objects, which is expensive.
  autovector<GetContext, 16> get_ctx;
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    assert(iter->s->ok() || iter->s->IsMergeInProgress());
    get_ctx.emplace_back(
        user_comparator(), merge_operator_, info_log_, db_statistics_,
        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
        iter->value, nullptr, &(iter->merge_context), true,
        &iter->max_covering_tombstone_seq, this->env_, nullptr,
        merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
        tracing_mget_id);
    // The MergeInProgress status, if set, has been transferred to the
    // get_context state, so we set the status to OK here. From now on, the
    // iter status will be used for I/O errors, and the get_context state
    // will be used for any key-level errors.
    *(iter->s) = Status::OK();
  }
  int get_ctx_index = 0;
  for (auto iter = range->begin(); iter != range->end();
       ++iter, get_ctx_index++) {
    iter->get_context = &(get_ctx[get_ctx_index]);
  }

  MultiGetRange file_picker_range(*range, range->begin(), range->end());
  FilePickerMultiGet fp(
      &file_picker_range,
      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    MultiGetRange file_range = fp.CurrentFileRange();
    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    Status s = table_cache_->MultiGet(
        read_options, *internal_comparator(), *f->file_metadata, &file_range,
        mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    }
    if (!s.ok()) {
      // TODO: Set status for individual keys appropriately
      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
        *iter->s = s;
        file_range.MarkKeyDone(iter);
      }
      return;
    }
    uint64_t batch_size = 0;
    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
      GetContext& get_context = *iter->get_context;
      Status* status = iter->s;
      // The Status in the KeyContext takes precedence over the GetContext
      // state. The Status may be an error if there were any I/O errors in
      // the table reader. We never expect it to be NotFound(), as that is
      // determined by the get_context.
      assert(!status->IsNotFound());
      if (!status->ok()) {
        file_range.MarkKeyDone(iter);
        continue;
      }

      if (get_context.sample()) {
        sample_file_read_inc(f->file_metadata);
      }
      batch_size++;
      // report the counters before returning
      if (get_context.State() != GetContext::kNotFound &&
          get_context.State() != GetContext::kMerge &&
          db_statistics_ != nullptr) {
        get_context.ReportCounters();
      } else {
        if (iter->max_covering_tombstone_seq > 0) {
          // The remaining files we would look at can only contain covered
          // keys, so we stop here for this key.
          file_picker_range.SkipKey(iter);
        }
      }
      switch (get_context.State()) {
        case GetContext::kNotFound:
          // Keep searching in other files
          break;
        case GetContext::kMerge:
          // TODO: update per-level perfcontext user_key_return_count for
          // kMerge
          break;
        case GetContext::kFound:
          if (fp.GetHitFileLevel() == 0) {
            RecordTick(db_statistics_, GET_HIT_L0);
          } else if (fp.GetHitFileLevel() == 1) {
            RecordTick(db_statistics_, GET_HIT_L1);
          } else if (fp.GetHitFileLevel() >= 2) {
            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
          }
          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                    fp.GetHitFileLevel());
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kDeleted:
          // Use an empty error message for speed
          *status = Status::NotFound();
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kCorrupt:
          *status =
              Status::Corruption("corrupted key for ", iter->lkey->user_key());
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kBlobIndex:
          ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
          *status = Status::NotSupported(
              "Encounter unexpected blob index. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
          file_range.MarkKeyDone(iter);
          continue;
      }
    }
    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
    if (file_picker_range.empty()) {
      break;
    }
    f = fp.GetNextFile();
  }

  // Process any leftover keys
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    GetContext& get_context = *iter->get_context;
    Status* status = iter->s;
    Slice user_key = iter->lkey->user_key();

    if (db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    if (GetContext::kMerge == get_context.State()) {
      if (!merge_operator_) {
        *status = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        range->MarkKeyDone(iter);
        continue;
      }
      // The merge operands are in the saver and we hit the beginning of the
      // key's history; do a final merge of nullptr and the operands.
      std::string* str_value =
          iter->value != nullptr ? iter->value->GetSelf() : nullptr;
      *status = MergeHelper::TimedFullMerge(
          merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
          str_value, info_log_, db_statistics_, env_,
          nullptr /* result_operand */, true);
      if (LIKELY(iter->value != nullptr)) {
        iter->value->PinSelf();
      }
    } else {
      range->MarkKeyDone(iter);
      *status = Status::NotFound();  // Use an empty error message for speed
    }
  }
}
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
  // Reaching the bottom level implies misses at all upper levels, so we'll
  // skip checking the filters when we predict a hit.
  return cfd_->ioptions()->optimize_filters_for_hits &&
         (level > 0 || is_file_last_in_level) &&
         level == storage_info_.num_non_empty_levels() - 1;
}
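// Example (hypothetical): with optimize_filters_for_hits enabled and
// non-empty levels 0..4, a lookup that reaches level 4 must already have
// missed levels 0..3, so probing the bloom filter there cannot avoid the
// data-block read for a key that exists, and is skipped. When level 0 is
// itself the bottommost non-empty level, filters are only skipped for the
// last L0 file that could contain the key.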
void VersionStorageInfo::GenerateLevelFilesBrief() {
  level_files_brief_.resize(num_non_empty_levels_);
  for (int level = 0; level < num_non_empty_levels_; level++) {
    DoGenerateLevelFilesBrief(
        &level_files_brief_[level], files_[level], &arena_);
  }
}
void Version::PrepareApply(
    const MutableCFOptions& mutable_cf_options,
    bool update_stats) {
  UpdateAccumulatedStats(update_stats);
  storage_info_.UpdateNumNonEmptyLevels();
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
  storage_info_.GenerateLevel0NonOverlapping();
  storage_info_.GenerateBottommostFiles();
}
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
  if (file_meta->init_stats_from_file ||
      file_meta->compensated_file_size > 0) {
    return false;
  }
  std::shared_ptr<const TableProperties> tp;
  Status s = GetTableProperties(&tp, file_meta);
  file_meta->init_stats_from_file = true;
  if (!s.ok()) {
    ROCKS_LOG_ERROR(vset_->db_options_->info_log,
                    "Unable to load table properties for file %" PRIu64
                    " --- %s\n",
                    file_meta->fd.GetNumber(), s.ToString().c_str());
    return false;
  }
  if (tp.get() == nullptr) return false;
  file_meta->num_entries = tp->num_entries;
  file_meta->num_deletions = tp->num_deletions;
  file_meta->raw_value_size = tp->raw_value_size;
  file_meta->raw_key_size = tp->raw_key_size;

  return true;
}
void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
  TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
                           nullptr);

  assert(file_meta->init_stats_from_file);
  accumulated_file_size_ += file_meta->fd.GetFileSize();
  accumulated_raw_key_size_ += file_meta->raw_key_size;
  accumulated_raw_value_size_ += file_meta->raw_value_size;
  accumulated_num_non_deletions_ +=
      file_meta->num_entries - file_meta->num_deletions;
  accumulated_num_deletions_ += file_meta->num_deletions;

  current_num_non_deletions_ +=
      file_meta->num_entries - file_meta->num_deletions;
  current_num_deletions_ += file_meta->num_deletions;
  current_num_samples_++;
}

void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
  if (file_meta->init_stats_from_file) {
    current_num_non_deletions_ -=
        file_meta->num_entries - file_meta->num_deletions;
    current_num_deletions_ -= file_meta->num_deletions;
    current_num_samples_--;
  }
}
void Version::UpdateAccumulatedStats(bool update_stats) {
  if (update_stats) {
    // maximum number of table properties loaded from files.
    const int kMaxInitCount = 20;
    int init_count = 0;
    // Here, only the first kMaxInitCount files that haven't been initialized
    // from their table files will have num_deletions updated. The motivation
    // is to cap the maximum I/O per Version creation. Files are chosen from
    // lower levels instead of higher ones because this design propagates the
    // initialization from lower levels to higher ones: once the
    // num_deletions of lower-level files are updated, those files have
    // accurate compensated_file_size values, which triggers lower-to-higher
    // level compactions; these in turn create higher-level files whose
    // num_deletions will be updated here.
    for (int level = 0;
         level < storage_info_.num_levels_ && init_count < kMaxInitCount;
         ++level) {
      for (auto* file_meta : storage_info_.files_[level]) {
        if (MaybeInitializeFileMetaData(file_meta)) {
          // each FileMeta will be initialized only once.
          storage_info_.UpdateAccumulatedStats(file_meta);
          // When the option "max_open_files" is -1, all the file metadata
          // has already been read, so MaybeInitializeFileMetaData() won't
          // incur any I/O cost. "max_open_files=-1" means that the table
          // cache passed to the VersionSet and then to the ColumnFamilySet
          // has a size of TableCache::kInfiniteCapacity.
          if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
              TableCache::kInfiniteCapacity) {
            continue;
          }
          if (++init_count >= kMaxInitCount) {
            break;
          }
        }
      }
    }
    // In case all sampled files contain only deletion entries, load the
    // table properties of a file in a higher level to initialize that value.
    for (int level = storage_info_.num_levels_ - 1;
         storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
         --level) {
      for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
           storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
        if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
          storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
        }
      }
    }
  }

  storage_info_.ComputeCompensatedSizes();
}
void VersionStorageInfo::ComputeCompensatedSizes() {
  static const int kDeletionWeightOnCompaction = 2;
  uint64_t average_value_size = GetAverageValueSize();

  // compute the compensated size
  for (int level = 0; level < num_levels_; level++) {
    for (auto* file_meta : files_[level]) {
      // Here we only compute compensated_file_size for those file_meta
      // whose compensated_file_size is uninitialized (== 0). This is true
      // only for files that have just been created and that no other thread
      // can access yet. That's why we can safely mutate
      // compensated_file_size.
      if (file_meta->compensated_file_size == 0) {
        file_meta->compensated_file_size = file_meta->fd.GetFileSize();
        // Here we only boost the size of deletion entries of a file when
        // the number of deletion entries is greater than the number of
        // non-deletion entries in the file. The motivation here is that in
        // a stable workload, the number of deletion entries should be
        // roughly equal to the number of non-deletion entries. If we
        // compensated the size of deletion entries in a stable workload,
        // the deletion compensation logic might introduce unwanted effects,
        // changing the shape of the LSM tree.
        if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
          file_meta->compensated_file_size +=
              (file_meta->num_deletions * 2 - file_meta->num_entries) *
              average_value_size * kDeletionWeightOnCompaction;
        }
      }
    }
  }
}
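// Worked example (hypothetical numbers): a 10 MB file with num_entries = 100,
// num_deletions = 80, and an average value size of 1 KB receives a boost of
// (80 * 2 - 100) * 1 KB * kDeletionWeightOnCompaction = 60 * 1 KB * 2
// = 120 KB, so its compensated_file_size is about 10 MB + 120 KB. A file
// with num_deletions * 2 < num_entries keeps its raw file size.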
int VersionStorageInfo::MaxInputLevel() const {
  if (compaction_style_ == kCompactionStyleLevel) {
    return num_levels() - 2;
  }
  return 0;
}

int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
  if (allow_ingest_behind) {
    assert(num_levels() > 1);
    return num_levels() - 2;
  }
  return num_levels() - 1;
}

void VersionStorageInfo::EstimateCompactionBytesNeeded(
    const MutableCFOptions& mutable_cf_options) {
  // Only implemented for level-based compaction
  if (compaction_style_ != kCompactionStyleLevel) {
    estimated_compaction_needed_bytes_ = 0;
    return;
  }

  // Start from Level 0, if level 0 qualifies compaction to level 1,
  // we estimate the size of compaction.
  // Then we move on to the next level and see whether it qualifies compaction
  // to the next level. The size of the level is estimated as the actual size
  // on the level plus the input bytes from the previous level if there is any.
  // If it exceeds the target size, take the excess bytes as compaction input
  // and add the compaction size to the total size.
  // We keep doing it to Level 2, 3, etc, until the last level and return the
  // accumulated bytes.

  uint64_t bytes_compact_to_next_level = 0;
  uint64_t level_size = 0;
  for (auto* f : files_[0]) {
    level_size += f->fd.GetFileSize();
  }
  // Level 0
  bool level0_compact_triggered = false;
  if (static_cast<int>(files_[0].size()) >=
          mutable_cf_options.level0_file_num_compaction_trigger ||
      level_size >= mutable_cf_options.max_bytes_for_level_base) {
    level0_compact_triggered = true;
    estimated_compaction_needed_bytes_ = level_size;
    bytes_compact_to_next_level = level_size;
  } else {
    estimated_compaction_needed_bytes_ = 0;
  }

  // Level 1 and up.
  uint64_t bytes_next_level = 0;
  for (int level = base_level(); level <= MaxInputLevel(); level++) {
    level_size = 0;
    if (bytes_next_level > 0) {
#ifndef NDEBUG
      uint64_t level_size2 = 0;
      for (auto* f : files_[level]) {
        level_size2 += f->fd.GetFileSize();
      }
      assert(level_size2 == bytes_next_level);
#endif
      level_size = bytes_next_level;
      bytes_next_level = 0;
    } else {
      for (auto* f : files_[level]) {
        level_size += f->fd.GetFileSize();
      }
    }
    if (level == base_level() && level0_compact_triggered) {
      // Add base level size to compaction if level0 compaction triggered.
      estimated_compaction_needed_bytes_ += level_size;
    }
    // Add size added by previous compaction
    level_size += bytes_compact_to_next_level;
    bytes_compact_to_next_level = 0;
    uint64_t level_target = MaxBytesForLevel(level);
    if (level_size > level_target) {
      bytes_compact_to_next_level = level_size - level_target;
      // Estimate the actual compaction fan-out ratio as size ratio between
      // the two levels.
      assert(bytes_next_level == 0);
      if (level + 1 < num_levels_) {
        for (auto* f : files_[level + 1]) {
          bytes_next_level += f->fd.GetFileSize();
        }
      }
      if (bytes_next_level > 0) {
        assert(level_size > 0);
        estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
            static_cast<double>(bytes_compact_to_next_level) *
            (static_cast<double>(bytes_next_level) /
                 static_cast<double>(level_size) +
             1));
      }
    }
  }
}
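
// Illustration (not part of the original source): the cascading estimate
// above, restated over plain per-level byte counts. All names here are
// hypothetical and the L0 trigger handling is omitted; each level's overflow
// is carried to the next level, and the write-out cost is scaled by the
// observed size ratio between the two levels, mirroring the loop above.
static uint64_t ExampleEstimateCompactionBytes(
    const std::vector<uint64_t>& level_sizes,      // sizes of L1..Ln
    const std::vector<uint64_t>& level_targets) {  // targets of L1..Ln
  uint64_t total = 0;
  uint64_t carry = 0;  // bytes compacted down from the previous level
  for (size_t i = 0; i < level_sizes.size(); i++) {
    uint64_t size = level_sizes[i] + carry;
    carry = 0;
    if (size > level_targets[i] && i + 1 < level_sizes.size()) {
      carry = size - level_targets[i];
      if (level_sizes[i + 1] > 0) {
        // Fan-out: input bytes plus the overlapping next-level bytes,
        // approximated by the next-level / current-level size ratio.
        double ratio = static_cast<double>(level_sizes[i + 1]) /
                       static_cast<double>(size);
        total +=
            static_cast<uint64_t>(static_cast<double>(carry) * (ratio + 1));
      }
    }
  }
  return total;
}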

namespace {
uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
                                 const MutableCFOptions& mutable_cf_options,
                                 const std::vector<FileMetaData*>& files) {
  uint32_t ttl_expired_files_count = 0;

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (status.ok()) {
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    for (FileMetaData* f : files) {
      if (!f->being_compacted) {
        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
        if (oldest_ancester_time != 0 &&
            oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
          ttl_expired_files_count++;
        }
      }
    }
  }
  return ttl_expired_files_count;
}
}  // anonymous namespace

void VersionStorageInfo::ComputeCompactionScore(
    const ImmutableCFOptions& immutable_cf_options,
    const MutableCFOptions& mutable_cf_options) {
  for (int level = 0; level <= MaxInputLevel(); level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      int num_sorted_runs = 0;
      uint64_t total_size = 0;
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
          num_sorted_runs++;
        }
      }
      if (compaction_style_ == kCompactionStyleUniversal) {
        // For universal compaction, we use level0 score to indicate
        // compaction score for the whole DB. Adding other levels as if
        // they are L0 files.
        for (int i = 1; i < num_levels(); i++) {
          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
            num_sorted_runs++;
          }
        }
      }

      if (compaction_style_ == kCompactionStyleFIFO) {
        score = static_cast<double>(total_size) /
                mutable_cf_options.compaction_options_fifo.max_table_files_size;
        if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
          score = std::max(
              static_cast<double>(num_sorted_runs) /
                  mutable_cf_options.level0_file_num_compaction_trigger,
              score);
        }
        if (mutable_cf_options.ttl > 0) {
          score = std::max(
              static_cast<double>(GetExpiredTtlFilesCount(
                  immutable_cf_options, mutable_cf_options, files_[level])),
              score);
        }
      } else {
        score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
        if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
          // Level-based involves L0->L0 compactions that can lead to oversized
          // L0 files. Take into account size as well to avoid later giant
          // compactions to the base level.
          score = std::max(
              score, static_cast<double>(total_size) /
                         mutable_cf_options.max_bytes_for_level_base);
        }
      }
    } else {
      // Compute the ratio of current size to size limit.
      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
        if (!f->being_compacted) {
          level_bytes_no_compacting += f->compensated_file_size;
        }
      }
      score = static_cast<double>(level_bytes_no_compacting) /
              MaxBytesForLevel(level);
    }
    compaction_level_[level] = level;
    compaction_score_[level] = score;
  }

  // Sort all the levels based on their score. Higher scores get listed
  // first. Use bubble sort because the number of entries is small.
  for (int i = 0; i < num_levels() - 2; i++) {
    for (int j = i + 1; j < num_levels() - 1; j++) {
      if (compaction_score_[i] < compaction_score_[j]) {
        double score = compaction_score_[i];
        int level = compaction_level_[i];
        compaction_score_[i] = compaction_score_[j];
        compaction_level_[i] = compaction_level_[j];
        compaction_score_[j] = score;
        compaction_level_[j] = level;
      }
    }
  }
  ComputeFilesMarkedForCompaction();
  ComputeBottommostFilesMarkedForCompaction();
  if (mutable_cf_options.ttl > 0) {
    ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
  }
  if (mutable_cf_options.periodic_compaction_seconds > 0) {
    ComputeFilesMarkedForPeriodicCompaction(
        immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
  }
  EstimateCompactionBytesNeeded(mutable_cf_options);
}
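
// Illustration (not part of the original source): the two scoring rules above
// in isolation, with hypothetical names. L0 is scored by sorted-run count
// (level-style compaction additionally checks L0 bytes, omitted here); L1+ is
// scored by non-compacting bytes over the level's target. A score >= 1 means
// the level qualifies for compaction.
static double ExampleLevelScore(bool is_level0, uint64_t level_bytes,
                                uint64_t level_target_bytes,
                                int num_sorted_runs, int l0_file_trigger) {
  if (is_level0) {
    return static_cast<double>(num_sorted_runs) /
           static_cast<double>(l0_file_trigger);
  }
  return static_cast<double>(level_bytes) /
         static_cast<double>(level_target_bytes);
}
// E.g. with level0_file_num_compaction_trigger = 4, six sorted runs in L0
// score 1.5, so L0 outranks an L1 that is at 120% of its target (score 1.2).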

void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
  files_marked_for_compaction_.clear();
  int last_qualify_level = 0;

  // Do not include files from the last level with data.
  // If the table properties collector suggests a file on the last level,
  // we should not move it to a new level.
  for (int level = num_levels() - 1; level >= 1; level--) {
    if (!files_[level].empty()) {
      last_qualify_level = level - 1;
      break;
    }
  }

  for (int level = 0; level <= last_qualify_level; level++) {
    for (auto* f : files_[level]) {
      if (!f->being_compacted && f->marked_for_compaction) {
        files_marked_for_compaction_.emplace_back(level, f);
      }
    }
  }
}

void VersionStorageInfo::ComputeExpiredTtlFiles(
    const ImmutableCFOptions& ioptions, const uint64_t ttl) {
  assert(ttl > 0);

  expired_ttl_files_.clear();

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (!status.ok()) {
    return;
  }
  const uint64_t current_time = static_cast<uint64_t>(_current_time);

  for (int level = 0; level < num_levels() - 1; level++) {
    for (FileMetaData* f : files_[level]) {
      if (!f->being_compacted) {
        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
        if (oldest_ancester_time > 0 &&
            oldest_ancester_time < (current_time - ttl)) {
          expired_ttl_files_.emplace_back(level, f);
        }
      }
    }
  }
}

void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
    const ImmutableCFOptions& ioptions,
    const uint64_t periodic_compaction_seconds) {
  assert(periodic_compaction_seconds > 0);

  files_marked_for_periodic_compaction_.clear();

  int64_t temp_current_time;
  auto status = ioptions.env->GetCurrentTime(&temp_current_time);
  if (!status.ok()) {
    return;
  }
  const uint64_t current_time = static_cast<uint64_t>(temp_current_time);

  // If periodic_compaction_seconds is larger than current time, periodic
  // compaction can't possibly be triggered.
  if (periodic_compaction_seconds > current_time) {
    return;
  }

  const uint64_t allowed_time_limit =
      current_time - periodic_compaction_seconds;

  for (int level = 0; level < num_levels(); level++) {
    for (auto f : files_[level]) {
      if (!f->being_compacted) {
        // Compute a file's modification time in the following order:
        // 1. Use file_creation_time table property if it is > 0.
        // 2. Use creation_time table property if it is > 0.
        // 3. Use file's mtime metadata if the above two table properties
        //    are 0.
        // Don't consider the file at all if the modification time cannot be
        // correctly determined based on the above conditions.
        uint64_t file_modification_time = f->TryGetFileCreationTime();
        if (file_modification_time == kUnknownFileCreationTime) {
          file_modification_time = f->TryGetOldestAncesterTime();
        }
        if (file_modification_time == kUnknownOldestAncesterTime) {
          auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
                                         f->fd.GetPathId());
          status = ioptions.env->GetFileModificationTime(
              file_path, &file_modification_time);
          if (!status.ok()) {
            ROCKS_LOG_WARN(ioptions.info_log,
                           "Can't get file modification time: %s: %s",
                           file_path.c_str(), status.ToString().c_str());
            continue;
          }
        }
        if (file_modification_time > 0 &&
            file_modification_time < allowed_time_limit) {
          files_marked_for_periodic_compaction_.emplace_back(level, f);
        }
      }
    }
  }
}
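
// Illustration (not part of the original source): the three-step fallback
// above, with the table-property lookups replaced by plain parameters. It
// assumes the sentinel constants (kUnknownFileCreationTime and
// kUnknownOldestAncesterTime) are 0, as they are in this codebase.
static bool ExampleIsDueForPeriodicCompaction(uint64_t file_creation_time,
                                              uint64_t oldest_ancester_time,
                                              uint64_t file_mtime,
                                              uint64_t now,
                                              uint64_t period_seconds) {
  if (period_seconds > now) {
    return false;  // the clock is younger than one period; can't trigger
  }
  uint64_t mod_time = file_creation_time;  // 1. file_creation_time property
  if (mod_time == 0) {
    mod_time = oldest_ancester_time;       // 2. creation_time property
  }
  if (mod_time == 0) {
    mod_time = file_mtime;                 // 3. file system mtime
  }
  return mod_time > 0 && mod_time < now - period_seconds;
}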

namespace {

// used to sort files by size
struct Fsize {
  size_t index;
  FileMetaData* file;
};

// Comparator used to sort files based on their size.
// In normal mode: descending size.
bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
  return (first.file->compensated_file_size >
          second.file->compensated_file_size);
}
}  // anonymous namespace

void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
  auto* level_files = &files_[level];
  // Must not overlap
#ifndef NDEBUG
  if (level > 0 && !level_files->empty() &&
      internal_comparator_->Compare(
          (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
    auto* f2 = (*level_files)[level_files->size() - 1];
    if (info_log != nullptr) {
      Error(info_log, "Adding new file %" PRIu64
                      " range (%s, %s) to level %d but overlapping "
                      "with existing file %" PRIu64 " %s %s",
            f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
            f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
            f2->smallest.DebugString(true).c_str(),
            f2->largest.DebugString(true).c_str());
      LogFlush(info_log);
    }
    assert(false);
  }
#else
  (void)info_log;
#endif
  f->refs++;
  level_files->push_back(f);
}

// Version::PrepareApply() needs to be called before calling this function, or
// the following functions must have been called:
// 1. UpdateNumNonEmptyLevels();
// 2. CalculateBaseBytes();
// 3. UpdateFilesByCompactionPri();
// 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief();
// 6. GenerateLevel0NonOverlapping();
// 7. GenerateBottommostFiles();
void VersionStorageInfo::SetFinalized() {
  finalized_ = true;
#ifndef NDEBUG
  if (compaction_style_ != kCompactionStyleLevel) {
    // Not level based compaction.
    return;
  }
  assert(base_level_ < 0 || num_levels() == 1 ||
         (base_level_ >= 1 && base_level_ < num_levels()));
  // Verify all levels newer than base_level are empty except L0
  for (int level = 1; level < base_level(); level++) {
    assert(NumLevelBytes(level) == 0);
  }
  uint64_t max_bytes_prev_level = 0;
  for (int level = base_level(); level < num_levels() - 1; level++) {
    if (LevelFiles(level).size() == 0) {
      continue;
    }
    assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
    max_bytes_prev_level = MaxBytesForLevel(level);
  }
  int num_empty_non_l0_level = 0;
  for (int level = 0; level < num_levels(); level++) {
    assert(LevelFiles(level).size() == 0 ||
           LevelFiles(level).size() == LevelFilesBrief(level).num_files);
    if (level > 0 && NumLevelBytes(level) > 0) {
      num_empty_non_l0_level++;
    }
    if (LevelFiles(level).size() > 0) {
      assert(level < num_non_empty_levels());
    }
  }
  assert(compaction_level_.size() > 0);
  assert(compaction_level_.size() == compaction_score_.size());
#endif
}

void VersionStorageInfo::UpdateNumNonEmptyLevels() {
  num_non_empty_levels_ = num_levels_;
  for (int i = num_levels_ - 1; i >= 0; i--) {
    if (files_[i].size() != 0) {
      return;
    } else {
      num_non_empty_levels_ = i;
    }
  }
}

namespace {
// Sort `temp` based on ratio of overlapping size over file size
void SortFileByOverlappingRatio(
    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
    const std::vector<FileMetaData*>& next_level_files,
    std::vector<Fsize>* temp) {
  std::unordered_map<uint64_t, uint64_t> file_to_order;
  auto next_level_it = next_level_files.begin();

  for (auto& file : files) {
    uint64_t overlapping_bytes = 0;
    // Skip files in the next level that are entirely smaller than the
    // current file.
    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
      next_level_it++;
    }

    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
      overlapping_bytes += (*next_level_it)->fd.file_size;

      if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
        // The next-level file crosses the largest-key boundary of the
        // current file.
        break;
      }
      next_level_it++;
    }

    assert(file->compensated_file_size != 0);
    file_to_order[file->fd.GetNumber()] =
        overlapping_bytes * 1024u / file->compensated_file_size;
  }

  std::sort(temp->begin(), temp->end(),
            [&](const Fsize& f1, const Fsize& f2) -> bool {
              return file_to_order[f1.file->fd.GetNumber()] <
                     file_to_order[f2.file->fd.GetNumber()];
            });
}
}  // namespace
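
// Illustration (not part of the original source): the sort key used above,
// in isolation. Overlapping bytes in the next level are scaled by 1024 (a
// fixed-point factor) and divided by the file's own compensated size, so
// files whose compaction touches the least downstream data sort first.
static uint64_t ExampleOverlapRatioKey(uint64_t overlapping_bytes,
                                       uint64_t compensated_file_size) {
  assert(compensated_file_size != 0);
  return overlapping_bytes * 1024u / compensated_file_size;
}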

void VersionStorageInfo::UpdateFilesByCompactionPri(
    CompactionPri compaction_pri) {
  if (compaction_style_ == kCompactionStyleNone ||
      compaction_style_ == kCompactionStyleFIFO ||
      compaction_style_ == kCompactionStyleUniversal) {
    // don't need this
    return;
  }
  // No need to sort the highest level because it is never compacted.
  for (int level = 0; level < num_levels() - 1; level++) {
    const std::vector<FileMetaData*>& files = files_[level];
    auto& files_by_compaction_pri = files_by_compaction_pri_[level];
    assert(files_by_compaction_pri.size() == 0);

    // populate a temp vector for sorting based on size
    std::vector<Fsize> temp(files.size());
    for (size_t i = 0; i < files.size(); i++) {
      temp[i].index = i;
      temp[i].file = files[i];
    }

    // sort the top number_of_files_to_sort_ based on file size
    size_t num = VersionStorageInfo::kNumberFilesToSort;
    if (num > temp.size()) {
      num = temp.size();
    }
    switch (compaction_pri) {
      case kByCompensatedSize:
        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
                          CompareCompensatedSizeDescending);
        break;
      case kOldestLargestSeqFirst:
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->fd.largest_seqno <
                           f2.file->fd.largest_seqno;
                  });
        break;
      case kOldestSmallestSeqFirst:
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->fd.smallest_seqno <
                           f2.file->fd.smallest_seqno;
                  });
        break;
      case kMinOverlappingRatio:
        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
                                   files_[level + 1], &temp);
        break;
      default:
        assert(false);
    }
    assert(temp.size() == files.size());

    // initialize files_by_compaction_pri_
    for (size_t i = 0; i < temp.size(); i++) {
      files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
    }
    next_file_to_compact_by_size_[level] = 0;
    assert(files_[level].size() == files_by_compaction_pri_[level].size());
  }
}

void VersionStorageInfo::GenerateLevel0NonOverlapping() {
  assert(!finalized_);
  level0_non_overlapping_ = true;
  if (level_files_brief_.size() == 0) {
    return;
  }

  // A copy of L0 files sorted by smallest key
  std::vector<FdWithKeyRange> level0_sorted_file(
      level_files_brief_[0].files,
      level_files_brief_[0].files + level_files_brief_[0].num_files);
  std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
            [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
              return (internal_comparator_->Compare(f1.smallest_key,
                                                    f2.smallest_key) < 0);
            });

  for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
    FdWithKeyRange& f = level0_sorted_file[i];
    FdWithKeyRange& prev = level0_sorted_file[i - 1];
    if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
      level0_non_overlapping_ = false;
      break;
    }
  }
}
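
// Illustration (not part of the original source): the sort-then-compare-
// neighbors pattern above, applied to plain [start, end] integer ranges.
// After sorting by start key, the ranges are pairwise disjoint iff no range
// starts at or before its predecessor's end.
static bool ExampleRangesNonOverlapping(
    std::vector<std::pair<int, int>> ranges) {  // each pair is [start, end]
  std::sort(ranges.begin(), ranges.end());      // pairs sort by start first
  for (size_t i = 1; i < ranges.size(); ++i) {
    if (ranges[i - 1].second >= ranges[i].first) {
      return false;  // neighbor overlap found
    }
  }
  return true;
}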

void VersionStorageInfo::GenerateBottommostFiles() {
  assert(!finalized_);
  assert(bottommost_files_.empty());
  for (size_t level = 0; level < level_files_brief_.size(); ++level) {
    for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
         ++file_idx) {
      const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
      int l0_file_idx;
      if (level == 0) {
        l0_file_idx = static_cast<int>(file_idx);
      } else {
        l0_file_idx = -1;
      }
      Slice smallest_user_key = ExtractUserKey(f.smallest_key);
      Slice largest_user_key = ExtractUserKey(f.largest_key);
      if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
                                         static_cast<int>(level),
                                         l0_file_idx)) {
        bottommost_files_.emplace_back(static_cast<int>(level),
                                       f.file_metadata);
      }
    }
  }
}

void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
  assert(seqnum >= oldest_snapshot_seqnum_);
  oldest_snapshot_seqnum_ = seqnum;
  if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
    ComputeBottommostFilesMarkedForCompaction();
  }
}

void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
  bottommost_files_marked_for_compaction_.clear();
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto& level_and_file : bottommost_files_) {
    if (!level_and_file.second->being_compacted &&
        level_and_file.second->fd.largest_seqno != 0 &&
        level_and_file.second->num_deletions > 1) {
      // largest_seqno might be nonzero due to containing the final key in an
      // earlier compaction, whose seqnum we didn't zero out. Multiple
      // deletions ensure the file really contains deleted or overwritten keys.
      if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
        bottommost_files_marked_for_compaction_.push_back(level_and_file);
      } else {
        bottommost_files_mark_threshold_ =
            std::min(bottommost_files_mark_threshold_,
                     level_and_file.second->fd.largest_seqno);
      }
    }
  }
}
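
// Illustration (not part of the original source): the threshold bookkeeping
// above, over plain sequence numbers. Files not yet covered by the oldest
// snapshot record the smallest seqno at which they would become eligible, so
// UpdateOldestSnapshot() only recomputes the marked set once the advancing
// snapshot crosses that threshold.
static uint64_t ExampleMarkThreshold(const std::vector<uint64_t>& seqnos,
                                     uint64_t oldest_snapshot,
                                     std::vector<uint64_t>* marked) {
  uint64_t threshold = std::numeric_limits<uint64_t>::max();
  for (uint64_t s : seqnos) {
    if (s < oldest_snapshot) {
      marked->push_back(s);                // invisible to all snapshots now
    } else {
      threshold = std::min(threshold, s);  // earliest future trigger point
    }
  }
  return threshold;
}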

void Version::Ref() {
  ++refs_;
}

bool Version::Unref() {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    delete this;
    return true;
  }
  return false;
}

bool VersionStorageInfo::OverlapInLevel(int level,
                                        const Slice* smallest_user_key,
                                        const Slice* largest_user_key) {
  if (level >= num_non_empty_levels_) {
    // empty level, no overlap
    return false;
  }
  return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
                               level_files_brief_[level], smallest_user_key,
                               largest_user_key);
}

// Store in "*inputs" all files in "level" that overlap [begin,end].
// If hint_index is specified, then it points to a file in the
// overlapping range.
// file_index, if non-null, is set to the index of one file in the
// overlapping range.
void VersionStorageInfo::GetOverlappingInputs(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
    bool expand_range, InternalKey** next_smallest) const {
  if (level >= num_non_empty_levels_) {
    // this level is empty, no overlapping inputs
    return;
  }

  inputs->clear();
  if (file_index) {
    *file_index = -1;
  }
  const Comparator* user_cmp = user_comparator_;
  if (level > 0) {
    GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
                                          file_index, false, next_smallest);
    return;
  }

  if (next_smallest) {
    // next_smallest key only makes sense for non-level 0, where files are
    // non-overlapping
    *next_smallest = nullptr;
  }

  Slice user_begin, user_end;
  if (begin != nullptr) {
    user_begin = begin->user_key();
  }
  if (end != nullptr) {
    user_end = end->user_key();
  }

  // `index` stores the file indices that still need to be checked.
  std::list<size_t> index;
  for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
    index.emplace_back(i);
  }

  while (!index.empty()) {
    bool found_overlapping_file = false;
    auto iter = index.begin();
    while (iter != index.end()) {
      FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
      const Slice file_start = ExtractUserKey(f->smallest_key);
      const Slice file_limit = ExtractUserKey(f->largest_key);
      if (begin != nullptr &&
          user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
        // "f" is completely before the specified range; skip it
        iter++;
      } else if (end != nullptr &&
                 user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
        // "f" is completely after the specified range; skip it
        iter++;
      } else {
        // "f" overlaps the range
        inputs->emplace_back(files_[level][*iter]);
        found_overlapping_file = true;
        // record the first file index.
        if (file_index && *file_index == -1) {
          *file_index = static_cast<int>(*iter);
        }
        // this file overlaps; erase it so it is not checked again.
        iter = index.erase(iter);
        if (expand_range) {
          if (begin != nullptr &&
              user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
            user_begin = file_start;
          }
          if (end != nullptr &&
              user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
            user_end = file_limit;
          }
        }
      }
    }
    // if none of the remaining files overlap, stop
    if (!found_overlapping_file) {
      break;
    }
  }
}
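
// Illustration (not part of the original source): the fixed-point expansion
// used above for L0, over integer ranges. Picking up one overlapping range
// can widen [begin, end], which can pull in ranges that were previously
// disjoint, so the scan repeats until a pass adds nothing.
static std::vector<std::pair<int, int>> ExampleExpandingOverlap(
    std::vector<std::pair<int, int>> candidates, int begin, int end) {
  std::vector<std::pair<int, int>> picked;
  bool changed = true;
  while (changed) {
    changed = false;
    for (auto it = candidates.begin(); it != candidates.end();) {
      if (it->second < begin || it->first > end) {
        ++it;  // disjoint from the current query range
      } else {
        begin = std::min(begin, it->first);  // expand the query range
        end = std::max(end, it->second);
        picked.push_back(*it);
        it = candidates.erase(it);  // don't check this range again
        changed = true;
      }
    }
  }
  return picked;
}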

// Store in "*inputs" the files in "level" that are within range [begin,end].
// Guarantee a "clean cut" boundary between the files in "*inputs" and the
// surrounding files, and return the maximum number of such files.
// This will ensure that no parts of a key are lost during compaction.
// If hint_index is specified, then it points to a file in the range.
// file_index, if non-null, is set to the index of one file in the range.
void VersionStorageInfo::GetCleanInputsWithinInterval(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
  inputs->clear();
  if (file_index) {
    *file_index = -1;
  }
  if (level >= num_non_empty_levels_ || level == 0 ||
      level_files_brief_[level].num_files == 0) {
    // this level is empty, no inputs within range
    // also don't support clean input interval within L0
    return;
  }

  GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
                                        hint_index, file_index,
                                        true /* within_interval */);
}

// Store in "*inputs" all files in "level" that overlap [begin,end].
// Employ binary search to find at least one file that overlaps the
// specified range. From that file, iterate backwards and
// forwards to find all overlapping files.
// If within_interval is set, then only store the maximum set of clean inputs
// within range [begin, end]. "clean" means there is a boundary between the
// files in "*inputs" and the surrounding files.
void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
    bool within_interval, InternalKey** next_smallest) const {
  assert(level > 0);

  auto user_cmp = user_comparator_;
  const FdWithKeyRange* files = level_files_brief_[level].files;
  const int num_files = static_cast<int>(level_files_brief_[level].num_files);

  // begin to use binary search to find lower bound
  // and upper bound.
  int start_index = 0;
  int end_index = num_files;

  if (begin != nullptr) {
    // If within_interval is true, compare against the file's smallest key so
    // that std::lower_bound skips files that merely overlap "begin";
    // otherwise compare against the largest key so overlapping files are
    // included.
    auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
                                             const InternalKey* k) {
      auto& file_key = within_interval ? f.file_metadata->smallest
                                       : f.file_metadata->largest;
      return sstableKeyCompare(user_cmp, file_key, *k) < 0;
    };

    start_index = static_cast<int>(
        std::lower_bound(files,
                         files + (hint_index == -1 ? num_files : hint_index),
                         begin, cmp) -
        files);

    if (start_index > 0 && within_interval) {
      bool is_overlapping = true;
      while (is_overlapping && start_index < num_files) {
        auto& pre_limit = files[start_index - 1].file_metadata->largest;
        auto& cur_start = files[start_index].file_metadata->smallest;
        is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
        start_index += is_overlapping;
      }
    }
  }

  if (end != nullptr) {
    // If within_interval is true, compare against the file's largest key so
    // that std::upper_bound skips files that merely overlap "end"; otherwise
    // compare against the smallest key so overlapping files are included.
    auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
                                             const FdWithKeyRange& f) {
      auto& file_key = within_interval ? f.file_metadata->largest
                                       : f.file_metadata->smallest;
      return sstableKeyCompare(user_cmp, *k, file_key) < 0;
    };

    end_index = static_cast<int>(
        std::upper_bound(files + start_index, files + num_files, end, cmp) -
        files);

    if (end_index < num_files && within_interval) {
      bool is_overlapping = true;
      while (is_overlapping && end_index > start_index) {
        auto& next_start = files[end_index].file_metadata->smallest;
        auto& cur_limit = files[end_index - 1].file_metadata->largest;
        is_overlapping =
            sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
        end_index -= is_overlapping;
      }
    }
  }

  assert(start_index <= end_index);

  // If there were no overlapping files, return immediately.
  if (start_index == end_index) {
    if (next_smallest) {
      *next_smallest = nullptr;
    }
    return;
  }

  assert(start_index < end_index);

  // returns the index where an overlap is found
  if (file_index) {
    *file_index = start_index;
  }

  // insert overlapping files into vector
  for (int i = start_index; i < end_index; i++) {
    inputs->push_back(files_[level][i]);
  }

  if (next_smallest != nullptr) {
    // Provide the next key outside the range covered by inputs
    if (end_index < static_cast<int>(files_[level].size())) {
      **next_smallest = files_[level][end_index]->smallest;
    } else {
      *next_smallest = nullptr;
    }
  }
}
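
// Illustration (not part of the original source): the two binary searches
// above, over sorted, non-overlapping [start, end] integer ranges in
// "overlap" mode. The lower bound compares the query begin against range
// ends; the upper bound compares the query end against range starts. The
// half-open index pair returned brackets exactly the overlapping ranges.
static std::pair<size_t, size_t> ExampleOverlapBounds(
    const std::vector<std::pair<int, int>>& ranges,  // sorted, disjoint
    int begin, int end) {
  // First range whose end is >= begin.
  auto first = std::lower_bound(
      ranges.begin(), ranges.end(), begin,
      [](const std::pair<int, int>& r, int key) { return r.second < key; });
  // First range whose start is > end.
  auto last = std::upper_bound(
      first, ranges.end(), end,
      [](int key, const std::pair<int, int>& r) { return key < r.first; });
  return {static_cast<size_t>(first - ranges.begin()),
          static_cast<size_t>(last - ranges.begin())};
}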

uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
  assert(level >= 0);
  assert(level < num_levels());
  return TotalFileSize(files_[level]);
}

const char* VersionStorageInfo::LevelSummary(
    LevelSummaryStorage* scratch) const {
  int len = 0;
  if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
    assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
    if (level_multiplier_ != 0.0) {
      len = snprintf(
          scratch->buffer, sizeof(scratch->buffer),
          "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
          base_level_, level_multiplier_, level_max_bytes_[base_level_]);
    }
  }
  len +=
      snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
  for (int i = 0; i < num_levels(); i++) {
    int sz = sizeof(scratch->buffer) - len;
    int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
    if (ret < 0 || ret >= sz) break;
    len += ret;
  }
  if (len > 0) {
    // overwrite the last space
    --len;
  }
  len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                  "] max score %.2f", compaction_score_[0]);

  if (!files_marked_for_compaction_.empty()) {
    snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
             " (%" ROCKSDB_PRIszt " files need compaction)",
             files_marked_for_compaction_.size());
  }

  return scratch->buffer;
}

const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
                                                 int level) const {
  int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
  for (const auto& f : files_[level]) {
    int sz = sizeof(scratch->buffer) - len;
    char sztxt[16];
    AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
    int ret = snprintf(scratch->buffer + len, sz,
                       "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
                       f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
                       static_cast<int>(f->being_compacted));
    if (ret < 0 || ret >= sz) {
      break;
    }
    len += ret;
  }
  // overwrite the last space (only if files_[level].size() is non-zero)
  if (files_[level].size() && len > 0) {
    --len;
  }
  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
  return scratch->buffer;
}

int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
  uint64_t result = 0;
  std::vector<FileMetaData*> overlaps;
  for (int level = 1; level < num_levels() - 1; level++) {
    for (const auto& f : files_[level]) {
      GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
      const uint64_t sum = TotalFileSize(overlaps);
      if (sum > result) {
        result = sum;
      }
    }
  }
  return result;
}

uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.
  assert(level >= 0);
  assert(level < static_cast<int>(level_max_bytes_.size()));
  return level_max_bytes_[level];
}

void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
                                            const MutableCFOptions& options) {
  // Special logic to set number of sorted runs.
  // It is to match the previous behavior when all files are in L0.
  int num_l0_count = static_cast<int>(files_[0].size());
  if (compaction_style_ == kCompactionStyleUniversal) {
    // For universal compaction, we use level0 score to indicate
    // compaction score for the whole DB. Adding other levels as if
    // they are L0 files.
    for (int i = 1; i < num_levels(); i++) {
      if (!files_[i].empty()) {
        num_l0_count++;
      }
    }
  }
  set_l0_delay_trigger_count(num_l0_count);

  level_max_bytes_.resize(ioptions.num_levels);
  if (!ioptions.level_compaction_dynamic_level_bytes) {
    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;

    // Calculate for static bytes base case
    for (int i = 0; i < ioptions.num_levels; ++i) {
      if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      } else if (i > 1) {
        level_max_bytes_[i] = MultiplyCheckOverflow(
            MultiplyCheckOverflow(level_max_bytes_[i - 1],
                                  options.max_bytes_for_level_multiplier),
            options.MaxBytesMultiplerAdditional(i - 1));
      } else {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      }
    }
  } else {
    uint64_t max_level_size = 0;

    int first_non_empty_level = -1;
    // Find the size of the non-L0 level with the most data.
    // Cannot use the size of the last level because it can be empty or
    // smaller than previous levels after compaction.
    for (int i = 1; i < num_levels_; i++) {
      uint64_t total_size = 0;
      for (const auto& f : files_[i]) {
        total_size += f->fd.GetFileSize();
      }
      if (total_size > 0 && first_non_empty_level == -1) {
        first_non_empty_level = i;
      }
      if (total_size > max_level_size) {
        max_level_size = total_size;
      }
    }

    // Prefill every level's max bytes to disallow compaction from there.
    for (int i = 0; i < num_levels_; i++) {
      level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
    }

    if (max_level_size == 0) {
      // No data for L1 and up. L0 compacts to last level directly.
      // No compaction from L1+ needs to be scheduled.
      base_level_ = num_levels_ - 1;
    } else {
      uint64_t l0_size = 0;
      for (const auto& f : files_[0]) {
        l0_size += f->fd.GetFileSize();
      }

      uint64_t base_bytes_max =
          std::max(options.max_bytes_for_level_base, l0_size);
      uint64_t base_bytes_min = static_cast<uint64_t>(
          base_bytes_max / options.max_bytes_for_level_multiplier);

      // Try to make the last level's target size be max_level_size.
      uint64_t cur_level_size = max_level_size;
      for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
        // Divide by the multiplier (the result is truncated).
        cur_level_size = static_cast<uint64_t>(
            cur_level_size / options.max_bytes_for_level_multiplier);
      }

      // Calculate base level and its size.
      uint64_t base_level_size;
      if (cur_level_size <= base_bytes_min) {
        // Case 1. If we make the target size of the last level to be
        // max_level_size, the target size of the first non-empty level would
        // be smaller than base_bytes_min. We set it to base_bytes_min + 1
        // instead.
        base_level_size = base_bytes_min + 1U;
        base_level_ = first_non_empty_level;
        ROCKS_LOG_INFO(ioptions.info_log,
                       "More existing levels in DB than needed. "
                       "max_bytes_for_level_multiplier may not be guaranteed.");
      } else {
        // Find base level (where L0 data is compacted to).
        base_level_ = first_non_empty_level;
        while (base_level_ > 1 && cur_level_size > base_bytes_max) {
          --base_level_;
          cur_level_size = static_cast<uint64_t>(
              cur_level_size / options.max_bytes_for_level_multiplier);
        }
        if (cur_level_size > base_bytes_max) {
          // Even L1 will be too large
          assert(base_level_ == 1);
          base_level_size = base_bytes_max;
        } else {
          base_level_size = cur_level_size;
        }
      }

      level_multiplier_ = options.max_bytes_for_level_multiplier;
      assert(base_level_size > 0);
      if (l0_size > base_level_size &&
          (l0_size > options.max_bytes_for_level_base ||
           static_cast<int>(files_[0].size() / 2) >=
               options.level0_file_num_compaction_trigger)) {
        // We adjust the base level according to actual L0 size, and adjust
        // the level multiplier accordingly, when:
        //   1. the L0 size is larger than level size base, or
        //   2. number of L0 files reaches twice the L0->L1 compaction trigger
        // We don't do this otherwise to keep the LSM-tree structure stable
        // unless the L0 compaction is backlogged.
        base_level_size = l0_size;
        if (base_level_ == num_levels_ - 1) {
          level_multiplier_ = 1.0;
        } else {
          level_multiplier_ = std::pow(
              static_cast<double>(max_level_size) /
                  static_cast<double>(base_level_size),
              1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
        }
      }

      uint64_t level_size = base_level_size;
      for (int i = base_level_; i < num_levels_; i++) {
        if (i > base_level_) {
          level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
        }
        // Don't set any level below base_bytes_max. Otherwise, the LSM can
        // assume an hourglass shape where L1+ sizes are smaller than L0. This
        // causes compaction scoring, which depends on level sizes, to favor
        // L1+ at the expense of L0, which may fill up and stall.
        level_max_bytes_[i] = std::max(level_size, base_bytes_max);
      }
    }
  }
}
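
// Illustration (not part of the original source): the dynamic target-size
// computation above, reduced to its core under simplifying assumptions (no
// overflow checks, base level search starting from the last level, and all
// names hypothetical). Starting from the size of the fullest level, divide
// by the multiplier while walking toward L1 until the target drops to
// base_bytes_max or below; that level becomes the base level, and targets
// grow by the multiplier from there down.
static std::vector<uint64_t> ExampleDynamicLevelTargets(
    uint64_t max_level_size, uint64_t base_bytes_max, double multiplier,
    int num_levels, int* base_level_out) {
  std::vector<uint64_t> targets(num_levels,
                                std::numeric_limits<uint64_t>::max());
  uint64_t cur = max_level_size;
  int base_level = num_levels - 1;
  while (base_level > 1 && cur > base_bytes_max) {
    --base_level;
    cur = static_cast<uint64_t>(cur / multiplier);
  }
  uint64_t level_size = std::min(cur, base_bytes_max);
  for (int i = base_level; i < num_levels; i++) {
    if (i > base_level) {
      level_size = static_cast<uint64_t>(level_size * multiplier);
    }
    // Keep every level at or above base_bytes_max, as explained above.
    targets[i] = std::max(level_size, base_bytes_max);
  }
  *base_level_out = base_level;
  return targets;
}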

uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
  // Estimate the live data size by adding up the size of the last level for
  // all key ranges. Note: Estimate depends on the ordering of files in level
  // 0 because files in level 0 can be overlapping.
  uint64_t size = 0;

  auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
    return internal_comparator_->Compare(*x, *y) < 0;
  };
  // (Ordered) map of largest keys in non-overlapping files
  std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);

  for (int l = num_levels_ - 1; l >= 0; l--) {
    bool found_end = false;
    for (auto file : files_[l]) {
      // Find the first file where the largest key is larger than the smallest
      // key of the current file. If this file does not overlap with the
      // current file, none of the files in the map does. If there is
      // no potential overlap, we can safely insert the rest of this level
      // (if the level is not 0) into the map without checking again because
      // the elements in the level are sorted and non-overlapping.
      auto lb = (found_end && l != 0) ? ranges.end()
                                      : ranges.lower_bound(&file->smallest);
      found_end = (lb == ranges.end());
      if (found_end || internal_comparator_->Compare(
                           file->largest, (*lb).second->smallest) < 0) {
        ranges.emplace_hint(lb, &file->largest, file);
        size += file->fd.file_size;
      }
    }
  }
  return size;
}
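
// Illustration (not part of the original source): the de-duplication idea
// above over integer ranges, with the per-level early-exit shortcut omitted.
// Walking levels from the bottom up, a range is counted only if it does not
// overlap any range already stored; the ordered map is keyed by range end so
// lower_bound finds the first stored range that could overlap. The range
// width stands in for the file size.
static uint64_t ExampleLiveSize(
    const std::vector<std::vector<std::pair<int, int>>>& levels_bottom_up) {
  std::map<int, std::pair<int, int>> ranges;  // end key -> [start, end]
  uint64_t size = 0;
  for (const auto& level : levels_bottom_up) {
    for (const auto& r : level) {
      // First stored range whose end is >= r's start; if even that one
      // starts after r's end, nothing stored overlaps r.
      auto lb = ranges.lower_bound(r.first);
      if (lb == ranges.end() || r.second < lb->second.first) {
        ranges.emplace_hint(lb, r.second, r);
        size += static_cast<uint64_t>(r.second - r.first);
      }
    }
  }
  return size;
}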

bool VersionStorageInfo::RangeMightExistAfterSortedRun(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int last_level, int last_l0_idx) {
  assert((last_l0_idx != -1) == (last_level == 0));
  // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
  // bottommost only if it's the oldest L0 file and there are no files on older
  // levels. It'd be better to consider it bottommost if there's no overlap in
  // older levels/files.
  if (last_level == 0 &&
      last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
    return true;
  }

  // Checks whether there are files living beyond the `last_level`. If lower
  // levels have files, it checks for overlap between [`smallest_key`,
  // `largest_key`] and those files. Bottomlevel optimizations can be made if
  // there are no files in lower levels or if there is no overlap with the
  // files in the lower levels.
  for (int level = last_level + 1; level < num_levels(); level++) {
    // The range is not in the bottommost level if there are files in lower
    // levels when the `last_level` is 0 or if there are files in lower levels
    // which overlap with [`smallest_key`, `largest_key`].
    if (files_[level].size() > 0 &&
        (last_level == 0 ||
         OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
      return true;
    }
  }
  return false;
}

void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
  for (int level = 0; level < storage_info_.num_levels(); level++) {
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
    for (const auto& file : files) {
      live->push_back(file->fd);
    }
  }
}

std::string Version::DebugString(bool hex, bool print_stats) const {
  std::string r;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    // E.g.,
    //   --- level 1 ---
    //   17:123[1 .. 124]['a' .. 'd']
    //   20:43[124 .. 128]['e' .. 'g']
    //
    // if print_stats=true:
    //   17:123[1 .. 124]['a' .. 'd'](4096)
    r.append("--- level ");
    AppendNumberTo(&r, level);
    r.append(" --- version# ");
    AppendNumberTo(&r, version_number_);
    r.append(" ---\n");
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
    for (size_t i = 0; i < files.size(); i++) {
      r.push_back(' ');
      AppendNumberTo(&r, files[i]->fd.GetNumber());
      r.push_back(':');
      AppendNumberTo(&r, files[i]->fd.GetFileSize());
      r.append("[");
      AppendNumberTo(&r, files[i]->fd.smallest_seqno);
      r.append(" .. ");
      AppendNumberTo(&r, files[i]->fd.largest_seqno);
      r.append("]");
      r.append("[");
      r.append(files[i]->smallest.DebugString(hex));
      r.append(" .. ");
      r.append(files[i]->largest.DebugString(hex));
      r.append("]");
      if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
        r.append(" blob_file:");
        AppendNumberTo(&r, files[i]->oldest_blob_file_number);
      }
      if (print_stats) {
        r.append("(");
        r.append(ToString(
            files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
        r.append(")");
      }
      r.append("\n");
    }
  }
  return r;
}

// this is used to batch writes to the manifest file
struct VersionSet::ManifestWriter {
  Status status;
  bool done;
  InstrumentedCondVar cv;
  ColumnFamilyData* cfd;
  const MutableCFOptions mutable_cf_options;
  const autovector<VersionEdit*>& edit_list;

  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
                          const MutableCFOptions& cf_options,
                          const autovector<VersionEdit*>& e)
      : done(false),
        cv(mu),
        cfd(_cfd),
        mutable_cf_options(cf_options),
        edit_list(e) {}
};

Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
  assert(edit);
  if (edit->is_in_atomic_group_) {
    TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
    if (replay_buffer_.empty()) {
      replay_buffer_.resize(edit->remaining_entries_ + 1);
      TEST_SYNC_POINT_CALLBACK(
          "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
    }
    read_edits_in_atomic_group_++;
    if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
        static_cast<uint32_t>(replay_buffer_.size())) {
      TEST_SYNC_POINT_CALLBACK(
          "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
      return Status::Corruption("corrupted atomic group");
    }
    replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
    if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
      TEST_SYNC_POINT_CALLBACK(
          "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
      return Status::OK();
    }
    return Status::OK();
  }

  // A normal edit.
  if (!replay_buffer().empty()) {
    TEST_SYNC_POINT_CALLBACK(
        "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
    return Status::Corruption("corrupted atomic group");
  }
  return Status::OK();
}
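
// Illustration (not part of the original source): the invariant checked
// above. Each edit in an atomic group carries the number of entries still to
// come; for a group of size N read in order, edit i (0-based) must carry
// remaining == N - 1 - i, so position + remaining is constant across the
// group, which is exactly what the size check in AddEdit() enforces.
static bool ExampleAtomicGroupConsistent(
    const std::vector<uint32_t>& remaining_entries) {
  const size_t n = remaining_entries.size();
  for (size_t i = 0; i < n; ++i) {
    if (remaining_entries[i] != n - 1 - i) {
      return false;  // a gap or reordering corrupted the group
    }
  }
  return true;
}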

bool AtomicGroupReadBuffer::IsFull() const {
  return read_edits_in_atomic_group_ == replay_buffer_.size();
}

bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }

void AtomicGroupReadBuffer::Clear() {
  read_edits_in_atomic_group_ = 0;
  replay_buffer_.clear();
}

VersionSet::VersionSet(const std::string& dbname,
                       const ImmutableDBOptions* _db_options,
                       const FileOptions& storage_options, Cache* table_cache,
                       WriteBufferManager* write_buffer_manager,
                       WriteController* write_controller,
                       BlockCacheTracer* const block_cache_tracer)
    : column_family_set_(new ColumnFamilySet(
          dbname, _db_options, storage_options, table_cache,
          write_buffer_manager, write_controller, block_cache_tracer)),
      env_(_db_options->env),
      fs_(_db_options->fs.get()),
      dbname_(dbname),
      db_options_(_db_options),
      next_file_number_(2),
      manifest_file_number_(0),  // Filled by Recover()
      options_file_number_(0),
      pending_manifest_file_number_(0),
      last_sequence_(0),
      last_allocated_sequence_(0),
      last_published_sequence_(0),
      prev_log_number_(0),
      current_version_number_(0),
      manifest_file_size_(0),
      file_options_(storage_options),
      block_cache_tracer_(block_cache_tracer) {}

VersionSet::~VersionSet() {
  // we need to delete column_family_set_ because its destructor depends on
  // VersionSet
  Cache* table_cache = column_family_set_->get_table_cache();
  column_family_set_.reset();
  for (auto& file : obsolete_files_) {
    if (file.metadata->table_reader_handle) {
      table_cache->Release(file.metadata->table_reader_handle);
      TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
    }
    file.DeleteMetadata();
  }
  obsolete_files_.clear();
}

void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
                               Version* v) {
  // compute new compaction score
  v->storage_info()->ComputeCompactionScore(
      *column_family_data->ioptions(),
      *column_family_data->GetLatestMutableCFOptions());

  // Mark v finalized
  v->storage_info_.SetFinalized();

  // Make "v" current
  assert(v->refs_ == 0);
  Version* current = column_family_data->current();
  assert(v != current);
  if (current != nullptr) {
    assert(current->refs_ > 0);
    current->Unref();
  }
  column_family_data->SetCurrent(v);
  v->Ref();

  // Append to linked list
  v->prev_ = column_family_data->dummy_versions()->prev_;
  v->next_ = column_family_data->dummy_versions();
  v->prev_->next_ = v;
  v->next_->prev_ = v;
}

Status VersionSet::ProcessManifestWrites(
    std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
    Directory* db_directory, bool new_descriptor_log,
    const ColumnFamilyOptions* new_cf_options) {
  assert(!writers.empty());
  ManifestWriter& first_writer = writers.front();
  ManifestWriter* last_writer = &first_writer;

  assert(!manifest_writers_.empty());
  assert(manifest_writers_.front() == &first_writer);

  autovector<VersionEdit*> batch_edits;
  autovector<Version*> versions;
  autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
  std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;

  if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
    // No group commits for column family add or drop
    LogAndApplyCFHelper(first_writer.edit_list.front());
    batch_edits.push_back(first_writer.edit_list.front());
  } else {
    auto it = manifest_writers_.cbegin();
    size_t group_start = std::numeric_limits<size_t>::max();
    while (it != manifest_writers_.cend()) {
      if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
        // no group commits for column family add or drop
        break;
      }
      last_writer = *(it++);
      assert(last_writer != nullptr);
      assert(last_writer->cfd != nullptr);
      if (last_writer->cfd->IsDropped()) {
        // If we detect a dropped CF at this point, and the corresponding
        // version edits belong to an atomic group, then we need to find out
        // the preceding version edits in the same atomic group, and update
        // their `remaining_entries_` member variable because we are NOT going
        // to write the version edits of the dropped CF to the MANIFEST. If we
        // don't update, then Recover can report a corrupted atomic group
        // because the `remaining_entries_` do not match.
        if (!batch_edits.empty()) {
          if (batch_edits.back()->is_in_atomic_group_ &&
              batch_edits.back()->remaining_entries_ > 0) {
            assert(group_start < batch_edits.size());
            const auto& edit_list = last_writer->edit_list;
            size_t k = 0;
            while (k < edit_list.size()) {
              if (!edit_list[k]->is_in_atomic_group_) {
                break;
              } else if (edit_list[k]->remaining_entries_ == 0) {
                ++k;
                break;
              }
              ++k;
            }
            for (auto i = group_start; i < batch_edits.size(); ++i) {
              assert(static_cast<uint32_t>(k) <=
                     batch_edits.back()->remaining_entries_);
              batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
            }
          }
        }
        continue;
      }
      // We do a linear search on versions because versions is small.
      // TODO(yanqin) maybe consider unordered_map
      Version* version = nullptr;
      VersionBuilder* builder = nullptr;
      for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
        uint32_t cf_id = last_writer->cfd->GetID();
        if (versions[i]->cfd()->GetID() == cf_id) {
          version = versions[i];
          assert(!builder_guards.empty() &&
                 builder_guards.size() == versions.size());
          builder = builder_guards[i]->version_builder();
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
          break;
        }
      }
      if (version == nullptr) {
        version = new Version(last_writer->cfd, this, file_options_,
                              last_writer->mutable_cf_options,
                              current_version_number_++);
        versions.push_back(version);
        mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
        builder_guards.emplace_back(
            new BaseReferencedVersionBuilder(last_writer->cfd));
        builder = builder_guards.back()->version_builder();
      }
      assert(builder != nullptr);  // make checker happy
      for (const auto& e : last_writer->edit_list) {
        if (e->is_in_atomic_group_) {
          if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
              (batch_edits.back()->is_in_atomic_group_ &&
               batch_edits.back()->remaining_entries_ == 0)) {
            group_start = batch_edits.size();
          }
        } else if (group_start != std::numeric_limits<size_t>::max()) {
          group_start = std::numeric_limits<size_t>::max();
        }
        Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
        if (!s.ok()) {
          // free up the allocated memory
          for (auto v : versions) {
            delete v;
          }
          return s;
        }
        batch_edits.push_back(e);
      }
    }
    for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
      assert(!builder_guards.empty() &&
             builder_guards.size() == versions.size());
      auto* builder = builder_guards[i]->version_builder();
      Status s = builder->SaveTo(versions[i]->storage_info());
      if (!s.ok()) {
        // free up the allocated memory
        for (auto v : versions) {
          delete v;
        }
        return s;
      }
    }
  }

#ifndef NDEBUG
  // Verify that version edits of atomic groups have correct
  // remaining_entries_.
  size_t k = 0;
  while (k < batch_edits.size()) {
    while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
      ++k;
    }
    if (k == batch_edits.size()) {
      break;
    }
    size_t i = k;
    while (i < batch_edits.size()) {
      if (!batch_edits[i]->is_in_atomic_group_) {
        break;
      }
      assert(i - k + batch_edits[i]->remaining_entries_ ==
             batch_edits[k]->remaining_entries_);
      if (batch_edits[i]->remaining_entries_ == 0) {
        ++i;
        break;
      }
      ++i;
    }
    assert(batch_edits[i - 1]->is_in_atomic_group_);
    assert(0 == batch_edits[i - 1]->remaining_entries_);
    std::vector<VersionEdit*> tmp;
    for (size_t j = k; j != i; ++j) {
      tmp.emplace_back(batch_edits[j]);
    }
    TEST_SYNC_POINT_CALLBACK(
        "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
    k = i;
  }
#endif  // NDEBUG
  3470. uint64_t new_manifest_file_size = 0;
  3471. Status s;
  3472. assert(pending_manifest_file_number_ == 0);
  3473. if (!descriptor_log_ ||
  3474. manifest_file_size_ > db_options_->max_manifest_file_size) {
  3475. TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
  3476. new_descriptor_log = true;
  3477. } else {
  3478. pending_manifest_file_number_ = manifest_file_number_;
  3479. }
  // Local cached copy of state variable(s). WriteCurrentStateToManifest()
  // reads its content after releasing db mutex to avoid race with
  // SwitchMemtable().
  std::unordered_map<uint32_t, MutableCFState> curr_state;
  if (new_descriptor_log) {
    pending_manifest_file_number_ = NewFileNumber();
    batch_edits.back()->SetNextFile(next_file_number_.load());
    // if we are writing out a new snapshot make sure to persist max column
    // family.
    if (column_family_set_->GetMaxColumnFamily() > 0) {
      first_writer.edit_list.front()->SetMaxColumnFamily(
          column_family_set_->GetMaxColumnFamily());
    }
    for (const auto* cfd : *column_family_set_) {
      assert(curr_state.find(cfd->GetID()) == curr_state.end());
      curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
    }
  }
  {
    FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
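    // From here until mu->Lock() below, the expensive work (loading table
    // handlers, creating and syncing the manifest) runs with the DB mutex
    // released. Mutual exclusion is still guaranteed because only the writer
    // at the head of manifest_writers_ can be executing this function.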
    mu->Unlock();
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        assert(!builder_guards.empty() &&
               builder_guards.size() == versions.size());
        assert(!mutable_cf_options_ptrs.empty() &&
               builder_guards.size() == versions.size());
        ColumnFamilyData* cfd = versions[i]->cfd_;
        s = builder_guards[i]->version_builder()->LoadTableHandlers(
            cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
            true /* prefetch_index_and_filter_in_cache */,
            false /* is_initial_load */,
            mutable_cf_options_ptrs[i]->prefix_extractor.get());
        if (!s.ok()) {
          if (db_options_->paranoid_checks) {
            break;
          }
          s = Status::OK();
        }
      }
    }
    if (s.ok() && new_descriptor_log) {
      // This is fine because everything inside of this block is serialized --
      // only one thread can be here at the same time
      // create new manifest file
      ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
                     pending_manifest_file_number_);
      std::string descriptor_fname =
          DescriptorFileName(dbname_, pending_manifest_file_number_);
      std::unique_ptr<FSWritableFile> descriptor_file;
      s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
                          opt_file_opts);
      if (s.ok()) {
        descriptor_file->SetPreallocationBlockSize(
            db_options_->manifest_preallocation_size);
        std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
            std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
            nullptr, db_options_->listeners));
        descriptor_log_.reset(
            new log::Writer(std::move(file_writer), 0, false));
        s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
      }
    }
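    // WriteCurrentStateToManifest() above seeds the new manifest with a
    // snapshot of every live column family and its files, so the new file can
    // be replayed on its own without consulting the previous manifest.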
    if (s.ok()) {
      if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
        for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
          versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
        }
      }
      // Write new records to MANIFEST log
#ifndef NDEBUG
      size_t idx = 0;
#endif
      for (auto& e : batch_edits) {
        std::string record;
        if (!e->EncodeTo(&record)) {
          s = Status::Corruption("Unable to encode VersionEdit:" +
                                 e->DebugString(true));
          break;
        }
        TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
                         rocksdb_kill_odds * REDUCE_ODDS2);
#ifndef NDEBUG
        if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
              nullptr);
          TEST_SYNC_POINT(
              "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
        }
        ++idx;
#endif /* !NDEBUG */
        s = descriptor_log_->AddRecord(record);
        if (!s.ok()) {
          break;
        }
      }
      if (s.ok()) {
        s = SyncManifest(env_, db_options_, descriptor_log_->file());
      }
      if (!s.ok()) {
        ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
                        s.ToString().c_str());
      }
    }
    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && new_descriptor_log) {
      s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
                         db_directory);
      TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
    }
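    // Updating CURRENT is the commit point of the manifest roll: future
    // recoveries only see the new manifest once CURRENT names it.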
    if (s.ok()) {
      // find offset in manifest file where this version is stored.
      new_manifest_file_size = descriptor_log_->file()->GetFileSize();
    }
    if (first_writer.edit_list.front()->is_column_family_drop_) {
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
    }
    LogFlush(db_options_->info_log);
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
    mu->Lock();
  }
  // Append the old manifest file to the obsolete_manifests_ list to be deleted
  // by PurgeObsoleteFiles later.
  if (s.ok() && new_descriptor_log) {
    obsolete_manifests_.emplace_back(
        DescriptorFileName("", manifest_file_number_));
  }
  // Install the new versions
  if (s.ok()) {
    if (first_writer.edit_list.front()->is_column_family_add_) {
      assert(batch_edits.size() == 1);
      assert(new_cf_options != nullptr);
      CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
    } else if (first_writer.edit_list.front()->is_column_family_drop_) {
      assert(batch_edits.size() == 1);
      first_writer.cfd->SetDropped();
      first_writer.cfd->UnrefAndTryDelete();
    } else {
      // Each version in versions corresponds to a column family.
      // For each column family, update its log number indicating that logs
      // with number smaller than this should be ignored.
      for (const auto version : versions) {
        uint64_t max_log_number_in_batch = 0;
        uint32_t cf_id = version->cfd_->GetID();
        for (const auto& e : batch_edits) {
          if (e->has_log_number_ && e->column_family_ == cf_id) {
            max_log_number_in_batch =
                std::max(max_log_number_in_batch, e->log_number_);
          }
        }
        if (max_log_number_in_batch != 0) {
          assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
          version->cfd_->SetLogNumber(max_log_number_in_batch);
        }
      }
      uint64_t last_min_log_number_to_keep = 0;
      for (auto& e : batch_edits) {
        if (e->has_min_log_number_to_keep_) {
          last_min_log_number_to_keep =
              std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
        }
      }
      if (last_min_log_number_to_keep != 0) {
        // Should only be set in 2PC mode.
        MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
      }
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        ColumnFamilyData* cfd = versions[i]->cfd_;
        AppendVersion(cfd, versions[i]);
      }
    }
    manifest_file_number_ = pending_manifest_file_number_;
    manifest_file_size_ = new_manifest_file_size;
    prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
  } else {
    std::string version_edits;
    for (auto& e : batch_edits) {
      version_edits += ("\n" + e->DebugString(true));
    }
    ROCKS_LOG_ERROR(db_options_->info_log,
                    "Error in committing version edit to MANIFEST: %s",
                    version_edits.c_str());
    for (auto v : versions) {
      delete v;
    }
    // If manifest append failed for whatever reason, the file could be
    // corrupted. So we need to force the next version update to start a
    // new manifest file.
    descriptor_log_.reset();
    if (new_descriptor_log) {
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Deleting manifest %" PRIu64 " current manifest %" PRIu64
                     "\n",
                     manifest_file_number_, pending_manifest_file_number_);
      env_->DeleteFile(
          DescriptorFileName(dbname_, pending_manifest_file_number_));
    }
  }
  pending_manifest_file_number_ = 0;
  // wake up all the waiting writers
  while (true) {
    ManifestWriter* ready = manifest_writers_.front();
    manifest_writers_.pop_front();
    bool need_signal = true;
    for (const auto& w : writers) {
      if (&w == ready) {
        need_signal = false;
        break;
      }
    }
    ready->status = s;
    ready->done = true;
    if (need_signal) {
      ready->cv.Signal();
    }
    if (ready == last_writer) {
      break;
    }
  }
  if (!manifest_writers_.empty()) {
    manifest_writers_.front()->cv.Signal();
  }
  return s;
}
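
// VersionSet::LogAndApply() below implements a group commit: concurrent
// callers queue up as ManifestWriters, the queue head becomes the leader and
// commits every compatible pending edit in one manifest write, and followers
// simply wait for their status. A typical single-column-family call, using
// one of the convenience overloads declared in version_set.h, looks roughly
// like this (sketch with illustrative names, not code from this file):
//
//   VersionEdit edit;
//   edit.SetLogNumber(new_log_number);
//   // ... record file additions/deletions on the edit ...
//   Status s = versions->LogAndApply(cfd, mutable_cf_options, &edit, &mu,
//                                    db_directory);
//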
// 'datas' is grammatically incorrect. We still use this notation to indicate
// that this variable represents a collection of column_family_data.
Status VersionSet::LogAndApply(
    const autovector<ColumnFamilyData*>& column_family_datas,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<autovector<VersionEdit*>>& edit_lists,
    InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
    const ColumnFamilyOptions* new_cf_options) {
  mu->AssertHeld();
  int num_edits = 0;
  for (const auto& elist : edit_lists) {
    num_edits += static_cast<int>(elist.size());
  }
  if (num_edits == 0) {
    return Status::OK();
  } else if (num_edits > 1) {
#ifndef NDEBUG
    for (const auto& edit_list : edit_lists) {
      for (const auto& edit : edit_list) {
        assert(!edit->IsColumnFamilyManipulation());
      }
    }
#endif /* ! NDEBUG */
  }
  int num_cfds = static_cast<int>(column_family_datas.size());
  if (num_cfds == 1 && column_family_datas[0] == nullptr) {
    assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
    assert(edit_lists[0][0]->is_column_family_add_);
    assert(new_cf_options != nullptr);
  }
  std::deque<ManifestWriter> writers;
  if (num_cfds > 0) {
    assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
    assert(static_cast<size_t>(num_cfds) == edit_lists.size());
  }
  for (int i = 0; i < num_cfds; ++i) {
    writers.emplace_back(mu, column_family_datas[i],
                         *mutable_cf_options_list[i], edit_lists[i]);
    manifest_writers_.push_back(&writers[i]);
  }
  assert(!writers.empty());
  ManifestWriter& first_writer = writers.front();
  while (!first_writer.done && &first_writer != manifest_writers_.front()) {
    first_writer.cv.Wait();
  }
  if (first_writer.done) {
    // All non-CF-manipulation operations can be grouped together and committed
    // to MANIFEST. They should all have finished. The status code is stored in
    // the first manifest writer.
#ifndef NDEBUG
    for (const auto& writer : writers) {
      assert(writer.done);
    }
#endif /* !NDEBUG */
    return first_writer.status;
  }
  int num_undropped_cfds = 0;
  for (auto cfd : column_family_datas) {
    // if cfd == nullptr, it is a column family add.
    if (cfd == nullptr || !cfd->IsDropped()) {
      ++num_undropped_cfds;
    }
  }
  if (0 == num_undropped_cfds) {
    for (int i = 0; i != num_cfds; ++i) {
      manifest_writers_.pop_front();
    }
    // Notify new head of manifest write queue.
    if (!manifest_writers_.empty()) {
      manifest_writers_.front()->cv.Signal();
    }
    return Status::ColumnFamilyDropped();
  }
  return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
                               new_cf_options);
}

void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
  assert(edit->IsColumnFamilyManipulation());
  edit->SetNextFile(next_file_number_.load());
  // The log might have data that is not visible to memtable and hence have not
  // updated the last_sequence_ yet. It is also possible that the log is
  // expecting some new data that is not written yet. Since LastSequence is an
  // upper bound on the sequence, it is ok to record
  // last_allocated_sequence_ as the last sequence.
  edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
                                                      : last_sequence_);
  if (edit->is_column_family_drop_) {
    // If we drop a column family, we have to make sure to save the max column
    // family, so that we don't reuse an existing ID.
    edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
  }
}

Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
                                     VersionBuilder* builder, VersionEdit* edit,
                                     InstrumentedMutex* mu) {
#ifdef NDEBUG
  (void)cfd;
#endif
  mu->AssertHeld();
  assert(!edit->IsColumnFamilyManipulation());
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= cfd->GetLogNumber());
    assert(edit->log_number_ < next_file_number_.load());
  }
  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }
  edit->SetNextFile(next_file_number_.load());
  // The log might have data that is not visible to memtable and hence have not
  // updated the last_sequence_ yet. It is also possible that the log is
  // expecting some new data that is not written yet. Since LastSequence is an
  // upper bound on the sequence, it is ok to record
  // last_allocated_sequence_ as the last sequence.
  edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
                                                      : last_sequence_);
  Status s = builder->Apply(edit);
  return s;
}

Status VersionSet::ApplyOneVersionEditToBuilder(
    VersionEdit& edit,
    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
    std::unordered_map<int, std::string>& column_families_not_found,
    std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
        builders,
    VersionEditParams* version_edit_params) {
  // Not found means that user didn't supply that column
  // family option AND we encountered column family add
  // record. Once we encounter column family drop record,
  // we will delete the column family from
  // column_families_not_found.
  bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
                          column_families_not_found.end());
  // in builders means that user supplied that column family
  // option AND that we encountered column family add record
  bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
  // they can't both be true
  assert(!(cf_in_not_found && cf_in_builders));
  ColumnFamilyData* cfd = nullptr;
  if (edit.is_column_family_add_) {
    if (cf_in_builders || cf_in_not_found) {
      return Status::Corruption(
          "Manifest adding the same column family twice: " +
          edit.column_family_name_);
    }
    auto cf_options = name_to_options.find(edit.column_family_name_);
    // implicitly add persistent_stats column family without requiring user
    // to specify
    bool is_persistent_stats_column_family =
        edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
    if (cf_options == name_to_options.end() &&
        !is_persistent_stats_column_family) {
      column_families_not_found.insert(
          {edit.column_family_, edit.column_family_name_});
    } else {
      // recover persistent_stats CF from a DB that already contains it
      if (is_persistent_stats_column_family) {
        ColumnFamilyOptions cfo;
        OptimizeForPersistentStats(&cfo);
        cfd = CreateColumnFamily(cfo, &edit);
      } else {
        cfd = CreateColumnFamily(cf_options->second, &edit);
      }
      cfd->set_initialized();
      builders.insert(std::make_pair(
          edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
                                   new BaseReferencedVersionBuilder(cfd))));
    }
  } else if (edit.is_column_family_drop_) {
    if (cf_in_builders) {
      auto builder = builders.find(edit.column_family_);
      assert(builder != builders.end());
      builders.erase(builder);
      cfd = column_family_set_->GetColumnFamily(edit.column_family_);
      assert(cfd != nullptr);
      if (cfd->UnrefAndTryDelete()) {
        cfd = nullptr;
      } else {
        // who else can have reference to cfd!?
        assert(false);
      }
    } else if (cf_in_not_found) {
      column_families_not_found.erase(edit.column_family_);
    } else {
      return Status::Corruption(
          "Manifest - dropping non-existing column family");
    }
  } else if (!cf_in_not_found) {
    if (!cf_in_builders) {
      return Status::Corruption(
          "Manifest record referencing unknown column family");
    }
    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
    // this should never happen since cf_in_builders is true
    assert(cfd != nullptr);
    // if it is not column family add or column family drop,
    // then it's a file add/delete, which should be forwarded
    // to builder
    auto builder = builders.find(edit.column_family_);
    assert(builder != builders.end());
    Status s = builder->second->version_builder()->Apply(&edit);
    if (!s.ok()) {
      return s;
    }
  }
  return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
}
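
// During recovery, edits are only accumulated into per-column-family
// VersionBuilders here; no Version is materialized until the whole manifest
// has been replayed, at which point Recover() saves each builder into a new
// Version and installs it.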

Status VersionSet::ExtractInfoFromVersionEdit(
    ColumnFamilyData* cfd, const VersionEdit& from_edit,
    VersionEditParams* version_edit_params) {
  if (cfd != nullptr) {
    if (from_edit.has_db_id_) {
      version_edit_params->SetDBId(from_edit.db_id_);
    }
    if (from_edit.has_log_number_) {
      if (cfd->GetLogNumber() > from_edit.log_number_) {
        ROCKS_LOG_WARN(
            db_options_->info_log,
            "MANIFEST corruption detected, but ignored - Log numbers in "
            "records NOT monotonically increasing");
      } else {
        cfd->SetLogNumber(from_edit.log_number_);
        version_edit_params->SetLogNumber(from_edit.log_number_);
      }
    }
    if (from_edit.has_comparator_ &&
        from_edit.comparator_ != cfd->user_comparator()->Name()) {
      return Status::InvalidArgument(
          cfd->user_comparator()->Name(),
          "does not match existing comparator " + from_edit.comparator_);
    }
  }
  if (from_edit.has_prev_log_number_) {
    version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
  }
  if (from_edit.has_next_file_number_) {
    version_edit_params->SetNextFile(from_edit.next_file_number_);
  }
  if (from_edit.has_max_column_family_) {
    version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
  }
  if (from_edit.has_min_log_number_to_keep_) {
    version_edit_params->min_log_number_to_keep_ =
        std::max(version_edit_params->min_log_number_to_keep_,
                 from_edit.min_log_number_to_keep_);
  }
  if (from_edit.has_last_sequence_) {
    version_edit_params->SetLastSequence(from_edit.last_sequence_);
  }
  return Status::OK();
}

Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
                                          FileSystem* fs,
                                          std::string* manifest_path,
                                          uint64_t* manifest_file_number) {
  assert(fs != nullptr);
  assert(manifest_path != nullptr);
  assert(manifest_file_number != nullptr);
  std::string fname;
  Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
  if (!s.ok()) {
    return s;
  }
  if (fname.empty() || fname.back() != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  // remove the trailing '\n'
  fname.resize(fname.size() - 1);
  FileType type;
  bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
  if (!parse_ok || type != kDescriptorFile) {
    return Status::Corruption("CURRENT file corrupted");
  }
  *manifest_path = dbname;
  if (dbname.back() != '/') {
    manifest_path->push_back('/');
  }
  *manifest_path += fname;
  return Status::OK();
}
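
// The CURRENT file holds a single line naming the live manifest, e.g. a DB
// whose current descriptor is MANIFEST-000005 has a CURRENT file containing
// exactly "MANIFEST-000005\n"; the parsing above recovers the number 5 and
// the file type kDescriptorFile from that name.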

Status VersionSet::ReadAndRecover(
    log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
    std::unordered_map<int, std::string>& column_families_not_found,
    std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
        builders,
    VersionEditParams* version_edit_params, std::string* db_id) {
  assert(reader != nullptr);
  assert(read_buffer != nullptr);
  Status s;
  Slice record;
  std::string scratch;
  size_t recovered_edits = 0;
  while (reader->ReadRecord(&record, &scratch) && s.ok()) {
    VersionEdit edit;
    s = edit.DecodeFrom(record);
    if (!s.ok()) {
      break;
    }
    if (edit.has_db_id_) {
      db_id_ = edit.GetDbId();
      if (db_id != nullptr) {
        db_id->assign(edit.GetDbId());
      }
    }
    s = read_buffer->AddEdit(&edit);
    if (!s.ok()) {
      break;
    }
    if (edit.is_in_atomic_group_) {
      if (read_buffer->IsFull()) {
        // Apply edits in an atomic group when we have read all edits in the
        // group.
        for (auto& e : read_buffer->replay_buffer()) {
          s = ApplyOneVersionEditToBuilder(e, name_to_options,
                                           column_families_not_found, builders,
                                           version_edit_params);
          if (!s.ok()) {
            break;
          }
          recovered_edits++;
        }
        if (!s.ok()) {
          break;
        }
        read_buffer->Clear();
      }
    } else {
      // Apply a normal edit immediately.
      s = ApplyOneVersionEditToBuilder(edit, name_to_options,
                                       column_families_not_found, builders,
                                       version_edit_params);
      if (s.ok()) {
        recovered_edits++;
      }
    }
  }
  if (!s.ok()) {
    // Clear the buffer if we fail to decode/apply an edit.
    read_buffer->Clear();
  }
  TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
                           &recovered_edits);
  return s;
}
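
// Atomic-group edits are deliberately held back in the AtomicGroupReadBuffer
// and applied only once the final member (remaining_entries_ == 0) has been
// read. A manifest that ends mid-group therefore never leaves a half-applied
// group behind: the incomplete tail is never replayed into the builders.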

Status VersionSet::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    std::string* db_id) {
  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
  for (const auto& cf : column_families) {
    cf_name_to_options.emplace(cf.name, cf.options);
  }
  // keeps track of column families in manifest that were not found in
  // column families parameters. if those column families are not dropped
  // by subsequent manifest records, Recover() will return failure status
  std::unordered_map<int, std::string> column_families_not_found;
  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string manifest_path;
  Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
                                    &manifest_file_number_);
  if (!s.ok()) {
    return s;
  }
  ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
                 manifest_path.c_str());
  std::unique_ptr<SequentialFileReader> manifest_file_reader;
  {
    std::unique_ptr<FSSequentialFile> manifest_file;
    s = fs_->NewSequentialFile(manifest_path,
                               fs_->OptimizeForManifestRead(file_options_),
                               &manifest_file, nullptr);
    if (!s.ok()) {
      return s;
    }
    manifest_file_reader.reset(
        new SequentialFileReader(std::move(manifest_file), manifest_path,
                                 db_options_->log_readahead_size));
  }
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
      builders;
  // add default column family
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
  if (default_cf_iter == cf_name_to_options.end()) {
    return Status::InvalidArgument("Default column family not specified");
  }
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
  // In recovery, nobody else can access it, so it's fine to set it to be
  // initialized earlier.
  default_cfd->set_initialized();
  builders.insert(
      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
                            new BaseReferencedVersionBuilder(default_cfd))));
  uint64_t current_manifest_file_size = 0;
  VersionEditParams version_edit_params;
  {
    VersionSet::LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
                       true /* checksum */, 0 /* log_number */);
    Slice record;
    std::string scratch;
    AtomicGroupReadBuffer read_buffer;
    s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
                       column_families_not_found, builders,
                       &version_edit_params, db_id);
    current_manifest_file_size = reader.GetReadOffset();
    assert(current_manifest_file_size != 0);
  }
  if (s.ok()) {
    if (!version_edit_params.has_next_file_number_) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!version_edit_params.has_log_number_) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!version_edit_params.has_last_sequence_) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }
    if (!version_edit_params.has_prev_log_number_) {
      version_edit_params.SetPrevLogNumber(0);
    }
    column_family_set_->UpdateMaxColumnFamily(
        version_edit_params.max_column_family_);
    // When reading a DB generated by an old release, min_log_number_to_keep=0.
    // All log files will be scanned for potential prepare entries.
    MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
    MarkFileNumberUsed(version_edit_params.prev_log_number_);
    MarkFileNumberUsed(version_edit_params.log_number_);
  }
  // there were some column families in the MANIFEST that weren't specified
  // in the argument. This is OK in read_only mode
  if (read_only == false && !column_families_not_found.empty()) {
    std::string list_of_not_found;
    for (const auto& cf : column_families_not_found) {
      list_of_not_found += ", " + cf.second;
    }
    list_of_not_found = list_of_not_found.substr(2);
    s = Status::InvalidArgument(
        "You have to open all column families. Column families not opened: " +
        list_of_not_found);
  }
  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      assert(builders.count(cfd->GetID()) > 0);
      auto* builder = builders[cfd->GetID()]->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }
  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (read_only) {
        cfd->table_cache()->SetTablesAreImmortal();
      }
      assert(cfd->initialized());
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto builder = builders_iter->second->version_builder();
      // unlimited table cache. Pre-load table handle now.
      // Need to do it out of the mutex.
      s = builder->LoadTableHandlers(
          cfd->internal_stats(), db_options_->max_file_opening_threads,
          false /* prefetch_index_and_filter_in_cache */,
          true /* is_initial_load */,
          cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
      if (!s.ok()) {
        if (db_options_->paranoid_checks) {
          return s;
        }
        s = Status::OK();
      }
      Version* v = new Version(cfd, this, file_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
      builder->SaveTo(v->storage_info());
      // Install recovered version
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
                      !(db_options_->skip_stats_update_on_db_open));
      AppendVersion(cfd, v);
    }
    manifest_file_size_ = current_manifest_file_size;
    next_file_number_.store(version_edit_params.next_file_number_ + 1);
    last_allocated_sequence_ = version_edit_params.last_sequence_;
    last_published_sequence_ = version_edit_params.last_sequence_;
    last_sequence_ = version_edit_params.last_sequence_;
    prev_log_number_ = version_edit_params.prev_log_number_;
    ROCKS_LOG_INFO(
        db_options_->info_log,
        "Recovered from manifest file: %s succeeded, "
        "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
        ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
        ", prev_log_number is %" PRIu64 ", max_column_family is %" PRIu32
        ", min_log_number_to_keep is %" PRIu64 "\n",
        manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
        last_sequence_.load(), version_edit_params.log_number_,
        prev_log_number_, column_family_set_->GetMaxColumnFamily(),
        min_log_number_to_keep_2pc());
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Column family [%s] (ID %" PRIu32
                     "), log number is %" PRIu64 "\n",
                     cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
    }
  }
  return s;
}

Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
                                      const std::string& dbname,
                                      FileSystem* fs) {
  // these are just for performance reasons, not correctness,
  // so we're fine using the defaults
  FileOptions soptions;
  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string manifest_path;
  uint64_t manifest_file_number;
  Status s =
      GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<SequentialFileReader> file_reader;
  {
    std::unique_ptr<FSSequentialFile> file;
    s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
    if (!s.ok()) {
      return s;
    }
    file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
  }
  std::map<uint32_t, std::string> column_family_names;
  // default column family is always implicitly there
  column_family_names.insert({0, kDefaultColumnFamilyName});
  VersionSet::LogReporter reporter;
  reporter.status = &s;
  log::Reader reader(nullptr, std::move(file_reader), &reporter,
                     true /* checksum */, 0 /* log_number */);
  Slice record;
  std::string scratch;
  while (reader.ReadRecord(&record, &scratch) && s.ok()) {
    VersionEdit edit;
    s = edit.DecodeFrom(record);
    if (!s.ok()) {
      break;
    }
    if (edit.is_column_family_add_) {
      if (column_family_names.find(edit.column_family_) !=
          column_family_names.end()) {
        s = Status::Corruption("Manifest adding the same column family twice");
        break;
      }
      column_family_names.insert(
          {edit.column_family_, edit.column_family_name_});
    } else if (edit.is_column_family_drop_) {
      if (column_family_names.find(edit.column_family_) ==
          column_family_names.end()) {
        s = Status::Corruption(
            "Manifest - dropping non-existing column family");
        break;
      }
      column_family_names.erase(edit.column_family_);
    }
  }
  column_families->clear();
  if (s.ok()) {
    for (const auto& iter : column_family_names) {
      column_families->push_back(iter.second);
    }
  }
  return s;
}

#ifndef ROCKSDB_LITE
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
                                        const Options* options,
                                        const FileOptions& file_options,
                                        int new_levels) {
  if (new_levels <= 1) {
    return Status::InvalidArgument(
        "Number of levels needs to be bigger than 1");
  }
  ImmutableDBOptions db_options(*options);
  ColumnFamilyOptions cf_options(*options);
  std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
                                        options->table_cache_numshardbits));
  WriteController wc(options->delayed_write_rate);
  WriteBufferManager wb(options->db_write_buffer_size);
  VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
                      /*block_cache_tracer=*/nullptr);
  Status status;
  std::vector<ColumnFamilyDescriptor> dummy;
  ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
                                          ColumnFamilyOptions(*options));
  dummy.push_back(dummy_descriptor);
  status = versions.Recover(dummy);
  if (!status.ok()) {
    return status;
  }
  Version* current_version =
      versions.GetColumnFamilySet()->GetDefault()->current();
  auto* vstorage = current_version->storage_info();
  int current_levels = vstorage->num_levels();
  if (current_levels <= new_levels) {
    return Status::OK();
  }
  // Make sure there are files on only one level from
  // (new_levels-1) to (current_levels-1)
  int first_nonempty_level = -1;
  int first_nonempty_level_filenum = 0;
  for (int i = new_levels - 1; i < current_levels; i++) {
    int file_num = vstorage->NumLevelFiles(i);
    if (file_num != 0) {
      if (first_nonempty_level < 0) {
        first_nonempty_level = i;
        first_nonempty_level_filenum = file_num;
      } else {
        char msg[255];
        snprintf(msg, sizeof(msg),
                 "Found at least two levels containing files: "
                 "[%d:%d],[%d:%d].\n",
                 first_nonempty_level, first_nonempty_level_filenum, i,
                 file_num);
        return Status::InvalidArgument(msg);
      }
    }
  }
  // we need to allocate an array with the old number of levels size to
  // avoid SIGSEGV in WriteCurrentStateToManifest()
  // however, all levels bigger or equal to new_levels will be empty
  std::vector<FileMetaData*>* new_files_list =
      new std::vector<FileMetaData*>[current_levels];
  for (int i = 0; i < new_levels - 1; i++) {
    new_files_list[i] = vstorage->LevelFiles(i);
  }
  if (first_nonempty_level > 0) {
    new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
  }
  delete[] vstorage->files_;
  vstorage->files_ = new_files_list;
  vstorage->num_levels_ = new_levels;
  MutableCFOptions mutable_cf_options(*options);
  VersionEdit ve;
  InstrumentedMutex dummy_mutex;
  InstrumentedMutexLock l(&dummy_mutex);
  return versions.LogAndApply(
      versions.GetColumnFamilySet()->GetDefault(),
      mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
}

// Get the checksum information including the checksum and checksum function
// name of all SST files in VersionSet. Store the information in
// FileChecksumList which contains a map from file number to its checksum info.
// If the DB is not running, make sure to call VersionSet::Recover() to load
// the file metadata from the Manifest to VersionSet before calling this
// function.
Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
  // Clean the previously stored checksum information if any.
  if (checksum_list == nullptr) {
    return Status::InvalidArgument("checksum_list is nullptr");
  }
  checksum_list->reset();
  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped() || !cfd->initialized()) {
      continue;
    }
    for (int level = 0; level < cfd->NumberLevels(); level++) {
      for (const auto& file :
           cfd->current()->storage_info()->LevelFiles(level)) {
        checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
                                             file->file_checksum,
                                             file->file_checksum_func_name);
      }
    }
  }
  return Status::OK();
}

Status VersionSet::DumpManifest(Options& options, std::string& dscname,
                                bool verbose, bool hex, bool json) {
  // Open the specified manifest file.
  std::unique_ptr<SequentialFileReader> file_reader;
  Status s;
  {
    std::unique_ptr<FSSequentialFile> file;
    s = options.file_system->NewSequentialFile(
        dscname,
        options.file_system->OptimizeForManifestRead(file_options_), &file,
        nullptr);
    if (!s.ok()) {
      return s;
    }
    file_reader.reset(new SequentialFileReader(
        std::move(file), dscname, db_options_->log_readahead_size));
  }
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t previous_log_number = 0;
  int count = 0;
  std::unordered_map<uint32_t, std::string> comparators;
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
      builders;
  // add default column family
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
  builders.insert(
      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
                            new BaseReferencedVersionBuilder(default_cfd))));
  {
    VersionSet::LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(nullptr, std::move(file_reader), &reporter,
                       true /* checksum */, 0 /* log_number */);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }
      // Write out each individual edit
      if (verbose && !json) {
        printf("%s\n", edit.DebugString(hex).c_str());
      } else if (json) {
        printf("%s\n", edit.DebugJSON(count, hex).c_str());
      }
      count++;
      bool cf_in_builders =
          builders.find(edit.column_family_) != builders.end();
      if (edit.has_comparator_) {
        comparators.insert({edit.column_family_, edit.comparator_});
      }
      ColumnFamilyData* cfd = nullptr;
      if (edit.is_column_family_add_) {
        if (cf_in_builders) {
          s = Status::Corruption(
              "Manifest adding the same column family twice");
          break;
        }
        cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
        cfd->set_initialized();
        builders.insert(std::make_pair(
            edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
                                     new BaseReferencedVersionBuilder(cfd))));
      } else if (edit.is_column_family_drop_) {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest - dropping non-existing column family");
          break;
        }
        auto builder_iter = builders.find(edit.column_family_);
        builders.erase(builder_iter);
        comparators.erase(edit.column_family_);
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        assert(cfd != nullptr);
        cfd->UnrefAndTryDelete();
        cfd = nullptr;
      } else {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest record referencing unknown column family");
          break;
        }
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        // this should never happen since cf_in_builders is true
        assert(cfd != nullptr);
        // if it is not column family add or column family drop,
        // then it's a file add/delete, which should be forwarded
        // to builder
        auto builder = builders.find(edit.column_family_);
        assert(builder != builders.end());
        s = builder->second->version_builder()->Apply(&edit);
        if (!s.ok()) {
          break;
        }
      }
      if (cfd != nullptr && edit.has_log_number_) {
        cfd->SetLogNumber(edit.log_number_);
      }
      if (edit.has_prev_log_number_) {
        previous_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }
      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }
      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
      if (edit.has_max_column_family_) {
        column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
      }
      if (edit.has_min_log_number_to_keep_) {
        MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
      }
    }
  }
  file_reader.reset();
  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
      printf("no meta-nextfile entry in descriptor");
    } else if (!have_last_sequence) {
      printf("no last-sequence-number entry in descriptor");
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }
    if (!have_prev_log_number) {
      previous_log_number = 0;
    }
  }
  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto builder = builders_iter->second->version_builder();
      Version* v = new Version(cfd, this, file_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
      builder->SaveTo(v->storage_info());
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
      printf("--------------- Column family \"%s\" (ID %" PRIu32
             ") --------------\n",
             cfd->GetName().c_str(), cfd->GetID());
      printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
      auto comparator = comparators.find(cfd->GetID());
      if (comparator != comparators.end()) {
        printf("comparator: %s\n", comparator->second.c_str());
      } else {
        printf("comparator: <NO COMPARATOR>\n");
      }
      printf("%s \n", v->DebugString(hex).c_str());
      delete v;
    }
    next_file_number_.store(next_file + 1);
    last_allocated_sequence_ = last_sequence;
    last_published_sequence_ = last_sequence;
    last_sequence_ = last_sequence;
    prev_log_number_ = previous_log_number;
    printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
           " prev_log_number %" PRIu64 " max_column_family %" PRIu32
           " min_log_number_to_keep "
           "%" PRIu64 "\n",
           next_file_number_.load(), last_sequence, previous_log_number,
           column_family_set_->GetMaxColumnFamily(),
           min_log_number_to_keep_2pc());
  }
  return s;
}
#endif  // ROCKSDB_LITE

void VersionSet::MarkFileNumberUsed(uint64_t number) {
  // only called during recovery and repair which are single threaded, so this
  // works because there can't be concurrent calls
  if (next_file_number_.load(std::memory_order_relaxed) <= number) {
    next_file_number_.store(number + 1, std::memory_order_relaxed);
  }
}

// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded.
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
  if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
    min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
  }
}

Status VersionSet::WriteCurrentStateToManifest(
    const std::unordered_map<uint32_t, MutableCFState>& curr_state,
    log::Writer* log) {
  // TODO: Break up into multiple records to reduce memory usage on recovery?
  // WARNING: This method doesn't hold a mutex!!
  // This is done without DB mutex lock held, but only within single-threaded
  // LogAndApply. Column family manipulations can only happen within LogAndApply
  // (the same single thread), so we're safe to iterate.
  if (db_options_->write_dbid_to_manifest) {
    VersionEdit edit_for_db_id;
    assert(!db_id_.empty());
    edit_for_db_id.SetDBId(db_id_);
    std::string db_id_record;
    if (!edit_for_db_id.EncodeTo(&db_id_record)) {
      return Status::Corruption("Unable to Encode VersionEdit:" +
                                edit_for_db_id.DebugString(true));
    }
    Status add_record = log->AddRecord(db_id_record);
    if (!add_record.ok()) {
      return add_record;
    }
  }
  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped()) {
      continue;
    }
    assert(cfd->initialized());
    {
      // Store column family info
      VersionEdit edit;
      if (cfd->GetID() != 0) {
        // default column family is always there,
        // no need to explicitly write it
        edit.AddColumnFamily(cfd->GetName());
        edit.SetColumnFamily(cfd->GetID());
      }
      edit.SetComparatorName(
          cfd->internal_comparator().user_comparator()->Name());
      std::string record;
      if (!edit.EncodeTo(&record)) {
        return Status::Corruption(
            "Unable to Encode VersionEdit:" + edit.DebugString(true));
      }
      Status s = log->AddRecord(record);
      if (!s.ok()) {
        return s;
      }
    }
    {
      // Save files
      VersionEdit edit;
      edit.SetColumnFamily(cfd->GetID());
      for (int level = 0; level < cfd->NumberLevels(); level++) {
        for (const auto& f :
             cfd->current()->storage_info()->LevelFiles(level)) {
          edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
                       f->fd.GetFileSize(), f->smallest, f->largest,
                       f->fd.smallest_seqno, f->fd.largest_seqno,
                       f->marked_for_compaction, f->oldest_blob_file_number,
                       f->oldest_ancester_time, f->file_creation_time,
                       f->file_checksum, f->file_checksum_func_name);
        }
      }
      const auto iter = curr_state.find(cfd->GetID());
      assert(iter != curr_state.end());
      uint64_t log_number = iter->second.log_number;
      edit.SetLogNumber(log_number);
      std::string record;
      if (!edit.EncodeTo(&record)) {
        return Status::Corruption(
            "Unable to Encode VersionEdit:" + edit.DebugString(true));
      }
      Status s = log->AddRecord(record);
      if (!s.ok()) {
        return s;
      }
    }
  }
  return Status::OK();
}
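
// The snapshot written above thus consists of an optional DB id record
// followed by two records per live column family: one carrying the column
// family name/ID and comparator, and one re-adding every live file along
// with the column family's log number. Replaying just these records
// reproduces the current LSM tree.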

// TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
// function is called repeatedly with consecutive pairs of slices. For example
// if the slice list is [a, b, c, d] this function is called with arguments
// (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
// we avoid doing binary search for the keys b and c twice and instead somehow
// maintain state of where they first appear in the files.
uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
                                     Version* v, const Slice& start,
                                     const Slice& end, int start_level,
                                     int end_level, TableReaderCaller caller) {
  const auto& icmp = v->cfd_->internal_comparator();
  // pre-condition
  assert(icmp.Compare(start, end) <= 0);
  uint64_t total_full_size = 0;
  const auto* vstorage = v->storage_info();
  const int num_non_empty_levels = vstorage->num_non_empty_levels();
  end_level = (end_level == -1) ? num_non_empty_levels
                                : std::min(end_level, num_non_empty_levels);
  assert(start_level <= end_level);
  // Outline of the optimization that uses options.files_size_error_margin.
  // When approximating the total size of the files used to store a key range,
  // we first sum up the sizes of the files that fully fall into the range.
  // Then we sum up the sizes of all the files that may intersect with the range
  // (this includes all files in L0 as well). Then, if total_intersecting_size
  // is smaller than total_full_size * options.files_size_error_margin - we can
  // infer that the intersecting files have a sufficiently negligible
  // contribution to the total size, and we can approximate the storage required
  // for the keys in range as just half of the intersecting_files_size.
  // E.g., if the value of files_size_error_margin is 0.1, then the error of the
  // approximation is limited to only ~10% of the total size of files that fully
  // fall into the key range. In such a case, this helps to avoid a costly
  // process of binary searching the intersecting files that is required only
  // for a more precise calculation of the total size.
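  //
  // Worked example (numbers illustrative): with files_size_error_margin = 0.1,
  // total_full_size = 100 MB and total_intersecting_size = 8 MB, the check
  // 8 MB < 100 MB * 0.1 passes, so the estimate becomes 100 MB + 8 MB / 2 =
  // 104 MB and no per-file binary search is performed for the boundary files.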
  autovector<FdWithKeyRange*, 32> first_files;
  autovector<FdWithKeyRange*, 16> last_files;
  // scan all the levels
  for (int level = start_level; level < end_level; ++level) {
    const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
    if (files_brief.num_files == 0) {
      // empty level, skip exploration
      continue;
    }
    if (level == 0) {
      // level 0 files are not in sorted order, we need to iterate through
      // the list to compute the total bytes that require scanning,
      // so handle the case explicitly (similarly to first_files case)
      for (size_t i = 0; i < files_brief.num_files; i++) {
        first_files.push_back(&files_brief.files[i]);
      }
      continue;
    }
    assert(level > 0);
    assert(files_brief.num_files > 0);
    // identify the file position for start key
    const int idx_start =
        FindFileInRange(icmp, files_brief, start, 0,
                        static_cast<uint32_t>(files_brief.num_files - 1));
    assert(static_cast<size_t>(idx_start) < files_brief.num_files);
    // identify the file position for end key
    int idx_end = idx_start;
    if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
      idx_end =
          FindFileInRange(icmp, files_brief, end, idx_start,
                          static_cast<uint32_t>(files_brief.num_files - 1));
    }
    assert(idx_end >= idx_start &&
           static_cast<size_t>(idx_end) < files_brief.num_files);
    // scan all files from the starting index to the ending index
    // (inferred from the sorted order)
    // first scan all the intermediate full files (excluding first and last)
    for (int i = idx_start + 1; i < idx_end; ++i) {
      uint64_t file_size = files_brief.files[i].fd.GetFileSize();
      // The entire file falls into the range, so we can just take its size.
      assert(file_size ==
             ApproximateSize(v, files_brief.files[i], start, end, caller));
      total_full_size += file_size;
    }
    // save the first and the last files (which may be the same file), so we
    // can scan them later.
    first_files.push_back(&files_brief.files[idx_start]);
    if (idx_start != idx_end) {
      // we need to estimate size for both files, only if they are different
      last_files.push_back(&files_brief.files[idx_end]);
    }
  }
  // The sum of all file sizes that intersect the [start, end] keys range.
  uint64_t total_intersecting_size = 0;
  for (const auto* file_ptr : first_files) {
    total_intersecting_size += file_ptr->fd.GetFileSize();
  }
  for (const auto* file_ptr : last_files) {
    total_intersecting_size += file_ptr->fd.GetFileSize();
  }
  // Now scan all the first & last files at each level, and estimate their size.
  // If the total_intersecting_size is less than X% of the total_full_size - we
  // want to approximate the result in order to avoid the costly binary search
  // inside ApproximateSize. We use half of file size as an approximation below.
  const double margin = options.files_size_error_margin;
  if (margin > 0 && total_intersecting_size <
                        static_cast<uint64_t>(total_full_size * margin)) {
    total_full_size += total_intersecting_size / 2;
  } else {
    // Estimate for all the first files, at each level
    for (const auto file_ptr : first_files) {
      total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
    }
    // Estimate for all the last files, at each level
    for (const auto file_ptr : last_files) {
      // We could use ApproximateSize here, but calling ApproximateOffsetOf
      // directly is just more efficient.
      total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
    }
  }
  return total_full_size;
}
  4783. uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
  4784. const Slice& key,
  4785. TableReaderCaller caller) {
  4786. // pre-condition
  4787. assert(v);
  4788. const auto& icmp = v->cfd_->internal_comparator();
  4789. uint64_t result = 0;
  4790. if (icmp.Compare(f.largest_key, key) <= 0) {
  4791. // Entire file is before "key", so just add the file size
  4792. result = f.fd.GetFileSize();
  4793. } else if (icmp.Compare(f.smallest_key, key) > 0) {
  4794. // Entire file is after "key", so ignore
  4795. result = 0;
  4796. } else {
  4797. // "key" falls in the range for this table. Add the
  4798. // approximate offset of "key" within the table.
  4799. TableCache* table_cache = v->cfd_->table_cache();
  4800. if (table_cache != nullptr) {
  4801. result = table_cache->ApproximateOffsetOf(
  4802. key, f.file_metadata->fd, caller, icmp,
  4803. v->GetMutableCFOptions().prefix_extractor.get());
  4804. }
  4805. }
  4806. return result;
  4807. }

uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
                                     const Slice& start, const Slice& end,
                                     TableReaderCaller caller) {
  // pre-condition
  assert(v);
  const auto& icmp = v->cfd_->internal_comparator();
  assert(icmp.Compare(start, end) <= 0);

  if (icmp.Compare(f.largest_key, start) <= 0 ||
      icmp.Compare(f.smallest_key, end) > 0) {
    // Entire file is before or after the start/end keys range
    return 0;
  }

  if (icmp.Compare(f.smallest_key, start) >= 0) {
    // Start of the range is before the file start - approximate by end offset
    return ApproximateOffsetOf(v, f, end, caller);
  }

  if (icmp.Compare(f.largest_key, end) < 0) {
    // End of the range is after the file end - approximate by subtracting
    // start offset from the file size
    uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
    assert(f.fd.GetFileSize() >= start_offset);
    return f.fd.GetFileSize() - start_offset;
  }

  // The interval falls entirely in the range for this file.
  TableCache* table_cache = v->cfd_->table_cache();
  if (table_cache == nullptr) {
    return 0;
  }

  return table_cache->ApproximateSize(
      start, end, f.file_metadata->fd, caller, icmp,
      v->GetMutableCFOptions().prefix_extractor.get());
}
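
// Illustrative sketch (hypothetical keys) of the four cases above, for a
// file f covering [c, g]:
//   ApproximateSize(v, f, "a", "b", caller);  // 0: no overlap
//   ApproximateSize(v, f, "a", "e", caller);  // offset of "e" in f
//   ApproximateSize(v, f, "e", "z", caller);  // file size - offset of "e"
//   ApproximateSize(v, f, "d", "e", caller);  // table-level range estimate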

void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
  // pre-calculate space requirement
  int64_t total_files = 0;
  for (auto cfd : *column_family_set_) {
    if (!cfd->initialized()) {
      continue;
    }
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
         v = v->next_) {
      const auto* vstorage = v->storage_info();
      for (int level = 0; level < vstorage->num_levels(); level++) {
        total_files += vstorage->LevelFiles(level).size();
      }
    }
  }

  // grow the vector to the right size with a single allocation
  live_list->reserve(live_list->size() + static_cast<size_t>(total_files));

  for (auto cfd : *column_family_set_) {
    if (!cfd->initialized()) {
      continue;
    }
    auto* current = cfd->current();
    bool found_current = false;
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
         v = v->next_) {
      v->AddLiveFiles(live_list);
      if (v == current) {
        found_current = true;
      }
    }
    if (!found_current && current != nullptr) {
      // Should never happen unless it is a bug.
      assert(false);
      current->AddLiveFiles(live_list);
    }
  }
}
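
// Sizing note for the two-pass pattern above: counting first and calling
// reserve() once means the appends performed by Version::AddLiveFiles()
// never reallocate, e.g. 3 versions of 100 files each become one 300-slot
// reservation instead of a series of capacity doublings.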

InternalIterator* VersionSet::MakeInputIterator(
    const Compaction* c, RangeDelAggregator* range_del_agg,
    const FileOptions& file_options_compactions) {
  auto cfd = c->column_family_data();
  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  // Level-0 files have to be merged together. For other levels,
  // we will make a concatenating iterator per level.
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
                                              c->num_input_levels() - 1
                                        : c->num_input_levels());
  InternalIterator** list = new InternalIterator*[space];
  size_t num = 0;
  for (size_t which = 0; which < c->num_input_levels(); which++) {
    if (c->input_levels(which)->num_files != 0) {
      if (c->level(which) == 0) {
        const LevelFilesBrief* flevel = c->input_levels(which);
        for (size_t i = 0; i < flevel->num_files; i++) {
          list[num++] = cfd->table_cache()->NewIterator(
              read_options, file_options_compactions,
              cfd->internal_comparator(),
              *flevel->files[i].file_metadata, range_del_agg,
              c->mutable_cf_options()->prefix_extractor.get(),
              /*table_reader_ptr=*/nullptr,
              /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
              /*arena=*/nullptr,
              /*skip_filters=*/false, /*level=*/static_cast<int>(which),
              /*smallest_compaction_key=*/nullptr,
              /*largest_compaction_key=*/nullptr);
        }
      } else {
        // Create concatenating iterator for the files from this level
        list[num++] = new LevelIterator(
            cfd->table_cache(), read_options, file_options_compactions,
            cfd->internal_comparator(), c->input_levels(which),
            c->mutable_cf_options()->prefix_extractor.get(),
            /*should_sample=*/false,
            /*no per level latency histogram=*/nullptr,
            TableReaderCaller::kCompaction, /*skip_filters=*/false,
            /*level=*/static_cast<int>(which), range_del_agg,
            c->boundaries(which));
      }
    }
  }
  assert(num <= space);
  InternalIterator* result =
      NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
                         static_cast<int>(num));
  delete[] list;
  return result;
}
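
// Sizing example for the iterator list above (illustrative numbers): an L0
// compaction with 4 L0 input files and one other input level needs
// space = 4 + 2 - 1 = 5 slots, i.e. one iterator per L0 file (L0 files may
// overlap, so each needs its own iterator in the merge) plus a single
// concatenating LevelIterator for the non-L0 level.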

// Verify that the files listed in this compaction are present
// in the current version.
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
  Version* version = c->column_family_data()->current();
  const VersionStorageInfo* vstorage = version->storage_info();
  if (c->input_version() != version) {
    ROCKS_LOG_INFO(
        db_options_->info_log,
        "[%s] compaction output being applied to a different base version from"
        " input version",
        c->column_family_data()->GetName().c_str());

    if (vstorage->compaction_style_ == kCompactionStyleLevel &&
        c->start_level() == 0 && c->num_input_levels() > 2U) {
      // We are doing a L0->base_level compaction. The assumption is that, if
      // the base level is not L1, then levels L1 to base_level - 1 are empty.
      // This is ensured by allowing only one L0->base_level compaction at a
      // time in level-based compaction, so that in the meantime no other
      // compaction/flush can put files into those levels.
      for (int l = c->start_level() + 1; l < c->output_level(); l++) {
        if (vstorage->NumLevelFiles(l) != 0) {
          return false;
        }
      }
    }
  }

  for (size_t input = 0; input < c->num_input_levels(); ++input) {
    int level = c->level(input);
    for (size_t i = 0; i < c->num_input_files(input); ++i) {
      uint64_t number = c->input(input, i)->fd.GetNumber();
      bool found = false;
      for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
        FileMetaData* f = vstorage->files_[level][j];
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
      if (!found) {
        return false;  // input files not present in the current version
      }
    }
  }
#else
  (void)c;
#endif
  return true;  // everything good
}

Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
                                      FileMetaData** meta,
                                      ColumnFamilyData** cfd) {
  for (auto cfd_iter : *column_family_set_) {
    if (!cfd_iter->initialized()) {
      continue;
    }
    Version* version = cfd_iter->current();
    const auto* vstorage = version->storage_info();
    for (int level = 0; level < vstorage->num_levels(); level++) {
      for (const auto& file : vstorage->LevelFiles(level)) {
        if (file->fd.GetNumber() == number) {
          *meta = file;
          *filelevel = level;
          *cfd = cfd_iter;
          return Status::OK();
        }
      }
    }
  }
  return Status::NotFound("File not present in any level");
}

void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped() || !cfd->initialized()) {
      continue;
    }
    for (int level = 0; level < cfd->NumberLevels(); level++) {
      for (const auto& file :
           cfd->current()->storage_info()->LevelFiles(level)) {
        LiveFileMetaData filemetadata;
        filemetadata.column_family_name = cfd->GetName();
        uint32_t path_id = file->fd.GetPathId();
        if (path_id < cfd->ioptions()->cf_paths.size()) {
          filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
        } else {
          assert(!cfd->ioptions()->cf_paths.empty());
          filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
        }
        const uint64_t file_number = file->fd.GetNumber();
        filemetadata.name = MakeTableFileName("", file_number);
        filemetadata.file_number = file_number;
        filemetadata.level = level;
        filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
        filemetadata.smallestkey = file->smallest.user_key().ToString();
        filemetadata.largestkey = file->largest.user_key().ToString();
        filemetadata.smallest_seqno = file->fd.smallest_seqno;
        filemetadata.largest_seqno = file->fd.largest_seqno;
        filemetadata.num_reads_sampled =
            file->stats.num_reads_sampled.load(std::memory_order_relaxed);
        filemetadata.being_compacted = file->being_compacted;
        filemetadata.num_entries = file->num_entries;
        filemetadata.num_deletions = file->num_deletions;
        filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
        filemetadata.file_checksum = file->file_checksum;
        filemetadata.file_checksum_func_name = file->file_checksum_func_name;
        metadata->push_back(filemetadata);
      }
    }
  }
}
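
// Note on the name field populated above: MakeTableFileName("", number)
// yields a path-less name with a leading slash, e.g. file number 123 becomes
// "/000123.sst"; consumers combine it with db_path to obtain the full path.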

void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
                                  std::vector<std::string>* manifest_filenames,
                                  uint64_t min_pending_output) {
  assert(manifest_filenames->empty());
  obsolete_manifests_.swap(*manifest_filenames);
  std::vector<ObsoleteFileInfo> pending_files;
  for (auto& f : obsolete_files_) {
    if (f.metadata->fd.GetNumber() < min_pending_output) {
      files->push_back(std::move(f));
    } else {
      pending_files.push_back(std::move(f));
    }
  }
  obsolete_files_.swap(pending_files);
}
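
// Partition example (illustrative file numbers): with obsolete_files_
// holding numbers {7, 12, 15} and min_pending_output = 13, files 7 and 12
// are returned to the caller for deletion while 15 remains pending, since a
// number at or above min_pending_output may still belong to an in-flight
// output file.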

ColumnFamilyData* VersionSet::CreateColumnFamily(
    const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
  assert(edit->is_column_family_add_);

  MutableCFOptions dummy_cf_options;
  Version* dummy_versions =
      new Version(nullptr, this, file_options_, dummy_cf_options);
  // Ref() dummy version once so that later we can call Unref() to delete it,
  // avoiding an explicit "delete" (~Version is private).
  dummy_versions->Ref();
  auto new_cfd = column_family_set_->CreateColumnFamily(
      edit->column_family_name_, edit->column_family_, dummy_versions,
      cf_options);

  Version* v = new Version(new_cfd, this, file_options_,
                           *new_cfd->GetLatestMutableCFOptions(),
                           current_version_number_++);

  // Fill level target base information.
  v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
                                        *new_cfd->GetLatestMutableCFOptions());
  AppendVersion(new_cfd, v);
  // GetLatestMutableCFOptions() is safe here without mutex since the
  // cfd is not visible to clients yet.
  new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
                             LastSequence());
  new_cfd->SetLogNumber(edit->log_number_);
  return new_cfd;
}

uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
  uint64_t count = 0;
  for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    count++;
  }
  return count;
}

uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
  std::unordered_set<uint64_t> unique_files;
  uint64_t total_files_size = 0;
  for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    VersionStorageInfo* storage_info = v->storage_info();
    for (int level = 0; level < storage_info->num_levels_; level++) {
      for (const auto& file_meta : storage_info->LevelFiles(level)) {
        if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
            unique_files.end()) {
          unique_files.insert(file_meta->fd.packed_number_and_path_id);
          total_files_size += file_meta->fd.GetFileSize();
        }
      }
    }
  }
  return total_files_size;
}
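
// Dedup example for GetTotalSstFilesSize() (hypothetical file numbers): if
// version v1 holds {1.sst, 2.sst} and its successor v2 holds {2.sst, 3.sst},
// the shared 2.sst is counted once, so the result is size(1) + size(2) +
// size(3) rather than the sum over both versions. Keying the set on
// packed_number_and_path_id also distinguishes equal numbers under different
// cf_paths.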

ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
                                       const ImmutableDBOptions* _db_options,
                                       const FileOptions& _file_options,
                                       Cache* table_cache,
                                       WriteBufferManager* write_buffer_manager,
                                       WriteController* write_controller)
    : VersionSet(dbname, _db_options, _file_options, table_cache,
                 write_buffer_manager, write_controller,
                 /*block_cache_tracer=*/nullptr),
      number_of_edits_to_skip_(0) {}

ReactiveVersionSet::~ReactiveVersionSet() {}

Status ReactiveVersionSet::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
    std::unique_ptr<Status>* manifest_reader_status) {
  assert(manifest_reader != nullptr);
  assert(manifest_reporter != nullptr);
  assert(manifest_reader_status != nullptr);

  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
  for (const auto& cf : column_families) {
    cf_name_to_options.insert({cf.name, cf.options});
  }

  // add default column family
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
  if (default_cf_iter == cf_name_to_options.end()) {
    return Status::InvalidArgument("Default column family not specified");
  }
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
  // In recovery, nobody else can access it, so it's fine to mark it as
  // initialized early.
  default_cfd->set_initialized();
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
      builders;
  std::unordered_map<int, std::string> column_families_not_found;
  builders.insert(
      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
                            new BaseReferencedVersionBuilder(default_cfd))));

  manifest_reader_status->reset(new Status());
  manifest_reporter->reset(new LogReporter());
  static_cast<LogReporter*>(manifest_reporter->get())->status =
      manifest_reader_status->get();
  Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
  log::Reader* reader = manifest_reader->get();

  int retry = 0;
  VersionEdit version_edit;
  while (s.ok() && retry < 1) {
    assert(reader != nullptr);
    Slice record;
    std::string scratch;
    s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
                       column_families_not_found, builders, &version_edit);
    if (s.ok()) {
      bool enough = version_edit.has_next_file_number_ &&
                    version_edit.has_log_number_ &&
                    version_edit.has_last_sequence_;
      if (enough) {
        for (const auto& cf : column_families) {
          auto cfd = column_family_set_->GetColumnFamily(cf.name);
          if (cfd == nullptr) {
            enough = false;
            break;
          }
        }
      }
      if (enough) {
        for (const auto& cf : column_families) {
          auto cfd = column_family_set_->GetColumnFamily(cf.name);
          assert(cfd != nullptr);
          if (!cfd->IsDropped()) {
            auto builder_iter = builders.find(cfd->GetID());
            assert(builder_iter != builders.end());
            auto builder = builder_iter->second->version_builder();
            assert(builder != nullptr);
            s = builder->LoadTableHandlers(
                cfd->internal_stats(), db_options_->max_file_opening_threads,
                false /* prefetch_index_and_filter_in_cache */,
                true /* is_initial_load */,
                cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
            if (!s.ok()) {
              enough = false;
              if (s.IsPathNotFound()) {
                s = Status::OK();
              }
              break;
            }
          }
        }
      }
      if (enough) {
        break;
      }
    }
    ++retry;
  }

  if (s.ok()) {
    if (!version_edit.has_prev_log_number_) {
      version_edit.prev_log_number_ = 0;
    }
    column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);

    MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
    MarkFileNumberUsed(version_edit.prev_log_number_);
    MarkFileNumberUsed(version_edit.log_number_);

    for (auto cfd : *column_family_set_) {
      assert(builders.count(cfd->GetID()) > 0);
      auto builder = builders[cfd->GetID()]->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      assert(cfd->initialized());
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto* builder = builders_iter->second->version_builder();

      Version* v = new Version(cfd, this, file_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
      builder->SaveTo(v->storage_info());

      // Install recovered version
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
                      !(db_options_->skip_stats_update_on_db_open));
      AppendVersion(cfd, v);
    }
    next_file_number_.store(version_edit.next_file_number_ + 1);
    last_allocated_sequence_ = version_edit.last_sequence_;
    last_published_sequence_ = version_edit.last_sequence_;
    last_sequence_ = version_edit.last_sequence_;
    prev_log_number_ = version_edit.prev_log_number_;
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
                     cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
    }
  }
  return s;
}
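
// Context note (assumption based on the secondary-instance design): this
// Recover() is typically reached through DB::OpenAsSecondary(), which builds
// a ReactiveVersionSet and afterwards tails the primary's MANIFEST via
// ReadAndApply() below. The retry loop above gives up after one pass if the
// MANIFEST does not yet carry enough state (next file number, log number,
// last sequence, and all requested column families).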

Status ReactiveVersionSet::ReadAndApply(
    InstrumentedMutex* mu,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unordered_set<ColumnFamilyData*>* cfds_changed) {
  assert(manifest_reader != nullptr);
  assert(cfds_changed != nullptr);
  mu->AssertHeld();

  Status s;
  uint64_t applied_edits = 0;
  while (s.ok()) {
    Slice record;
    std::string scratch;
    log::Reader* reader = manifest_reader->get();
    std::string old_manifest_path = reader->file()->file_name();
    while (reader->ReadRecord(&record, &scratch)) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }

      // Skip the first VersionEdits of each MANIFEST generated by
      // VersionSet::WriteCurrentStateToManifest.
      if (number_of_edits_to_skip_ > 0) {
        ColumnFamilyData* cfd =
            column_family_set_->GetColumnFamily(edit.column_family_);
        if (cfd != nullptr && !cfd->IsDropped()) {
          --number_of_edits_to_skip_;
        }
        continue;
      }

      s = read_buffer_.AddEdit(&edit);
      if (!s.ok()) {
        break;
      }
      VersionEdit temp_edit;
      if (edit.is_in_atomic_group_) {
        if (read_buffer_.IsFull()) {
          // Apply the edits in an atomic group only once we have read all
          // edits in the group.
          for (auto& e : read_buffer_.replay_buffer()) {
            s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
            if (!s.ok()) {
              break;
            }
            applied_edits++;
          }
          if (!s.ok()) {
            break;
          }
          read_buffer_.Clear();
        }
      } else {
        // Apply a normal edit immediately.
        s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
        if (s.ok()) {
          applied_edits++;
        }
      }
    }
    if (!s.ok()) {
      // Clear the buffer if we fail to decode/apply an edit.
      read_buffer_.Clear();
    }
    // It's possible that:
    // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
    // 2) we have finished reading the current MANIFEST.
    // 3) we have encountered an IOError reading the current MANIFEST.
    // We need to look for the next MANIFEST and start from there. If we cannot
    // find the next MANIFEST, we should exit the loop.
    s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
    reader = manifest_reader->get();
    if (s.ok()) {
      if (reader->file()->file_name() == old_manifest_path) {
        // Still processing the same MANIFEST, thus no need to continue this
        // loop; no more records are available if we have reached here.
        break;
      } else {
        // We have switched to a new MANIFEST whose first records have been
        // generated by VersionSet::WriteCurrentStateToManifest. Since the
        // secondary instance has already finished recovering upon start, there
        // is no need for the secondary to process these records. Actually, if
        // the secondary were to replay these records, the secondary may end up
        // adding the same SST files AGAIN to each column family, causing
        // consistency checks done by VersionBuilder to fail. Therefore, we
        // record the number of records to skip at the beginning of the new
        // MANIFEST and ignore them.
        number_of_edits_to_skip_ = 0;
        for (auto* cfd : *column_family_set_) {
          if (cfd->IsDropped()) {
            continue;
          }
          // Increase number_of_edits_to_skip by 2 because
          // WriteCurrentStateToManifest() writes 2 version edits for each
          // column family at the beginning of the newly-generated MANIFEST
          // (3 when write_dbid_to_manifest is set).
          // TODO(yanqin) remove hard-coded value.
          if (db_options_->write_dbid_to_manifest) {
            number_of_edits_to_skip_ += 3;
          } else {
            number_of_edits_to_skip_ += 2;
          }
        }
      }
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      auto builder_iter = active_version_builders_.find(cfd->GetID());
      if (builder_iter == active_version_builders_.end()) {
        continue;
      }
      auto builder = builder_iter->second->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }
  TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
                           &applied_edits);
  return s;
}
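
// Skip-count example for the MANIFEST switch handled above (illustrative):
// with 3 live column families and write_dbid_to_manifest == false,
// number_of_edits_to_skip_ becomes 3 * 2 = 6, matching the per-family edits
// that WriteCurrentStateToManifest() emits at the head of the new MANIFEST;
// with write_dbid_to_manifest == true the code skips 3 per family instead.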

Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
    VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
    VersionEdit* version_edit) {
  ColumnFamilyData* cfd =
      column_family_set_->GetColumnFamily(edit.column_family_);

  // If we cannot find this column family in our column family set, then it
  // may be a new column family created by the primary after the secondary
  // starts. It is also possible that the secondary instance opens only a
  // subset of column families. Ignore it for now.
  if (nullptr == cfd) {
    return Status::OK();
  }
  if (active_version_builders_.find(edit.column_family_) ==
          active_version_builders_.end() &&
      !cfd->IsDropped()) {
    std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
        new BaseReferencedVersionBuilder(cfd));
    active_version_builders_.insert(
        std::make_pair(edit.column_family_, std::move(builder_guard)));
  }

  auto builder_iter = active_version_builders_.find(edit.column_family_);
  assert(builder_iter != active_version_builders_.end());
  auto builder = builder_iter->second->version_builder();
  assert(builder != nullptr);

  if (edit.is_column_family_add_) {
    // TODO (yanqin) for now the secondary ignores column families created
    // after Open. This also simplifies handling of switching to a new MANIFEST
    // and processing the snapshot of the system at the beginning of the
    // MANIFEST.
  } else if (edit.is_column_family_drop_) {
    // Drop the column family by setting it to be 'dropped' without destroying
    // the column family handle.
    // TODO (haoyu) figure out how to handle column family drop for
    // secondary instance. (Is it possible that the ref count for cfd is 0 but
    // the ref count for its versions is higher than 0?)
    cfd->SetDropped();
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    active_version_builders_.erase(builder_iter);
  } else {
    Status s = builder->Apply(&edit);
    if (!s.ok()) {
      return s;
    }
  }
  Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
  if (!s.ok()) {
    return s;
  }

  if (cfd != nullptr && !cfd->IsDropped()) {
    s = builder->LoadTableHandlers(
        cfd->internal_stats(), db_options_->max_file_opening_threads,
        false /* prefetch_index_and_filter_in_cache */,
        false /* is_initial_load */,
        cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
    TEST_SYNC_POINT_CALLBACK(
        "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
        "AfterLoadTableHandlers",
        &s);

    if (s.ok()) {
      auto version = new Version(cfd, this, file_options_,
                                 *cfd->GetLatestMutableCFOptions(),
                                 current_version_number_++);
      builder->SaveTo(version->storage_info());
      version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
      AppendVersion(cfd, version);
      active_version_builders_.erase(builder_iter);
      if (cfds_changed->count(cfd) == 0) {
        cfds_changed->insert(cfd);
      }
    } else if (s.IsPathNotFound()) {
      s = Status::OK();
    }
    // Some other error has occurred during LoadTableHandlers.
  }

  if (version_edit->HasNextFile()) {
    next_file_number_.store(version_edit->next_file_number_ + 1);
  }
  if (version_edit->has_last_sequence_) {
    last_allocated_sequence_ = version_edit->last_sequence_;
    last_published_sequence_ = version_edit->last_sequence_;
    last_sequence_ = version_edit->last_sequence_;
  }
  if (version_edit->has_prev_log_number_) {
    prev_log_number_ = version_edit->prev_log_number_;
    MarkFileNumberUsed(version_edit->prev_log_number_);
  }
  if (version_edit->has_log_number_) {
    MarkFileNumberUsed(version_edit->log_number_);
  }
  column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
  MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
  return s;
}

Status ReactiveVersionSet::MaybeSwitchManifest(
    log::Reader::Reporter* reporter,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
  assert(manifest_reader != nullptr);
  Status s;
  do {
    std::string manifest_path;
    s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
                               &manifest_file_number_);
    std::unique_ptr<FSSequentialFile> manifest_file;
    if (s.ok()) {
      if (nullptr == manifest_reader->get() ||
          manifest_reader->get()->file()->file_name() != manifest_path) {
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:0");
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:1");
        s = fs_->NewSequentialFile(manifest_path,
                                   env_->OptimizeForManifestRead(file_options_),
                                   &manifest_file, nullptr);
      } else {
        // No need to switch manifest.
        break;
      }
    }
    std::unique_ptr<SequentialFileReader> manifest_file_reader;
    if (s.ok()) {
      manifest_file_reader.reset(
          new SequentialFileReader(std::move(manifest_file), manifest_path,
                                   db_options_->log_readahead_size));
      manifest_reader->reset(new log::FragmentBufferedReader(
          nullptr, std::move(manifest_file_reader), reporter,
          true /* checksum */, 0 /* log_number */));
      ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
                     manifest_path.c_str());
      // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
      // active_version_builders_ map because we choose to construct the
      // versions from scratch, thanks to the first part of each MANIFEST
      // written by VersionSet::WriteCurrentStateToManifest. This is not
      // necessary, but we choose this at present for the sake of simplicity.
      active_version_builders_.clear();
    }
  } while (s.IsPathNotFound());
  return s;
}
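
// The do/while (s.IsPathNotFound()) above tolerates a race with the primary:
// if the primary installs a new MANIFEST and deletes the old one between our
// read of CURRENT and the NewSequentialFile() call, the open fails with
// PathNotFound and we re-read CURRENT and try again.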

}  // namespace ROCKSDB_NAMESPACE