db_impl.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl/db_impl.h"

#include <stdint.h>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <map>
#include <set>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/import_column_family_job.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/malloc_stats.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/in_memory_stats_history.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/get_context.h"
#include "table/merging_iterator.h"
#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "tools/sst_dump_tool_imp.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

const std::string kDefaultColumnFamilyName("default");
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
void DumpRocksDBBuildVersion(Logger* log);

CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.
  if (ioptions.compaction_style == kCompactionStyleUniversal) {
    if (mutable_cf_options.compaction_options_universal
            .compression_size_percent < 0) {
      return mutable_cf_options.compression;
    } else {
      return kNoCompression;
    }
  } else if (!ioptions.compression_per_level.empty()) {
    // For leveled compaction, compress when min_level_to_compress != 0.
    return ioptions.compression_per_level[0];
  } else {
    return mutable_cf_options.compression;
  }
}
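
// Illustrative note (not from the original source): with universal compaction
// and the default compression_size_percent of -1, flush output is compressed
// with mutable_cf_options.compression; with compression_size_percent >= 0 the
// flush output is left uncompressed and compression is deferred to later
// compactions.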

namespace {
void DumpSupportInfo(Logger* logger) {
  ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
  for (auto& compression : OptionsHelper::compression_type_string_map) {
    if (compression.second != kNoCompression &&
        compression.second != kDisableCompressionOption) {
      ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
                       CompressionTypeSupported(compression.second));
    }
  }
  ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
                   crc32c::IsFastCrc32Supported().c_str());
}
}  // namespace

DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn)
    : dbname_(dbname),
      own_info_log_(options.info_log == nullptr),
      initial_db_options_(SanitizeOptions(dbname, options)),
      env_(initial_db_options_.env),
      fs_(initial_db_options_.file_system),
      immutable_db_options_(initial_db_options_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.statistics.get()),
      mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      default_cf_handle_(nullptr),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      db_lock_(nullptr),
      shutting_down_(false),
      manual_compaction_paused_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      persist_stats_cf_handle_(nullptr),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(env_->NowMicros()),
      last_stats_dump_time_microsec_(0),
      next_job_id_(1),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, file_options_, seq_per_batch),
#endif  // ROCKSDB_LITE
      event_logger_(immutable_db_options_.info_log.get()),
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // last_sequence_ is always maintained by the main queue that also writes
      // to the memtable. When two_write_queues_ is disabled, the last seq in
      // the memtable is the same as the last seq published to the readers.
      // When it is enabled but seq_per_batch_ is disabled, the last seq in the
      // memtable still indicates the last published seq, since wal-only writes
      // that go to the 2nd queue do not consume a sequence number. Otherwise
      // writes performed by the 2nd queue could change what is visible to the
      // readers. In these cases (last_seq_same_as_publish_seq_ == false), the
      // 2nd queue maintains a separate variable to indicate the last published
      // sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn, which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      own_sfm_(options.sst_file_manager == nullptr),
      preserve_deletes_(options.preserve_deletes),
      closed_(false),
      error_handler_(this, immutable_db_options_, &mutex_),
      atomic_flush_install_cv_(&mutex_) {
  // !batch_per_txn_ implies seq_per_batch_ because batch_per_txn_ is only
  // unset by WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Give a large number for setting of "infinite" open files.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  table_cache_ = NewLRUCache(co);

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_, &block_cache_tracer_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  DumpDBFileSummary(immutable_db_options_, dbname_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  // Always open the DB with 0 here, which means if preserve_deletes_ == true
  // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
  // is called by the client and this seqnum is advanced.
  preserve_deletes_seqnum_.store(0);
}
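
// Note on the table cache sizing above: max_open_files == -1 maps to an
// effectively unbounded capacity (TableCache::kInfiniteCapacity); otherwise
// roughly ten slots are held back from max_open_files for other uses, so
// e.g. max_open_files = 5000 yields a table cache capacity of 4990.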

Status DBImpl::Resume() {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");

  InstrumentedMutexLock db_mutex(&mutex_);

  if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
    // Nothing to do
    return Status::OK();
  }

  if (error_handler_.IsRecoveryInProgress()) {
    // Don't allow a mix of manual and automatic recovery
    return Status::Busy();
  }

  mutex_.Unlock();
  Status s = error_handler_.RecoverFromBGError(true);
  mutex_.Lock();
  return s;
}
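
// Usage sketch (illustrative, not part of this file): a client that observed
// a background error, e.g. via EventListener::OnBackgroundError(), can attempt
// manual recovery roughly like this:
//
//   rocksdb::Status s = db->Resume();
//   if (s.IsBusy()) {
//     // Automatic recovery is already in progress; retry later.
//   }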

// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
//    order to avoid inadvertently causing an error and mistakenly concluding
//    that recovery failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl() {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status bg_error = error_handler_.GetBGError();
  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }
  if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
    ROCKS_LOG_INFO(
        immutable_db_options_.info_log,
        "DB resume requested but failed due to Fatal/Unrecoverable error");
    s = bg_error;
  }

  // We cannot guarantee consistency of the WAL. So force flush Memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall write since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        cfd->Ref();
        mutex_.Unlock();
        s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
        mutex_.Lock();
        cfd->UnrefAndTryDelete();
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  if (s.ok()) {
    s = error_handler_.ClearBGError();
  }
  mutex_.Unlock();

  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();
  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, the event listener
  // would be notified and the operation causing it would have failed
  return s;
}

void DBImpl::WaitForBackgroundWork() {
  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_) {
    bg_cv_.Wait();
  }
}

// Will lock the mutex_, and will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

  if (thread_dump_stats_ != nullptr) {
    thread_dump_stats_->cancel();
    thread_dump_stats_.reset();
  }
  if (thread_persist_stats_ != nullptr) {
    thread_persist_stats_->cancel();
    thread_persist_stats_.reset();
  }

  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
          cfd->Ref();
          mutex_.Unlock();
          FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
          mutex_.Lock();
          cfd->UnrefAndTryDelete();
        }
      }
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  WaitForBackgroundWork();
}

Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);
  int bottom_compactions_unscheduled =
      env_->UnSchedule(this, Env::Priority::BOTTOM);
  int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
  int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
  Status ret;
  mutex_.Lock();
  bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
  bg_compaction_scheduled_ -= compactions_unscheduled;
  bg_flush_scheduled_ -= flushes_unscheduled;

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      iter.first->UnrefAndTryDelete();
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // We need to delete the handles outside of the lock because each handle
    // does its own locking
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds the manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recover fails (may be due to corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);
    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto l : logs_to_free_) {
    delete l;
  }
  for (auto& log : logs_) {
    uint64_t log_number = log.writer->get_log_number();
    Status s = log.ClearWriter();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          immutable_db_options_.info_log,
          "Unable to Sync WAL file %s with error -- %s",
          LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
          s.ToString().c_str());
      // Retain the first error
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when the column family
  // data list is destroyed, so leaving handles in the table cache after
  // versions_.reset() may cause issues. Here we clean all unreferenced
  // handles in the table cache. Now we assume all user queries have finished,
  // so only the version set itself can possibly hold blocks from the block
  // cache. After releasing the unreferenced handles here, only handles held
  // by the version set remain, and those are released inside
  // versions_.reset(). There we need to make sure that every time a handle is
  // released, we also erase it from the cache. By doing that, we can
  // guarantee that the table cache is empty after versions_.reset(), so the
  // cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, a background thread
  // in SstFileManagerImpl might try to log something
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (ret.ok()) {
      ret = s;
    }
  }

  if (ret.IsAborted()) {
    // Reserve IsAborted() for errors where the user has not released certain
    // resources; they can release them, come back, and retry. In this case,
    // we wrap this exception to something else.
    return Status::Incomplete(ret.ToString());
  }
  return ret;
}

Status DBImpl::CloseImpl() { return CloseHelper(); }

DBImpl::~DBImpl() {
  if (!closed_) {
    closed_ = true;
    CloseHelper();
  }
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || immutable_db_options_.paranoid_checks) {
    // No change needed
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                   s->ToString().c_str());
    *s = Status::OK();
  }
}

const Status DBImpl::CreateArchivalDirectory() {
  if (immutable_db_options_.wal_ttl_seconds > 0 ||
      immutable_db_options_.wal_size_limit_mb > 0) {
    std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
    return env_->CreateDirIfMissing(archivalPath);
  }
  return Status::OK();
}

void DBImpl::PrintStatistics() {
  auto dbstats = immutable_db_options_.statistics.get();
  if (dbstats) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
                   dbstats->ToString().c_str());
  }
}

void DBImpl::StartTimedTasks() {
  unsigned int stats_dump_period_sec = 0;
  unsigned int stats_persist_period_sec = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
    if (stats_dump_period_sec > 0) {
      if (!thread_dump_stats_) {
        thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
            [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
            static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond));
      }
    }
    stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
    if (stats_persist_period_sec > 0) {
      if (!thread_persist_stats_) {
        thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
            [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
            static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond));
      }
    }
  }
}
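
// Note: RepeatableThread invokes the given callback once per configured
// period (in microseconds) until cancel() is called; both timed tasks started
// above are cancelled in CancelAllBackgroundWork() during shutdown.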

// Estimate the total size of stats_history_
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
  size_t size_total =
      sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
  if (stats_history_.size() == 0) return size_total;
  size_t size_per_slice =
      sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
  // non-empty map, stats_history_.begin() guaranteed to exist
  std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
  for (const auto& pairs : sample_slice) {
    size_per_slice +=
        pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
  }
  size_total = size_per_slice * stats_history_.size();
  return size_total;
}
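
// Rough arithmetic behind the estimate above, assuming every slice holds the
// same ticker names as the sampled first one: for S slices with keys of
// capacity c_i, total ~= S * (sizeof(uint64_t) + sizeof(map) +
// sum_i(c_i + sizeof(std::string) + sizeof(uint64_t))). With a few hundred
// tickers averaging tens of bytes per name, one slice is on the order of
// 10-20 KB.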

void DBImpl::PersistStats() {
  TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
  if (shutdown_initiated_) {
    return;
  }
  uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond;
  Statistics* statistics = immutable_db_options_.statistics.get();
  if (!statistics) {
    return;
  }
  size_t stats_history_size_limit = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
  }

  std::map<std::string, uint64_t> stats_map;
  if (!statistics->getTickerMap(&stats_map)) {
    return;
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- PERSISTING STATS -------");

  if (immutable_db_options_.persist_stats_to_disk) {
    WriteBatch batch;
    if (stats_slice_initialized_) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
                     stats_slice_.size());
      for (const auto& stat : stats_map) {
        char key[100];
        int length =
            EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
        // calculate the delta from last time
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          uint64_t delta = stat.second - stats_slice_[stat.first];
          batch.Put(persist_stats_cf_handle_,
                    Slice(key, std::min(100, length)), ToString(delta));
        }
      }
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    WriteOptions wo;
    wo.low_pri = true;
    wo.no_slowdown = true;
    wo.sync = false;
    Status s = Write(wo, &batch);
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing to persistent stats CF failed -- %s",
                     s.ToString().c_str());
    } else {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to persistent stats CF succeeded",
                     stats_slice_.size(), now_seconds);
    }
    // TODO(Zhongyi): add purging for persisted data
  } else {
    InstrumentedMutexLock l(&stats_history_mutex_);
    // calculate the delta from last time
    if (stats_slice_initialized_) {
      std::map<std::string, uint64_t> stats_delta;
      for (const auto& stat : stats_map) {
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
        }
      }
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to in-memory stats history",
                     stats_slice_.size(), now_seconds);
      stats_history_[now_seconds] = stats_delta;
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");

    // delete older stats snapshots to control memory consumption
    size_t stats_history_size = EstimateInMemoryStatsHistorySize();
    bool purge_needed = stats_history_size > stats_history_size_limit;
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
    while (purge_needed && !stats_history_.empty()) {
      stats_history_.erase(stats_history_.begin());
      purge_needed =
          EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
  }
#endif  // !ROCKSDB_LITE
}
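
// Note: the two branches above are the two stats-persistence modes. With
// persist_stats_to_disk, ticker deltas are written to the hidden
// ___rocksdb_stats_history___ column family; otherwise they are kept in the
// in-memory stats_history_ map, bounded by stats_history_buffer_size.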

bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
                             uint64_t* new_time,
                             std::map<std::string, uint64_t>* stats_map) {
  assert(new_time);
  assert(stats_map);
  if (!new_time || !stats_map) return false;
  // lock while searching for start_time
  {
    InstrumentedMutexLock l(&stats_history_mutex_);
    auto it = stats_history_.lower_bound(start_time);
    if (it != stats_history_.end() && it->first < end_time) {
      // make a copy for timestamp and stats_map
      *new_time = it->first;
      *stats_map = it->second;
      return true;
    } else {
      return false;
    }
  }
}

Status DBImpl::GetStatsHistory(
    uint64_t start_time, uint64_t end_time,
    std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
  if (!stats_iterator) {
    return Status::InvalidArgument("stats_iterator not preallocated.");
  }
  if (immutable_db_options_.persist_stats_to_disk) {
    stats_iterator->reset(
        new PersistentStatsHistoryIterator(start_time, end_time, this));
  } else {
    stats_iterator->reset(
        new InMemoryStatsHistoryIterator(start_time, end_time, this));
  }
  return (*stats_iterator)->status();
}
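
// Usage sketch (illustrative): callers typically drain the iterator with the
// StatsHistoryIterator accessors, roughly:
//
//   std::unique_ptr<rocksdb::StatsHistoryIterator> it;
//   db->GetStatsHistory(start_time, end_time, &it);
//   for (; it->Valid(); it->Next()) {
//     uint64_t bucket_time = it->GetStatsTime();
//     const std::map<std::string, uint64_t>& stats = it->GetStatsMap();
//     // consume one time-bucketed snapshot of ticker deltas
//   }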

void DBImpl::DumpStats() {
  TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
  const DBPropertyInfo* cf_property_info =
      GetPropertyInfo(DB::Properties::kCFStats);
  assert(cf_property_info != nullptr);
  const DBPropertyInfo* db_property_info =
      GetPropertyInfo(DB::Properties::kDBStats);
  assert(db_property_info != nullptr);

  std::string stats;
  if (shutdown_initiated_) {
    return;
  }
  {
    InstrumentedMutexLock l(&mutex_);
    default_cf_internal_stats_->GetStringProperty(
        *db_property_info, DB::Properties::kDBStats, &stats);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFStatsNoFileHistogram,
            &stats);
      }
    }
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:2");
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- DUMPING STATS -------");
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
  if (immutable_db_options_.dump_malloc_stats) {
    stats.clear();
    DumpMallocStats(&stats);
    if (!stats.empty()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "------- Malloc STATS -------");
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
    }
  }
#endif  // !ROCKSDB_LITE

  PrintStatistics();
}

Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
                                           int max_entries_to_print,
                                           std::string* out_str) {
  auto* cfh =
      static_cast_with_check<ColumnFamilyHandleImpl, ColumnFamilyHandle>(
          column_family);
  ColumnFamilyData* cfd = cfh->cfd();

  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  Version* version = super_version->current;

  Status s =
      version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);

  CleanupSuperVersion(super_version);
  return s;
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  if (!job_context->logs_to_free.empty()) {
    for (auto l : job_context->logs_to_free) {
      AddToLogsToFreeQueue(l);
    }
    job_context->logs_to_free.clear();
  }
}

Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
  assert(cfd);
  Directory* ret_dir = cfd->GetDataDir(path_id);
  if (ret_dir == nullptr) {
    return directories_.GetDataDir(path_id);
  }
  return ret_dir;
}

Status DBImpl::SetOptions(
    ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)column_family;
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    auto db_options = GetDBOptions();
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(db_options, options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score.
      VersionEdit dummy_edit;
      versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                             directories_.GetDbDir());
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
      bg_cv_.SignalAll();
    }
  }
  sv_context.Clean();

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
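
// Usage sketch (illustrative): dynamically tuning mutable CF options, e.g.
//
//   db->SetOptions(handle, {{"write_buffer_size", "131072"},
//                           {"level0_file_num_compaction_trigger", "4"}});
//
// Keys and values are the string forms understood by the options parser.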
Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status;
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }
    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(immutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         /* parallelize_compactions */ true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          immutable_db_options_.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, /* parallelize_compactions */ true);

      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;

      if (max_flushes_increased || max_compactions_increased) {
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }
        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }
        MaybeScheduleFlushOrCompaction();
      }

      if (new_options.stats_dump_period_sec !=
          mutable_db_options_.stats_dump_period_sec) {
        if (thread_dump_stats_) {
          mutex_.Unlock();
          thread_dump_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_dump_period_sec > 0) {
          thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
              [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
              static_cast<uint64_t>(new_options.stats_dump_period_sec) *
                  kMicrosInSecond));
        } else {
          thread_dump_stats_.reset();
        }
      }
      if (new_options.stats_persist_period_sec !=
          mutable_db_options_.stats_persist_period_sec) {
        if (thread_persist_stats_) {
          mutex_.Unlock();
          thread_persist_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_persist_period_sec > 0) {
          thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
              [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
              static_cast<uint64_t>(new_options.stats_persist_period_sec) *
                  kMicrosInSecond));
        } else {
          thread_persist_stats_.reset();
        }
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      mutable_db_options_ = new_options;
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->ChangeFileOptions(mutable_db_options_);
      // TODO(xiez): clarify why apply optimize for read to write options
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      file_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}

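// Worked example (added for clarity, derived from the loop below): with
// level == 4, if L3 and L2 are both empty and each has
// MaxBytesForLevel >= NumLevelBytes(L4) while L1 is non-empty, the result is
// 2 -- the smallest-numbered empty level that can hold L4's data.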
// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(
    ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
    int level) {
  mutex_.AssertHeld();
  const auto* vstorage = cfd->current()->storage_info();
  int minimum_level = level;
  for (int i = level - 1; i > 0; --i) {
    // stop if level i is not empty
    if (vstorage->NumLevelFiles(i) > 0) break;
    // stop if level i is too small (cannot fit the level files)
    if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
      break;
    }

    minimum_level = i;
  }
  return minimum_level;
}

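// Illustrative usage sketch (not part of the original source):
//
//   s = db->FlushWAL(/*sync=*/false);  // push buffered WAL data to the OS
//   s = db->FlushWAL(/*sync=*/true);   // additionally sync the WAL files
//
// The buffered-write step below only applies when manual_wal_flush_ is set;
// otherwise FlushWAL(false) is a no-op and FlushWAL(true) reduces to
// SyncWAL().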
Status DBImpl::FlushWAL(bool sync) {
  if (manual_wal_flush_) {
    Status s;
    {
      // We need to lock log_write_mutex_ since logs_ might change concurrently
      InstrumentedMutexLock wl(&log_write_mutex_);
      log::Writer* cur_log_writer = logs_.back().writer;
      s = cur_log_writer->WriteBuffer();
    }
    if (!s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      s.ToString().c_str());
      // In case there is a fs error, we should set it globally to prevent
      // future writes
      WriteStatusCheck(s);
      // whether sync or not, we should abort the rest of the function upon
      // error
      return s;
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return s;
    }
  }
  if (!sync) {
    return Status::OK();
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}

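// Descriptive note (added for clarity): SyncWAL proceeds in three phases,
// mirroring the code below: (1) under mutex_, mark every live log up to
// logfile_number_ as getting_synced, waiting out any concurrent syncer;
// (2) sync those files without holding mutex_; (3) re-acquire mutex_ and
// publish the result via MarkLogsSynced(). This keeps the expensive sync
// calls outside the DB mutex.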
Status DBImpl::SyncWAL() {
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    while (logs_.front().number <= current_log_number &&
           logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    // First check that logs are safe to sync in background.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      auto& log = *it;
      assert(!log.getting_synced);
      log.getting_synced = true;
      logs_to_sync.push_back(log.writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  for (log::Writer* log : logs_to_sync) {
    status = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!status.ok()) {
      break;
    }
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->Fsync();
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  {
    InstrumentedMutexLock l(&mutex_);
    MarkLogsSynced(current_log_number, need_log_dir_sync, status);
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}

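// Illustrative usage sketch (not part of the original source): LockWAL()
// flushes any buffered WAL data and then holds log_write_mutex_ until
// UnlockWAL(), blocking concurrent WAL writes in between, e.g.
//
//   s = db->LockWAL();
//   // ... copy or inspect the WAL files while they are quiescent ...
//   s = db->UnlockWAL();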
Status DBImpl::LockWAL() {
  log_write_mutex_.Lock();
  auto cur_log_writer = logs_.back().writer;
  auto status = cur_log_writer->WriteBuffer();
  if (!status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                    status.ToString().c_str());
    // In case there is a fs error, we should set it globally to prevent
    // future writes
    WriteStatusCheck(status);
  }
  return status;
}

Status DBImpl::UnlockWAL() {
  log_write_mutex_.Unlock();
  return Status::OK();
}

void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
                            const Status& status) {
  mutex_.AssertHeld();
  if (synced_dir && logfile_number_ == up_to && status.ok()) {
    log_dir_synced_ = true;
  }
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
    auto& log = *it;
    assert(log.getting_synced);
    if (status.ok() && logs_.size() > 1) {
      logs_to_free_.push_back(log.ReleaseWriter());
      // To modify logs_ both mutex_ and log_write_mutex_ must be held
      InstrumentedMutexLock l(&log_write_mutex_);
      it = logs_.erase(it);
    } else {
      log.getting_synced = false;
      ++it;
    }
  }
  assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
         (logs_.size() == 1 && !logs_[0].getting_synced));
  log_sync_cv_.SignalAll();
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}

void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
  versions_->SetLastPublishedSequence(seq);
}

bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
  if (seqnum > preserve_deletes_seqnum_.load()) {
    preserve_deletes_seqnum_.store(seqnum);
    return true;
  } else {
    return false;
  }
}

InternalIterator* DBImpl::NewInternalIterator(
    Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
    ColumnFamilyHandle* column_family) {
  ColumnFamilyData* cfd;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    cfd = cfh->cfd();
  }

  mutex_.Lock();
  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
  mutex_.Unlock();
  ReadOptions roptions;
  return NewInternalIterator(roptions, cfd, super_version, arena,
                             range_del_agg, sequence);
}

void DBImpl::SchedulePurge() {
  mutex_.AssertHeld();
  assert(opened_successfully_);

  // Purge operations are put into High priority queue
  bg_purge_scheduled_++;
  env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}

void DBImpl::BackgroundCallPurge() {
  mutex_.Lock();

  while (!logs_to_free_queue_.empty()) {
    assert(!logs_to_free_queue_.empty());
    log::Writer* log_writer = *(logs_to_free_queue_.begin());
    logs_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete log_writer;
    mutex_.Lock();
  }
  while (!superversions_to_free_queue_.empty()) {
    assert(!superversions_to_free_queue_.empty());
    SuperVersion* sv = superversions_to_free_queue_.front();
    superversions_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete sv;
    mutex_.Lock();
  }

  // Can't use iterator to go over purge_files_ because inside the loop we're
  // unlocking the mutex that protects purge_files_.
  while (!purge_files_.empty()) {
    auto it = purge_files_.begin();
    // Need to make a copy of the PurgeFilesInfo before unlocking the mutex.
    PurgeFileInfo purge_file = it->second;

    const std::string& fname = purge_file.fname;
    const std::string& dir_to_sync = purge_file.dir_to_sync;
    FileType type = purge_file.type;
    uint64_t number = purge_file.number;
    int job_id = purge_file.job_id;

    purge_files_.erase(it);

    mutex_.Unlock();
    DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
    mutex_.Lock();
  }

  bg_purge_scheduled_--;

  bg_cv_.SignalAll();
  // IMPORTANT: there should be no code after calling SignalAll. This call may
  // signal the DB destructor that it's OK to proceed with destruction. In
  // that case, all DB variables will be deallocated and referencing them
  // will cause trouble.
  mutex_.Unlock();
}

namespace {
struct IterState {
  IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
            bool _background_purge)
      : db(_db),
        mu(_mu),
        super_version(_super_version),
        background_purge(_background_purge) {}

  DBImpl* db;
  InstrumentedMutex* mu;
  SuperVersion* super_version;
  bool background_purge;
};

static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
  IterState* state = reinterpret_cast<IterState*>(arg1);

  if (state->super_version->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);

    state->mu->Lock();
    state->super_version->Cleanup();
    state->db->FindObsoleteFiles(&job_context, false, true);
    if (state->background_purge) {
      state->db->ScheduleBgLogWriterClose(&job_context);
      state->db->AddSuperVersionsToFreeQueue(state->super_version);
      state->db->SchedulePurge();
    }
    state->mu->Unlock();

    if (!state->background_purge) {
      delete state->super_version;
    }
    if (job_context.HaveSomethingToDelete()) {
      if (state->background_purge) {
        // PurgeObsoleteFiles here does not delete files. Instead, it adds the
        // files to be deleted to a job queue, and deletes them in a separate
        // background thread.
        state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
        state->mu->Lock();
        state->db->SchedulePurge();
        state->mu->Unlock();
      } else {
        state->db->PurgeObsoleteFiles(job_context);
      }
    }
    job_context.Clean();
  }

  delete state;
}
}  // namespace

InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              ColumnFamilyData* cfd,
                                              SuperVersion* super_version,
                                              Arena* arena,
                                              RangeDelAggregator* range_del_agg,
                                              SequenceNumber sequence) {
  InternalIterator* internal_iter;
  assert(arena != nullptr);
  assert(range_del_agg != nullptr);
  // Need to create internal iterator from the arena.
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      !read_options.total_order_seek &&
          super_version->mutable_cf_options.prefix_extractor != nullptr);
  // Collect iterator for mutable mem
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_options, arena));
  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
  Status s;
  if (!read_options.ignore_range_deletions) {
    range_del_iter.reset(
        super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  // Collect all needed child iterators for immutable memtables
  if (s.ok()) {
    super_version->imm->AddIterators(read_options, &merge_iter_builder);
    if (!read_options.ignore_range_deletions) {
      s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
                                                         range_del_agg);
    }
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
  if (s.ok()) {
    // Collect iterators for files in L0 - Ln
    if (read_options.read_tier != kMemtableTier) {
      super_version->current->AddIterators(read_options, file_options_,
                                           &merge_iter_builder, range_del_agg);
    }
    internal_iter = merge_iter_builder.Finish();
    IterState* cleanup =
        new IterState(this, &mutex_, super_version,
                      read_options.background_purge_on_iterator_cleanup ||
                          immutable_db_options_.avoid_unnecessary_blocking_io);
    internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

    return internal_iter;
  } else {
    CleanupSuperVersion(super_version);
  }
  return NewErrorInternalIterator<Slice>(s, arena);
}

ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}

ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
  return persist_stats_cf_handle_;
}

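// Illustrative usage sketch (not part of the original source): point lookups
// return the value through a PinnableSlice so a copy can often be avoided,
// e.g.
//
//   PinnableSlice value;
//   Status s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key",
//                      &value);
//   if (s.ok()) { /* use value.data(), value.size() */ }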
Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  return GetImpl(read_options, key, get_impl_options);
}

Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
                       GetImplOptions get_impl_options) {
  assert(get_impl_options.value != nullptr ||
         get_impl_options.merge_operands != nullptr);
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  StopWatch sw(env_, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh =
      reinterpret_cast<ColumnFamilyHandleImpl*>(get_impl_options.column_family);
  auto cfd = cfh->cfd();

  if (tracer_) {
    // TODO: This mutex should be removed later, to improve performance when
    // tracing is enabled.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Get(get_impl_options.column_family, key);
    }
  }

  // Acquire SuperVersion
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  TEST_SYNC_POINT("DBImpl::GetImpl:1");
  TEST_SYNC_POINT("DBImpl::GetImpl:2");

  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    if (get_impl_options.callback) {
      // Already calculated based on read_options.snapshot
      snapshot = get_impl_options.callback->max_visible_seq();
    } else {
      snapshot =
          reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
    }
  } else {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither data that was
    // visible to the snapshot before compaction nor the newer data inserted
    // afterwards.
    snapshot = last_seq_same_as_publish_seq_
                   ? versions_->LastSequence()
                   : versions_->LastPublishedSequence();
    if (get_impl_options.callback) {
      // The unprep_seqs are not published for write unprepared, so it could be
      // that max_visible_seq is larger. Seek to the std::max of the two.
      // However, we still want our callback to contain the actual snapshot so
      // that it can do the correct visibility filtering.
      get_impl_options.callback->Refresh(snapshot);

      // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
      // max_visible_seq = max(max_visible_seq, snapshot)
      //
      // Currently, the commented out assert is broken by
      // InvalidSnapshotReadCallback, but if write unprepared recovery followed
      // the regular transaction flow, then this special read callback would
      // not be needed.
      //
      // assert(callback->max_visible_seq() >= snapshot);
      snapshot = get_impl_options.callback->max_visible_seq();
    }
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:3");
  TEST_SYNC_POINT("DBImpl::GetImpl:4");

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  Status s;
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot, read_options.timestamp);
  PERF_TIMER_STOP(get_snapshot_time);

  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
                        has_unpersisted_data_.load(std::memory_order_relaxed));
  bool done = false;
  if (!skip_memtable) {
    // Get value associated with key
    if (get_impl_options.get_value) {
      if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
                       &merge_context, &max_covering_tombstone_seq,
                       read_options, get_impl_options.callback,
                       get_impl_options.is_blob_index)) {
        done = true;
        get_impl_options.value->PinSelf();
        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
                              &merge_context, &max_covering_tombstone_seq,
                              read_options, get_impl_options.callback,
                              get_impl_options.is_blob_index)) {
        done = true;
        get_impl_options.value->PinSelf();
        RecordTick(stats_, MEMTABLE_HIT);
      }
    } else {
      // Get Merge Operands associated with key, Merge Operands should not be
      // merged and raw values should be returned to the user.
      if (sv->mem->Get(lkey, nullptr, &s, &merge_context,
                       &max_covering_tombstone_seq, read_options, nullptr,
                       nullptr, false)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->GetMergeOperands(lkey, &s, &merge_context,
                                           &max_covering_tombstone_seq,
                                           read_options)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      }
    }
    if (!done && !s.ok() && !s.IsMergeInProgress()) {
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(
        read_options, lkey, get_impl_options.value, &s, &merge_context,
        &max_covering_tombstone_seq,
        get_impl_options.get_value ? get_impl_options.value_found : nullptr,
        nullptr, nullptr,
        get_impl_options.get_value ? get_impl_options.callback : nullptr,
        get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
        get_impl_options.get_value);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);
    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = 0;
    if (s.ok()) {
      if (get_impl_options.get_value) {
        size = get_impl_options.value->size();
      } else {
        // Return all merge operands for get_impl_options.key
        *get_impl_options.number_of_operands =
            static_cast<int>(merge_context.GetNumOperands());
        if (*get_impl_options.number_of_operands >
            get_impl_options.get_merge_operands_options
                ->expected_max_number_of_operands) {
          s = Status::Incomplete(
              Status::SubCode::KMergeOperandsInsufficientCapacity);
        } else {
          for (const Slice& sl : merge_context.GetOperands()) {
            size += sl.size();
            get_impl_options.merge_operands->PinSelf(sl);
            get_impl_options.merge_operands++;
          }
        }
      }
      RecordTick(stats_, BYTES_READ, size);
      PERF_COUNTER_ADD(get_read_bytes, size);
    }
    RecordInHistogram(stats_, BYTES_PER_READ, size);
  }
  return s;
}

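// Illustrative usage sketch (not part of the original source):
//
//   std::vector<Slice> keys = {"k1", "k2"};
//   std::vector<std::string> values;
//   std::vector<Status> statuses = db->MultiGet(
//       ReadOptions(),
//       {db->DefaultColumnFamily(), db->DefaultColumnFamily()}, keys,
//       &values);
//
// statuses[i] and (*values)[i] correspond to keys[i]; all lookups share one
// consistent sequence number obtained via MultiCFSnapshot() below.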
std::vector<Status> DBImpl::MultiGet(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  StopWatch sw(env_, stats_, DB_MULTIGET);
  PERF_TIMER_GUARD(get_snapshot_time);

  SequenceNumber consistent_seqnum;

  std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
      column_family.size());
  for (auto cf : column_family) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
    auto cfd = cfh->cfd();
    if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
      multiget_cf_data.emplace(cfd->GetID(),
                               MultiGetColumnFamilyData(cfh, nullptr));
    }
  }

  std::function<MultiGetColumnFamilyData*(
      std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
      iter_deref_lambda =
          [](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
                 cf_iter) { return &cf_iter->second; };

  bool unref_only =
      MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
          read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
          &consistent_seqnum);

  // Contain a list of merge operations if merge occurs.
  MergeContext merge_context;

  // Note: this always resizes the values array
  size_t num_keys = keys.size();
  std::vector<Status> stat_list(num_keys);
  values->resize(num_keys);

  // Keep track of bytes that we read for statistics-recording later
  uint64_t bytes_read = 0;
  PERF_TIMER_STOP(get_snapshot_time);

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  size_t num_found = 0;
  for (size_t i = 0; i < num_keys; ++i) {
    merge_context.Clear();
    Status& s = stat_list[i];
    std::string* value = &(*values)[i];

    LookupKey lkey(keys[i], consistent_seqnum);
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
    SequenceNumber max_covering_tombstone_seq = 0;
    auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
    assert(mgd_iter != multiget_cf_data.end());
    auto mgd = mgd_iter->second;
    auto super_version = mgd.super_version;
    bool skip_memtable =
        (read_options.read_tier == kPersistedTier &&
         has_unpersisted_data_.load(std::memory_order_relaxed));
    bool done = false;
    if (!skip_memtable) {
      if (super_version->mem->Get(lkey, value, &s, &merge_context,
                                  &max_covering_tombstone_seq, read_options)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if (super_version->imm->Get(lkey, value, &s, &merge_context,
                                         &max_covering_tombstone_seq,
                                         read_options)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      }
    }
    if (!done) {
      PinnableSlice pinnable_val;
      PERF_TIMER_GUARD(get_from_output_files_time);
      super_version->current->Get(read_options, lkey, &pinnable_val, &s,
                                  &merge_context,
                                  &max_covering_tombstone_seq);
      value->assign(pinnable_val.data(), pinnable_val.size());
      RecordTick(stats_, MEMTABLE_MISS);
    }

    if (s.ok()) {
      bytes_read += value->size();
      num_found++;
    }
  }

  // Post processing (decrement reference counts and record statistics)
  PERF_TIMER_GUARD(get_post_process_time);
  autovector<SuperVersion*> superversions_to_delete;

  for (auto mgd_iter : multiget_cf_data) {
    auto mgd = mgd_iter.second;
    if (!unref_only) {
      ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
    } else {
      mgd.cfd->GetSuperVersion()->Unref();
    }
  }
  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
  PERF_TIMER_STOP(get_post_process_time);

  return stat_list;
}

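// Descriptive note (added for clarity): MultiCFSnapshot acquires a
// SuperVersion per column family plus a sequence number that is consistent
// across all of them. The return value tells callers how the SuperVersions
// were acquired: false means they came from GetAndRefSuperVersion() and
// should be released with ReturnAndCleanupSuperVersion(); true means the
// last-try path ran under mutex_ and took plain Ref()s, so callers must
// Unref() instead.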
template <class T>
bool DBImpl::MultiCFSnapshot(
    const ReadOptions& read_options, ReadCallback* callback,
    std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
        iter_deref_func,
    T* cf_list, SequenceNumber* snapshot) {
  PERF_TIMER_GUARD(get_snapshot_time);

  bool last_try = false;
  if (cf_list->size() == 1) {
    // Fast path for a single column family. We can simply get the thread
    // local super version
    auto cf_iter = cf_list->begin();
    auto node = iter_deref_func(cf_iter);
    node->super_version = GetAndRefSuperVersion(node->cfd);
    if (read_options.snapshot != nullptr) {
      // Note: In WritePrepared txns this is not necessary but not harmful
      // either. Because prep_seq > snapshot => commit_seq > snapshot so if
      // a snapshot is specified we should be fine with skipping seq numbers
      // that are greater than that.
      //
      // In WriteUnprepared, we cannot set snapshot in the lookup key because
      // we may skip uncommitted data that should be visible to the
      // transaction for reading own writes.
      *snapshot =
          static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
      if (callback) {
        *snapshot = std::max(*snapshot, callback->max_visible_seq());
      }
    } else {
      // Since we get and reference the super version before getting
      // the snapshot number, without a mutex protection, it is possible
      // that a memtable switch happened in the middle and not all the
      // data for this snapshot is available. But it will contain all
      // the data available in the super version we have, which is also
      // a valid snapshot to read from.
      // We shouldn't get snapshot before finding and referencing the super
      // version because a flush happening in between may compact away data for
      // the snapshot, but the snapshot is earlier than the data overwriting
      // it, so users may see wrong results.
      *snapshot = last_seq_same_as_publish_seq_
                      ? versions_->LastSequence()
                      : versions_->LastPublishedSequence();
    }
  } else {
    // If we end up with the same issue of a memtable getting sealed during 2
    // consecutive retries, it means the write rate is very high. In that case
    // it's probably ok to take the mutex on the 3rd try so we can succeed for
    // sure.
    static const int num_retries = 3;
    for (int i = 0; i < num_retries; ++i) {
      last_try = (i == num_retries - 1);
      bool retry = false;

      if (i > 0) {
        for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
             ++cf_iter) {
          auto node = iter_deref_func(cf_iter);
          SuperVersion* super_version = node->super_version;
          ColumnFamilyData* cfd = node->cfd;
          if (super_version != nullptr) {
            ReturnAndCleanupSuperVersion(cfd, super_version);
          }
          node->super_version = nullptr;
        }
      }
      if (read_options.snapshot == nullptr) {
        if (last_try) {
          TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
          // We're close to max number of retries. For the last retry,
          // acquire the lock so we're sure to succeed
          mutex_.Lock();
        }
        *snapshot = last_seq_same_as_publish_seq_
                        ? versions_->LastSequence()
                        : versions_->LastPublishedSequence();
      } else {
        *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
                        ->number_;
      }
      for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
           ++cf_iter) {
        auto node = iter_deref_func(cf_iter);
        if (!last_try) {
          node->super_version = GetAndRefSuperVersion(node->cfd);
        } else {
          node->super_version = node->cfd->GetSuperVersion()->Ref();
        }
        TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
        if (read_options.snapshot != nullptr || last_try) {
          // If user passed a snapshot, then we don't care if a memtable is
          // sealed or compaction happens because the snapshot would ensure
          // that older key versions are kept around. If this is the last
          // retry, then we have the lock so nothing bad can happen
          continue;
        }
        // We could get the earliest sequence number for the whole list of
        // memtables, which will include immutable memtables as well, but that
        // might be tricky to maintain in case we decide, in future, to do
        // memtable compaction.
        if (!last_try) {
          SequenceNumber seq =
              node->super_version->mem->GetEarliestSequenceNumber();
          if (seq > *snapshot) {
            retry = true;
            break;
          }
        }
      }
      if (!retry) {
        if (last_try) {
          mutex_.Unlock();
        }
        break;
      }
    }
  }

  PERF_TIMER_STOP(get_snapshot_time);
  return last_try;
}

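// Illustrative usage sketch (not part of the original source): the batched
// overload reads results into caller-provided arrays and can exploit
// pre-sorted input, e.g.
//
//   const size_t n = 2;
//   ColumnFamilyHandle* cfs[n] = {cf, cf};
//   Slice keys[n] = {"k1", "k2"};
//   PinnableSlice values[n];
//   Status statuses[n];
//   db->MultiGet(ReadOptions(), n, cfs, keys, values, statuses,
//                /*sorted_input=*/false);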
void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
                      ColumnFamilyHandle** column_families, const Slice* keys,
                      PinnableSlice* values, Status* statuses,
                      const bool sorted_input) {
  if (num_keys == 0) {
    return;
  }
  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  sorted_keys.resize(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    key_context.emplace_back(column_families[i], keys[i], &values[i],
                             &statuses[i]);
  }
  for (size_t i = 0; i < num_keys; ++i) {
    sorted_keys[i] = &key_context[i];
  }
  PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);

  autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
      multiget_cf_data;
  size_t cf_start = 0;
  ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
  for (size_t i = 0; i < num_keys; ++i) {
    KeyContext* key_ctx = sorted_keys[i];
    if (key_ctx->column_family != cf) {
      multiget_cf_data.emplace_back(
          MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
      cf_start = i;
      cf = key_ctx->column_family;
    }
  }
  {
    // multiget_cf_data.emplace_back(
    //     MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start,
    //     nullptr));
    multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
  }
  std::function<MultiGetColumnFamilyData*(
      autovector<MultiGetColumnFamilyData,
                 MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
      iter_deref_lambda =
          [](autovector<MultiGetColumnFamilyData,
                        MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
            return &(*cf_iter);
          };

  SequenceNumber consistent_seqnum;
  bool unref_only = MultiCFSnapshot<
      autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
      read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
      &consistent_seqnum);

  for (auto cf_iter = multiget_cf_data.begin();
       cf_iter != multiget_cf_data.end(); ++cf_iter) {
    MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys,
                 cf_iter->super_version, consistent_seqnum, nullptr, nullptr);
    if (!unref_only) {
      ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version);
    } else {
      cf_iter->cfd->GetSuperVersion()->Unref();
    }
  }
}

namespace {
// Order keys by CF ID, followed by key contents
struct CompareKeyContext {
  inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
    ColumnFamilyHandleImpl* cfh =
        static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
    uint32_t cfd_id1 = cfh->cfd()->GetID();
    const Comparator* comparator = cfh->cfd()->user_comparator();
    cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
    uint32_t cfd_id2 = cfh->cfd()->GetID();

    if (cfd_id1 < cfd_id2) {
      return true;
    } else if (cfd_id1 > cfd_id2) {
      return false;
    }

    // Both keys are from the same column family
    int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
    if (cmp < 0) {
      return true;
    }
    return false;
  }
};
}  // anonymous namespace

void DBImpl::PrepareMultiGetKeys(
    size_t num_keys, bool sorted_input,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
#ifndef NDEBUG
  if (sorted_input) {
    for (size_t index = 0; index < sorted_keys->size(); ++index) {
      if (index > 0) {
        KeyContext* lhs = (*sorted_keys)[index - 1];
        KeyContext* rhs = (*sorted_keys)[index];
        ColumnFamilyHandleImpl* cfh =
            reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
        uint32_t cfd_id1 = cfh->cfd()->GetID();
        const Comparator* comparator = cfh->cfd()->user_comparator();
        cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
        uint32_t cfd_id2 = cfh->cfd()->GetID();

        assert(cfd_id1 <= cfd_id2);
        if (cfd_id1 < cfd_id2) {
          continue;
        }

        // Both keys are from the same column family
        int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
        assert(cmp <= 0);
      }
    }
  }
#endif
  if (!sorted_input) {
    CompareKeyContext sort_comparator;
    std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
              sort_comparator);
  }
}

void DBImpl::MultiGet(const ReadOptions& read_options,
                      ColumnFamilyHandle* column_family, const size_t num_keys,
                      const Slice* keys, PinnableSlice* values,
                      Status* statuses, const bool sorted_input) {
  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  sorted_keys.resize(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]);
  }
  for (size_t i = 0; i < num_keys; ++i) {
    sorted_keys[i] = &key_context[i];
  }
  PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
  MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
}

void DBImpl::MultiGetWithCallback(
    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    ReadCallback* callback,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
  std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
  multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
  std::function<MultiGetColumnFamilyData*(
      std::array<MultiGetColumnFamilyData, 1>::iterator&)>
      iter_deref_lambda =
          [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
            return &(*cf_iter);
          };

  size_t num_keys = sorted_keys->size();
  SequenceNumber consistent_seqnum;
  bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
      read_options, callback, iter_deref_lambda, &multiget_cf_data,
      &consistent_seqnum);
#ifndef NDEBUG
  assert(!unref_only);
#else
  // Silence unused variable warning
  (void)unref_only;
#endif  // NDEBUG

  if (callback && read_options.snapshot == nullptr) {
    // The unprep_seqs are not published for write unprepared, so it could be
    // that max_visible_seq is larger. Seek to the std::max of the two.
    // However, we still want our callback to contain the actual snapshot so
    // that it can do the correct visibility filtering.
    callback->Refresh(consistent_seqnum);

    // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
    // max_visible_seq = max(max_visible_seq, snapshot)
    //
    // Currently, the commented out assert is broken by
    // InvalidSnapshotReadCallback, but if write unprepared recovery followed
    // the regular transaction flow, then this special read callback would not
    // be needed.
    //
    // assert(callback->max_visible_seq() >= snapshot);
    consistent_seqnum = callback->max_visible_seq();
  }

  MultiGetImpl(read_options, 0, num_keys, sorted_keys,
               multiget_cf_data[0].super_version, consistent_seqnum, nullptr,
               nullptr);
  ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
                               multiget_cf_data[0].super_version);
}

void DBImpl::MultiGetImpl(
    const ReadOptions& read_options, size_t start_key, size_t num_keys,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
    SuperVersion* super_version, SequenceNumber snapshot,
    ReadCallback* callback, bool* is_blob_index) {
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  StopWatch sw(env_, stats_, DB_MULTIGET);

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  size_t keys_left = num_keys;
  while (keys_left) {
    size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
                            ? MultiGetContext::MAX_BATCH_SIZE
                            : keys_left;
    MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
                        batch_size, snapshot);
    MultiGetRange range = ctx.GetMultiGetRange();
    bool lookup_current = false;

    keys_left -= batch_size;
    for (auto mget_iter = range.begin(); mget_iter != range.end();
         ++mget_iter) {
      mget_iter->merge_context.Clear();
      *mget_iter->s = Status::OK();
    }

    bool skip_memtable =
        (read_options.read_tier == kPersistedTier &&
         has_unpersisted_data_.load(std::memory_order_relaxed));
    if (!skip_memtable) {
      super_version->mem->MultiGet(read_options, &range, callback,
                                   is_blob_index);
      if (!range.empty()) {
        super_version->imm->MultiGet(read_options, &range, callback,
                                     is_blob_index);
      }
      if (!range.empty()) {
        lookup_current = true;
        uint64_t left = range.KeysLeft();
        RecordTick(stats_, MEMTABLE_MISS, left);
      }
    }
    if (lookup_current) {
      PERF_TIMER_GUARD(get_from_output_files_time);
      super_version->current->MultiGet(read_options, &range, callback,
                                       is_blob_index);
    }
  }

  // Post processing (decrement reference counts and record statistics)
  PERF_TIMER_GUARD(get_post_process_time);
  size_t num_found = 0;
  uint64_t bytes_read = 0;
  for (size_t i = start_key; i < start_key + num_keys; ++i) {
    KeyContext* key = (*sorted_keys)[i];
    if (key->s->ok()) {
      bytes_read += key->value->size();
      num_found++;
    }
  }

  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
  PERF_TIMER_STOP(get_post_process_time);
}

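// Illustrative usage sketch (not part of the original source):
//
//   ColumnFamilyHandle* cf = nullptr;
//   Status s = db->CreateColumnFamily(ColumnFamilyOptions(), "new_cf", &cf);
//   // ... use cf ...
//   s = db->DestroyColumnFamilyHandle(cf);  // release the handle when done
//
// A successful creation also rewrites the OPTIONS file (see the
// WriteOptionsFile() call below) so the new column family survives a reopen.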
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                  const std::string& column_family,
                                  ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
  if (s.ok()) {
    s = WriteOptionsFile(true /*need_mutex_lock*/,
                         true /*need_enter_write_thread*/);
  }
  return s;
}

Status DBImpl::CreateColumnFamilies(
    const ColumnFamilyOptions& cf_options,
    const std::vector<std::string>& column_family_names,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  handles->clear();
  size_t num_cf = column_family_names.size();
  Status s;
  bool success_once = false;
  for (size_t i = 0; i < num_cf; i++) {
    ColumnFamilyHandle* handle;
    s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(handle);
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}

Status DBImpl::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  handles->clear();
  size_t num_cf = column_families.size();
  Status s;
  bool success_once = false;
  for (size_t i = 0; i < num_cf; i++) {
    ColumnFamilyHandle* handle;
    s = CreateColumnFamilyImpl(column_families[i].options,
                               column_families[i].name, &handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(handle);
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}

Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
                                      const std::string& column_family_name,
                                      ColumnFamilyHandle** handle) {
  Status s;
  Status persist_options_status;
  *handle = nullptr;

  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
  if (s.ok()) {
    for (auto& cf_path : cf_options.cf_paths) {
      s = env_->CreateDirIfMissing(cf_path.path);
      if (!s.ok()) {
        break;
      }
    }
  }
  if (!s.ok()) {
    return s;
  }

  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    InstrumentedMutexLock l(&mutex_);

    if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
        nullptr) {
      return Status::InvalidArgument("Column family already exists");
    }
    VersionEdit edit;
    edit.AddColumnFamily(column_family_name);
    uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
    edit.SetColumnFamily(new_id);
    edit.SetLogNumber(logfile_number_);
    edit.SetComparatorName(cf_options.comparator->Name());

    // LogAndApply will both write the creation in MANIFEST and create
    // ColumnFamilyData object
    {  // write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
                                 &mutex_, directories_.GetDbDir(), false,
                                 &cf_options);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      std::map<std::string, std::shared_ptr<Directory>> dummy_created_dirs;
      s = cfd->AddDirectories(&dummy_created_dirs);
    }
    if (s.ok()) {
      single_column_family_mode_ = false;
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      InstallSuperVersionAndScheduleWork(cfd, &sv_context,
                                         *cfd->GetLatestMutableCFOptions());

      if (!cfd->mem()->IsSnapshotSupported()) {
        is_snapshot_supported_ = false;
      }

      cfd->set_initialized();

      *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Created column family [%s] (ID %u)",
                     column_family_name.c_str(), (unsigned)cfd->GetID());
    } else {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Creating column family [%s] FAILED -- %s",
                      column_family_name.c_str(), s.ToString().c_str());
    }
  }  // InstrumentedMutexLock l(&mutex_)

  sv_context.Clean();
  // this is outside the mutex
  if (s.ok()) {
    NewThreadStatusCfInfo(
        reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
  }
  return s;
}

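// Descriptive note (added for clarity): dropping a column family only marks
// it dropped in the MANIFEST and rewrites the OPTIONS file; the caller still
// owns the handle and must release it separately (e.g. via
// DestroyColumnFamilyHandle()), after which the underlying data becomes
// eligible for deletion once no references remain.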
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
  assert(column_family != nullptr);
  Status s = DropColumnFamilyImpl(column_family);
  if (s.ok()) {
    s = WriteOptionsFile(true /*need_mutex_lock*/,
                         true /*need_enter_write_thread*/);
  }
  return s;
}

Status DBImpl::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& column_families) {
  Status s;
  bool success_once = false;
  for (auto* handle : column_families) {
    s = DropColumnFamilyImpl(handle);
    if (!s.ok()) {
      break;
    }
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}

Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    InstrumentedMutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // we drop column family from a single write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
                                 &mutex_);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -=
          mutable_cf_options->write_buffer_size *
          mutable_cf_options->max_write_buffer_number;
    }

    if (!cf_support_snapshot) {
      // Dropped Column Family doesn't support snapshot. Need to recalculate
      // is_snapshot_supported_.
      bool new_is_snapshot_supported = true;
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
          new_is_snapshot_supported = false;
          break;
        }
      }
      is_snapshot_supported_ = new_is_snapshot_supported;
    }
    bg_cv_.SignalAll();
  }

  if (s.ok()) {
    // Note that here we erase the associated cf_info of the to-be-dropped
    // cfd before its ref-count goes to zero to avoid having to erase cf_info
    // later inside db_mutex.
    EraseThreadStatusCfInfo(cfd);
    assert(cfd->IsDropped());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Dropped column family with id %u\n", cfd->GetID());
  } else {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Dropping column family with id %u FAILED -- %s\n",
                    cfd->GetID(), s.ToString().c_str());
  }

  return s;
}

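// Illustrative usage sketch (not part of the original source): KeyMayExist is
// a fast, approximate check that only consults the memtables and block cache,
// so false positives are possible but false negatives are not, e.g.
//
//   std::string value;
//   bool value_found = false;
//   if (db->KeyMayExist(ReadOptions(), cf, "key", &value, &value_found) &&
//       value_found) {
//     /* value was fetched without any disk I/O */
//   }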
bool DBImpl::KeyMayExist(const ReadOptions& read_options,
                         ColumnFamilyHandle* column_family, const Slice& key,
                         std::string* value, bool* value_found) {
  assert(value != nullptr);
  if (value_found != nullptr) {
    // falsify later if key-may-exist but can't fetch value
    *value_found = true;
  }
  ReadOptions roptions = read_options;
  roptions.read_tier = kBlockCacheTier;  // read from block cache only
  PinnableSlice pinnable_val;
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = &pinnable_val;
  get_impl_options.value_found = value_found;
  auto s = GetImpl(roptions, key, get_impl_options);
  value->assign(pinnable_val.data(), pinnable_val.size());

  // If block_cache is enabled and the index block of the table is not present
  // in block_cache, the return value will be Status::Incomplete. In this
  // case, key may still exist in the table.
  return s.ok() || s.IsIncomplete();
}

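// Illustrative usage sketch (not part of the original source):
//
//   std::unique_ptr<Iterator> it(
//       db->NewIterator(ReadOptions(), db->DefaultColumnFamily()));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     /* it->key(), it->value() */
//   }
//   s = it->status();  // check for iteration errors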
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family) {
  if (read_options.managed) {
    return NewErrorIterator(
        Status::NotSupported("Managed iterator is not supported anymore."));
  }
  Iterator* result = nullptr;
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }
  // if iterator wants internal keys, we can only proceed if
  // we can guarantee the deletes haven't been processed yet
  if (immutable_db_options_.preserve_deletes &&
      read_options.iter_start_seqnum > 0 &&
      read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
    return NewErrorIterator(Status::InvalidArgument(
        "Iterator requested internal keys which are too old and are not"
        " guaranteed to be preserved, try larger iter_start_seqnum opt."));
  }
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    // not supported in lite version
    result = nullptr;
#else
    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
    auto iter = new ForwardIterator(this, read_options, cfd, sv);
    result = NewDBIterator(
        env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
        cfd->user_comparator(), iter, kMaxSequenceNumber,
        sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
        this, cfd);
#endif
  } else {
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
    // WritePreparedTxnDB
    auto snapshot = read_options.snapshot != nullptr
                        ? read_options.snapshot->GetSequenceNumber()
                        : versions_->LastSequence();
    result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
  }
  return result;
}
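// Usage sketch (illustrative comment only; assumes an open DB* named "db").
// The returned iterator pins resources and must be deleted before the DB:
//
//   std::unique_ptr<Iterator> it(db->NewIterator(ReadOptions()));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // it->key() and it->value() are valid until the next move.
//   }
//   assert(it->status().ok());  // check for errors found during the scan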
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                            ColumnFamilyData* cfd,
                                            SequenceNumber snapshot,
                                            ReadCallback* read_callback,
                                            bool allow_blob,
                                            bool allow_refresh) {
  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

  // Try to generate a DB iterator tree in continuous memory area to be
  // cache friendly. Here is an example of result:
  // +-------------------------------+
  // |                               |
  // | ArenaWrappedDBIter            |
  // |  +                            |
  // |  +---> Inner Iterator   ------------+
  // |  |                            |     |
  // |  |    +-- -- -- -- -- -- -- --+     |
  // |  +--- | Arena                 |     |
  // |       |                       |     |
  // |          Allocated Memory:    |     |
  // |       |   +-------------------+     |
  // |       |   | DBIter            | <---+
  // |           |  +                |
  // |       |   |  +-> iter_  ------------+
  // |       |   |                   |     |
  // |       |   +-------------------+     |
  // |       |   | MergingIterator   | <---+
  // |           |  +                |
  // |       |   |  +->child iter1  ------------+
  // |       |   |  |                |          |
  // |           |  +->child iter2  ----------+ |
  // |       |   |  |                |        | |
  // |       |   |  +->child iter3  --------+ | |
  // |           |                   |      | | |
  // |       |   +-------------------+      | | |
  // |       |   | Iterator1         | <--------+
  // |       |   +-------------------+      | |
  // |       |   | Iterator2         | <------+
  // |       |   +-------------------+      |
  // |       |   | Iterator3         | <----+
  // |       |   +-------------------+
  // |       |                       |
  // +-------+-----------------------+
  //
  // ArenaWrappedDBIter inlines an arena area where all the iterators in
  // the iterator tree are allocated in the order of being accessed when
  // querying.
  // Laying out the iterators in the order of being accessed makes it more
  // likely that any iterator pointer is close to the iterator it points to so
  // that they are likely to be in the same cache line and/or page.
  ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
      env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
      sv->mutable_cf_options.max_sequential_skip_in_iterations,
      sv->version_number, read_callback, this, cfd, allow_blob,
      read_options.snapshot != nullptr ? false : allow_refresh);

  InternalIterator* internal_iter =
      NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
                          db_iter->GetRangeDelAggregator(), snapshot);
  db_iter->SetIterUnderDBIter(internal_iter);
  return db_iter;
}
Status DBImpl::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (read_options.managed) {
    return Status::NotSupported("Managed iterator is not supported anymore.");
  }
  if (read_options.read_tier == kPersistedTier) {
    return Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators.");
  }
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  iterators->clear();
  iterators->reserve(column_families.size());
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    return Status::InvalidArgument(
        "Tailing iterator not supported in RocksDB lite");
#else
    for (auto cfh : column_families) {
      auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
      auto iter = new ForwardIterator(this, read_options, cfd, sv);
      iterators->push_back(NewDBIterator(
          env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
          cfd->user_comparator(), iter, kMaxSequenceNumber,
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          read_callback, this, cfd));
    }
#endif
  } else {
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
    // WritePreparedTxnDB
    auto snapshot = read_options.snapshot != nullptr
                        ? read_options.snapshot->GetSequenceNumber()
                        : versions_->LastSequence();
    for (size_t i = 0; i < column_families.size(); ++i) {
      auto* cfd =
          reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i])->cfd();
      iterators->push_back(
          NewIteratorImpl(read_options, cfd, snapshot, read_callback));
    }
  }

  return Status::OK();
}
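// Usage sketch (illustrative comment only; assumes an open DB* "db" and
// column family handles "cf1" and "cf2"). All returned iterators observe the
// same sequence number, so the caller gets a consistent view across column
// families:
//
//   std::vector<Iterator*> iters;
//   Status s = db->NewIterators(ReadOptions(), {cf1, cf2}, &iters);
//   if (s.ok()) {
//     // ... use iters[0] / iters[1] ...
//     for (Iterator* it : iters) {
//       delete it;
//     }
//   }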
const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }

#ifndef ROCKSDB_LITE
const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
  return GetSnapshotImpl(true);
}
#endif  // ROCKSDB_LITE

SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
                                      bool lock) {
  int64_t unix_time = 0;
  env_->GetCurrentTime(&unix_time);  // Ignore error
  SnapshotImpl* s = new SnapshotImpl;

  if (lock) {
    mutex_.Lock();
  }
  // returns null if the underlying memtable does not support snapshot.
  if (!is_snapshot_supported_) {
    if (lock) {
      mutex_.Unlock();
    }
    delete s;
    return nullptr;
  }
  auto snapshot_seq = last_seq_same_as_publish_seq_
                          ? versions_->LastSequence()
                          : versions_->LastPublishedSequence();
  SnapshotImpl* snapshot =
      snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
  if (lock) {
    mutex_.Unlock();
  }
  return snapshot;
}
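// Usage sketch (illustrative comment only; assumes an open DB* "db"). A
// snapshot pins a sequence number so reads see a frozen view; it must be
// released, or DBImpl::Close() will fail with Status::Aborted:
//
//   const Snapshot* snap = db->GetSnapshot();
//   ReadOptions ropts;
//   ropts.snapshot = snap;
//   std::string value;
//   Status s = db->Get(ropts, "key", &value);  // reads as of "snap"
//   db->ReleaseSnapshot(snap);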
namespace {
typedef autovector<ColumnFamilyData*, 2> CfdList;
bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
  for (const ColumnFamilyData* t : list) {
    if (t == cfd) {
      return true;
    }
  }
  return false;
}
}  // namespace

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
  {
    InstrumentedMutexLock l(&mutex_);
    snapshots_.Delete(casted_s);
    uint64_t oldest_snapshot;
    if (snapshots_.empty()) {
      oldest_snapshot = last_seq_same_as_publish_seq_
                            ? versions_->LastSequence()
                            : versions_->LastPublishedSequence();
    } else {
      oldest_snapshot = snapshots_.oldest()->number_;
    }
    // Avoid going through every column family by checking a global threshold
    // first.
    if (oldest_snapshot > bottommost_files_mark_threshold_) {
      CfdList cf_scheduled;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
        if (!cfd->current()
                 ->storage_info()
                 ->BottommostFilesMarkedForCompaction()
                 .empty()) {
          SchedulePendingCompaction(cfd);
          MaybeScheduleFlushOrCompaction();
          cf_scheduled.push_back(cfd);
        }
      }

      // Calculate a new threshold, skipping those CFs where compactions are
      // scheduled. We do not do the same pass as the previous loop because
      // mutex might be unlocked during the loop, making the result inaccurate.
      SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        if (CfdListContains(cf_scheduled, cfd)) {
          continue;
        }
        new_bottommost_files_mark_threshold = std::min(
            new_bottommost_files_mark_threshold,
            cfd->current()->storage_info()->bottommost_files_mark_threshold());
      }
      bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
    }
  }
  delete casted_s;
}
#ifndef ROCKSDB_LITE
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                        TablePropertiesCollection* props) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  // Increment the ref count
  mutex_.Lock();
  auto version = cfd->current();
  version->Ref();
  mutex_.Unlock();

  auto s = version->GetPropertiesOfAllTables(props);

  // Decrement the ref count
  mutex_.Lock();
  version->Unref();
  mutex_.Unlock();

  return s;
}

Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
                                            const Range* range, std::size_t n,
                                            TablePropertiesCollection* props) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  // Increment the ref count
  mutex_.Lock();
  auto version = cfd->current();
  version->Ref();
  mutex_.Unlock();

  auto s = version->GetPropertiesOfTablesInRange(range, n, props);

  // Decrement the ref count
  mutex_.Lock();
  version->Unref();
  mutex_.Unlock();

  return s;
}
#endif  // ROCKSDB_LITE

const std::string& DBImpl::GetName() const { return dbname_; }

Env* DBImpl::GetEnv() const { return env_; }

FileSystem* DB::GetFileSystem() const {
  static LegacyFileSystemWrapper fs_wrap(GetEnv());
  return &fs_wrap;
}

FileSystem* DBImpl::GetFileSystem() const {
  return immutable_db_options_.fs.get();
}

Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
  InstrumentedMutexLock l(&mutex_);
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
                 cfh->cfd()->GetLatestCFOptions());
}

DBOptions DBImpl::GetDBOptions() const {
  InstrumentedMutexLock l(&mutex_);
  return BuildDBOptions(immutable_db_options_, mutable_db_options_);
}

bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
                         const Slice& property, std::string* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (property_info == nullptr) {
    return false;
  } else if (property_info->handle_int) {
    uint64_t int_value;
    bool ret_value =
        GetIntPropertyInternal(cfd, *property_info, false, &int_value);
    if (ret_value) {
      *value = ToString(int_value);
    }
    return ret_value;
  } else if (property_info->handle_string) {
    InstrumentedMutexLock l(&mutex_);
    return cfd->internal_stats()->GetStringProperty(*property_info, property,
                                                    value);
  } else if (property_info->handle_string_dbimpl) {
    std::string tmp_value;
    bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
    if (ret_value) {
      *value = tmp_value;
    }
    return ret_value;
  }
  // Shouldn't reach here since exactly one of handle_string and handle_int
  // should be non-nullptr.
  assert(false);
  return false;
}
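// Usage sketch (illustrative comment only; assumes an open DB* "db"). The
// property names below are the published "rocksdb." property strings:
//
//   std::string stats;
//   if (db->GetProperty("rocksdb.stats", &stats)) {
//     // stats now holds a human-readable statistics dump.
//   }
//   uint64_t num_keys = 0;
//   if (db->GetIntProperty("rocksdb.estimate-num-keys", &num_keys)) {
//     // num_keys is an estimate; tombstones can skew it.
//   }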
bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
                            const Slice& property,
                            std::map<std::string, std::string>* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (property_info == nullptr) {
    return false;
  } else if (property_info->handle_map) {
    InstrumentedMutexLock l(&mutex_);
    return cfd->internal_stats()->GetMapProperty(*property_info, property,
                                                 value);
  }
  // If we reach this point it means that handle_map is not provided for the
  // requested property
  return false;
}

bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, uint64_t* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return GetIntPropertyInternal(cfd, *property_info, false, value);
}

bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
                                    const DBPropertyInfo& property_info,
                                    bool is_locked, uint64_t* value) {
  assert(property_info.handle_int != nullptr);
  if (!property_info.need_out_of_mutex) {
    if (is_locked) {
      mutex_.AssertHeld();
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    } else {
      InstrumentedMutexLock l(&mutex_);
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    }
  } else {
    SuperVersion* sv = nullptr;
    if (!is_locked) {
      sv = GetAndRefSuperVersion(cfd);
    } else {
      sv = cfd->GetSuperVersion();
    }

    bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
        property_info, sv->current, value);

    if (!is_locked) {
      ReturnAndCleanupSuperVersion(cfd, sv);
    }

    return ret;
  }
}

bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
  assert(value != nullptr);
  Statistics* statistics = immutable_db_options_.statistics.get();
  if (!statistics) {
    return false;
  }
  *value = statistics->ToString();
  return true;
}

#ifndef ROCKSDB_LITE
Status DBImpl::ResetStats() {
  InstrumentedMutexLock l(&mutex_);
  for (auto* cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->initialized()) {
      cfd->internal_stats()->Clear();
    }
  }
  return Status::OK();
}
#endif  // ROCKSDB_LITE

bool DBImpl::GetAggregatedIntProperty(const Slice& property,
                                      uint64_t* aggregated_value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }

  uint64_t sum = 0;
  {
    // Needs mutex to protect the list of column families.
    InstrumentedMutexLock l(&mutex_);
    uint64_t value;
    for (auto* cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->initialized()) {
        continue;
      }
      if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
        sum += value;
      } else {
        return false;
      }
    }
  }
  *aggregated_value = sum;
  return true;
}
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
  // TODO(ljin): consider using GetReferencedSuperVersion() directly
  return cfd->GetThreadLocalSuperVersion(this);
}

// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
  auto column_family_set = versions_->GetColumnFamilySet();
  auto cfd = column_family_set->GetColumnFamily(column_family_id);
  if (!cfd) {
    return nullptr;
  }

  return GetAndRefSuperVersion(cfd);
}

void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
  // Release SuperVersion
  if (sv->Unref()) {
    bool defer_purge = immutable_db_options().avoid_unnecessary_blocking_io;
    {
      InstrumentedMutexLock l(&mutex_);
      sv->Cleanup();
      if (defer_purge) {
        AddSuperVersionsToFreeQueue(sv);
        SchedulePurge();
      }
    }
    if (!defer_purge) {
      delete sv;
    }
    RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
  }
  RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}

void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
    CleanupSuperVersion(sv);
  }
}

// REQUIRED: this function should only be called on the write thread.
void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
                                          SuperVersion* sv) {
  auto column_family_set = versions_->GetColumnFamilySet();
  auto cfd = column_family_set->GetColumnFamily(column_family_id);

  // If SuperVersion is held, and we successfully fetched a cfd using
  // GetAndRefSuperVersion(), it must still exist.
  assert(cfd != nullptr);
  ReturnAndCleanupSuperVersion(cfd, sv);
}

// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
  ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();

  if (!cf_memtables->Seek(column_family_id)) {
    return nullptr;
  }

  return cf_memtables->GetColumnFamilyHandle();
}

// REQUIRED: mutex is NOT held.
std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
    uint32_t column_family_id) {
  InstrumentedMutexLock l(&mutex_);

  auto* cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
  if (cfd == nullptr) {
    return nullptr;
  }

  return std::unique_ptr<ColumnFamilyHandleImpl>(
      new ColumnFamilyHandleImpl(cfd, this, &mutex_));
}

void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
                                         const Range& range,
                                         uint64_t* const count,
                                         uint64_t* const size) {
  ColumnFamilyHandleImpl* cfh =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  // Convert user_key into a corresponding internal key.
  InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
  InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
  MemTable::MemTableStats memStats =
      sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
  MemTable::MemTableStats immStats =
      sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
  *count = memStats.count + immStats.count;
  *size = memStats.size + immStats.size;

  ReturnAndCleanupSuperVersion(cfd, sv);
}

Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
                                   ColumnFamilyHandle* column_family,
                                   const Range* range, int n, uint64_t* sizes) {
  if (!options.include_memtabtles && !options.include_files) {
    return Status::InvalidArgument("Invalid options");
  }

  Version* v;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  SuperVersion* sv = GetAndRefSuperVersion(cfd);
  v = sv->current;

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    sizes[i] = 0;
    if (options.include_files) {
      sizes[i] += versions_->ApproximateSize(
          options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
          /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
    }
    if (options.include_memtabtles) {
      sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
      sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
    }
  }

  ReturnAndCleanupSuperVersion(cfd, sv);
  return Status::OK();
}
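// Usage sketch (illustrative comment only; assumes an open DB* "db"). Sizes
// are estimates derived from index blocks and memtable sampling, not exact
// byte counts:
//
//   Range r("a", "z");
//   uint64_t size = 0;
//   db->GetApproximateSizes(db->DefaultColumnFamily(), &r, 1, &size,
//                           DB::SizeApproximationFlags::INCLUDE_FILES |
//                               DB::SizeApproximationFlags::INCLUDE_MEMTABLES);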
std::list<uint64_t>::iterator
DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
  // We need to remember the iterator of our insert, because after the
  // background job is done, we need to remove that element from
  // pending_outputs_.
  pending_outputs_.push_back(versions_->current_next_file_number());
  auto pending_outputs_inserted_elem = pending_outputs_.end();
  --pending_outputs_inserted_elem;
  return pending_outputs_inserted_elem;
}

void DBImpl::ReleaseFileNumberFromPendingOutputs(
    std::unique_ptr<std::list<uint64_t>::iterator>& v) {
  if (v.get() != nullptr) {
    pending_outputs_.erase(*v.get());
    v.reset();
  }
}

#ifndef ROCKSDB_LITE
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
  if (seq > versions_->LastSequence()) {
    return Status::NotFound("Requested sequence not yet written in the db");
  }
  return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
}
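// Usage sketch (illustrative comment only; assumes an open DB* "db" and a
// starting sequence number "seq"). Each BatchResult carries a write batch and
// the sequence number of its first update:
//
//   std::unique_ptr<TransactionLogIterator> wal_iter;
//   Status s = db->GetUpdatesSince(seq, &wal_iter);
//   while (s.ok() && wal_iter->Valid()) {
//     BatchResult batch = wal_iter->GetBatch();
//     // Replicate batch.writeBatchPtr, resuming later from batch.sequence.
//     wal_iter->Next();
//   }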
Status DBImpl::DeleteFile(std::string name) {
  uint64_t number;
  FileType type;
  WalFileType log_type;
  if (!ParseFileName(name, &number, &type, &log_type) ||
      (type != kTableFile && type != kLogFile)) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
                    name.c_str());
    return Status::InvalidArgument("Invalid file name");
  }

  Status status;
  if (type == kLogFile) {
    // Only allow deleting archived log files
    if (log_type != kArchivedLogFile) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DeleteFile %s failed - not archived log.\n",
                      name.c_str());
      return Status::NotSupported("Delete only supported for archived logs");
    }
    status = wal_manager_.DeleteFile(name, number);
    if (!status.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DeleteFile %s failed -- %s.\n", name.c_str(),
                      status.ToString().c_str());
    }
    return status;
  }

  int level;
  FileMetaData* metadata;
  ColumnFamilyData* cfd;
  VersionEdit edit;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock l(&mutex_);
    status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
    if (!status.ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s failed. File not found\n", name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File not found");
    }
    assert(level < cfd->NumberLevels());

    // If the file is being compacted no need to delete.
    if (metadata->being_compacted) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DeleteFile %s Skipped. File about to be compacted\n",
                     name.c_str());
      job_context.Clean();
      return Status::OK();
    }

    // Only the files in the last level can be deleted externally.
    // This is to make sure that any deletion tombstones are not
    // lost. Check that the level passed is the last level.
    auto* vstorage = cfd->current()->storage_info();
    for (int i = level + 1; i < cfd->NumberLevels(); i++) {
      if (vstorage->NumLevelFiles(i) != 0) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "DeleteFile %s FAILED. File not in last level\n",
                       name.c_str());
        job_context.Clean();
        return Status::InvalidArgument("File not in last level");
      }
    }

    // if level == 0, it has to be the oldest file
    if (level == 0 &&
        vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s failed ---"
                     " target file in level 0 must be the oldest.",
                     name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File in level 0, but not oldest");
    }

    edit.SetColumnFamily(cfd->GetID());
    edit.DeleteFile(level, number);
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd,
                                         &job_context.superversion_contexts[0],
                                         *cfd->GetLatestMutableCFOptions());
    }
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(immutable_db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}
Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
                                   const RangePtr* ranges, size_t n,
                                   bool include_end) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  VersionEdit edit;
  std::set<FileMetaData*> deleted_files;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock l(&mutex_);
    Version* input_version = cfd->current();

    auto* vstorage = input_version->storage_info();
    for (size_t r = 0; r < n; r++) {
      auto begin = ranges[r].start, end = ranges[r].limit;
      for (int i = 1; i < cfd->NumberLevels(); i++) {
        if (vstorage->LevelFiles(i).empty() ||
            !vstorage->OverlapInLevel(i, begin, end)) {
          continue;
        }
        std::vector<FileMetaData*> level_files;
        InternalKey begin_storage, end_storage, *begin_key, *end_key;
        if (begin == nullptr) {
          begin_key = nullptr;
        } else {
          begin_storage.SetMinPossibleForUserKey(*begin);
          begin_key = &begin_storage;
        }
        if (end == nullptr) {
          end_key = nullptr;
        } else {
          end_storage.SetMaxPossibleForUserKey(*end);
          end_key = &end_storage;
        }

        vstorage->GetCleanInputsWithinInterval(
            i, begin_key, end_key, &level_files, -1 /* hint_index */,
            nullptr /* file_index */);
        FileMetaData* level_file;
        for (uint32_t j = 0; j < level_files.size(); j++) {
          level_file = level_files[j];
          if (level_file->being_compacted) {
            continue;
          }
          if (deleted_files.find(level_file) != deleted_files.end()) {
            continue;
          }
          if (!include_end && end != nullptr &&
              cfd->user_comparator()->Compare(level_file->largest.user_key(),
                                              *end) == 0) {
            continue;
          }
          edit.SetColumnFamily(cfd->GetID());
          edit.DeleteFile(i, level_file->fd.GetNumber());
          deleted_files.insert(level_file);
          level_file->being_compacted = true;
        }
      }
    }
    if (edit.GetDeletedFiles().empty()) {
      job_context.Clean();
      return Status::OK();
    }
    input_version->Ref();
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd,
                                         &job_context.superversion_contexts[0],
                                         *cfd->GetLatestMutableCFOptions());
    }
    for (auto* deleted_file : deleted_files) {
      deleted_file->being_compacted = false;
    }
    input_version->Unref();
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(immutable_db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}
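// Usage sketch (illustrative comment only; assumes an open DB* "db" and
// #include "rocksdb/convenience.h"). The public entry point is the
// DeleteFilesInRange() convenience function, which forwards here; note that
// only whole files fully contained in the range are dropped, so keys near the
// range boundaries may survive:
//
//   Slice begin("user_000000"), end("user_999999");
//   Status s = DeleteFilesInRange(db, db->DefaultColumnFamily(),
//                                 &begin, &end);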
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
  InstrumentedMutexLock l(&mutex_);
  versions_->GetLiveFilesMetaData(metadata);
}

void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
                                     ColumnFamilyMetaData* cf_meta) {
  assert(column_family);
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* sv = GetAndRefSuperVersion(cfd);
  {
    // Without mutex, Version::GetColumnFamilyMetaData will have data race with
    // Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but
    // this may cause regression. An alternative is to make
    // FileMetaData::being_compacted atomic, but it will make FileMetaData
    // non-copy-able. Another option is to separate these variables from
    // original FileMetaData struct, and this requires re-organization of data
    // structures. For now, we take the easy approach. If
    // DB::GetColumnFamilyMetaData is not called frequently, the regression
    // should not be big. We still need to keep an eye on it.
    InstrumentedMutexLock l(&mutex_);
    sv->current->GetColumnFamilyMetaData(cf_meta);
  }
  ReturnAndCleanupSuperVersion(cfd, sv);
}
#endif  // ROCKSDB_LITE

Status DBImpl::CheckConsistency() {
  mutex_.AssertHeld();
  std::vector<LiveFileMetaData> metadata;
  versions_->GetLiveFilesMetaData(&metadata);
  TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");

  std::string corruption_messages;

  if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
    // Instead of calling GetFileSize() for each expected file, call
    // GetChildren() for the DB directory and check that all expected files
    // are listed, without checking their sizes.
    // Since sst files might be in different directories, do it for each
    // directory separately.
    std::map<std::string, std::vector<std::string>> files_by_directory;
    for (const auto& md : metadata) {
      // md.name has a leading "/". Remove it.
      std::string fname = md.name;
      if (!fname.empty() && fname[0] == '/') {
        fname = fname.substr(1);
      }
      files_by_directory[md.db_path].push_back(fname);
    }
    for (const auto& dir_files : files_by_directory) {
      std::string directory = dir_files.first;
      std::vector<std::string> existing_files;
      Status s = env_->GetChildren(directory, &existing_files);
      if (!s.ok()) {
        corruption_messages +=
            "Can't list files in " + directory + ": " + s.ToString() + "\n";
        continue;
      }
      std::sort(existing_files.begin(), existing_files.end());

      for (const std::string& fname : dir_files.second) {
        if (!std::binary_search(existing_files.begin(), existing_files.end(),
                                fname) &&
            !std::binary_search(existing_files.begin(), existing_files.end(),
                                Rocks2LevelTableFileName(fname))) {
          corruption_messages +=
              "Missing sst file " + fname + " in " + directory + "\n";
        }
      }
    }
  } else {
    for (const auto& md : metadata) {
      // md.name has a leading "/".
      std::string file_path = md.db_path + md.name;

      uint64_t fsize = 0;
      TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
      Status s = env_->GetFileSize(file_path, &fsize);
      if (!s.ok() &&
          env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
        s = Status::OK();
      }
      if (!s.ok()) {
        corruption_messages +=
            "Can't access " + md.name + ": " + s.ToString() + "\n";
      } else if (fsize != md.size) {
        corruption_messages += "Sst file size mismatch: " + file_path +
                               ". Size recorded in manifest " +
                               ToString(md.size) + ", actual size " +
                               ToString(fsize) + "\n";
      }
    }
  }

  if (corruption_messages.size() == 0) {
    return Status::OK();
  } else {
    return Status::Corruption(corruption_messages);
  }
}

Status DBImpl::GetDbIdentity(std::string& identity) const {
  identity.assign(db_id_);
  return Status::OK();
}

Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
  std::string idfilename = IdentityFileName(dbname_);
  const FileOptions soptions;

  Status s = ReadFileToString(fs_.get(), idfilename, identity);
  if (!s.ok()) {
    return s;
  }

  // If last character is '\n' remove it from identity
  if (identity->size() > 0 && identity->back() == '\n') {
    identity->pop_back();
  }
  return s;
}
// Default implementation -- returns not supported status
Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
                              const std::string& /*column_family_name*/,
                              ColumnFamilyHandle** /*handle*/) {
  return Status::NotSupported("");
}

Status DB::CreateColumnFamilies(
    const ColumnFamilyOptions& /*cf_options*/,
    const std::vector<std::string>& /*column_family_names*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/) {
  return Status::NotSupported("");
}

Status DB::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/) {
  return Status::NotSupported("");
}

Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
  return Status::NotSupported("");
}

Status DB::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
  return Status::NotSupported("");
}

Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
  delete column_family;
  return Status::OK();
}

DB::~DB() {}

Status DBImpl::Close() {
  if (!closed_) {
    {
      InstrumentedMutexLock l(&mutex_);
      // If there is unreleased snapshot, fail the close call
      if (!snapshots_.empty()) {
        return Status::Aborted("Cannot close DB with unreleased snapshot.");
      }
    }

    closed_ = true;
    return CloseImpl();
  }
  return Status::OK();
}

Status DB::ListColumnFamilies(const DBOptions& db_options,
                              const std::string& name,
                              std::vector<std::string>* column_families) {
  FileSystem* fs = db_options.file_system.get();
  LegacyFileSystemWrapper legacy_fs(db_options.env);
  if (!fs) {
    fs = &legacy_fs;
  }
  return VersionSet::ListColumnFamilies(column_families, name, fs);
}
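// Usage sketch (illustrative comment only). ListColumnFamilies is a static
// member, so it works without opening the DB:
//
//   std::vector<std::string> cf_names;
//   Status s = DB::ListColumnFamilies(DBOptions(), "/path/to/db", &cf_names);
//   // On success, cf_names includes "default" plus any user-created
//   // column families, which is exactly the list DB::Open() requires.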
Snapshot::~Snapshot() {}

Status DestroyDB(const std::string& dbname, const Options& options,
                 const std::vector<ColumnFamilyDescriptor>& column_families) {
  ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
  Env* env = soptions.env;
  std::vector<std::string> filenames;
  bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);

  // Reset the logger because it holds a handle to the
  // log file and prevents cleanup and directory removal
  soptions.info_log.reset();
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
    for (const auto& fname : filenames) {
      if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del;
        std::string path_to_delete = dbname + "/" + fname;
        if (type == kMetaDatabase) {
          del = DestroyDB(path_to_delete, options);
        } else if (type == kTableFile || type == kLogFile) {
          del = DeleteDBFile(&soptions, path_to_delete, dbname,
                             /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
        } else {
          del = env->DeleteFile(path_to_delete);
        }
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }

    std::vector<std::string> paths;
    for (const auto& path : options.db_paths) {
      paths.emplace_back(path.path);
    }
    for (const auto& cf : column_families) {
      for (const auto& path : cf.options.cf_paths) {
        paths.emplace_back(path.path);
      }
    }

    // Remove duplicate paths.
    // Note that we compare only the actual paths but not path ids.
    // The reason is that the same path can appear under different path_ids
    // for different column families.
    std::sort(paths.begin(), paths.end());
    paths.erase(std::unique(paths.begin(), paths.end()), paths.end());

    for (const auto& path : paths) {
      if (env->GetChildren(path, &filenames).ok()) {
        for (const auto& fname : filenames) {
          if (ParseFileName(fname, &number, &type) && type == kTableFile) {
            std::string table_path = path + "/" + fname;
            Status del = DeleteDBFile(&soptions, table_path, dbname,
                                      /*force_bg=*/false, /*force_fg=*/false);
            if (result.ok() && !del.ok()) {
              result = del;
            }
          }
        }
        env->DeleteDir(path);
      }
    }

    std::vector<std::string> walDirFiles;
    std::string archivedir = ArchivalDirectory(dbname);
    bool wal_dir_exists = false;
    if (dbname != soptions.wal_dir) {
      wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
      archivedir = ArchivalDirectory(soptions.wal_dir);
    }

    // Archive dir may be inside wal dir or dbname and should be
    // processed and removed before those; otherwise we would have
    // trouble removing the directories that contain it.
    std::vector<std::string> archiveFiles;
    if (env->GetChildren(archivedir, &archiveFiles).ok()) {
      // Delete archival files.
      for (const auto& file : archiveFiles) {
        if (ParseFileName(file, &number, &type) && type == kLogFile) {
          Status del =
              DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
                           /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
          if (result.ok() && !del.ok()) {
            result = del;
          }
        }
      }
      env->DeleteDir(archivedir);
    }

    // Delete log files in the WAL dir
    if (wal_dir_exists) {
      for (const auto& file : walDirFiles) {
        if (ParseFileName(file, &number, &type) && type == kLogFile) {
          Status del =
              DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
                           soptions.wal_dir, /*force_bg=*/false,
                           /*force_fg=*/!wal_in_db_path);
          if (result.ok() && !del.ok()) {
            result = del;
          }
        }
      }
      env->DeleteDir(soptions.wal_dir);
    }

    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->DeleteFile(lockname);

    // sst_file_manager holds a ref to the logger. Make sure the logger is
    // gone before trying to remove the directory.
    soptions.sst_file_manager.reset();

    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}
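// Usage sketch (illustrative comment only). DestroyDB permanently removes all
// SST files, WAL files, and directories of a closed DB; it must not be called
// while the DB is open:
//
//   Options opts;
//   // ... close/delete the DB instance first ...
//   Status s = DestroyDB("/path/to/db", opts);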
Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
                                bool need_enter_write_thread) {
#ifndef ROCKSDB_LITE
  WriteThread::Writer w;
  if (need_mutex_lock) {
    mutex_.Lock();
  } else {
    mutex_.AssertHeld();
  }
  if (need_enter_write_thread) {
    write_thread_.EnterUnbatched(&w, &mutex_);
  }

  std::vector<std::string> cf_names;
  std::vector<ColumnFamilyOptions> cf_opts;

  // This part requires mutex to protect the column family options
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    cf_names.push_back(cfd->GetName());
    cf_opts.push_back(cfd->GetLatestCFOptions());
  }

  // Unlock during expensive operations. New writes cannot get here
  // because the single write thread ensures all new writes get queued.
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  mutex_.Unlock();

  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");

  std::string file_name =
      TempOptionsFileName(GetName(), versions_->NewFileNumber());
  Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
                                   GetFileSystem());

  if (s.ok()) {
    s = RenameTempFileToOptionsFile(file_name);
  }
  // restore lock
  if (!need_mutex_lock) {
    mutex_.Lock();
  }
  if (need_enter_write_thread) {
    write_thread_.ExitUnbatched(&w);
  }
  if (!s.ok()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Unable to persist options -- %s", s.ToString().c_str());
    if (immutable_db_options_.fail_if_options_file_error) {
      return Status::IOError("Unable to persist options.",
                             s.ToString().c_str());
    }
  }
#else
  (void)need_mutex_lock;
  (void)need_enter_write_thread;
#endif  // !ROCKSDB_LITE
  return Status::OK();
}
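// Usage sketch (illustrative comment only; assumes
// #include "rocksdb/utilities/options_util.h"). The OPTIONS file written
// above can be read back to reopen a DB with its last persisted settings:
//
//   DBOptions db_opts;
//   std::vector<ColumnFamilyDescriptor> cf_descs;
//   Status s = LoadLatestOptions("/path/to/db", Env::Default(), &db_opts,
//                                &cf_descs);
//   // On success, db_opts/cf_descs can be passed to DB::Open().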
#ifndef ROCKSDB_LITE
namespace {
void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
                              const size_t num_files_to_keep,
                              const std::shared_ptr<Logger>& info_log,
                              Env* env) {
  if (filenames.size() <= num_files_to_keep) {
    return;
  }
  for (auto iter = std::next(filenames.begin(), num_files_to_keep);
       iter != filenames.end(); ++iter) {
    if (!env->DeleteFile(iter->second).ok()) {
      ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
                     iter->second.c_str());
    }
  }
}
}  // namespace
#endif  // !ROCKSDB_LITE

Status DBImpl::DeleteObsoleteOptionsFiles() {
#ifndef ROCKSDB_LITE
  std::vector<std::string> filenames;
  // use an ordered map to keep the filenames sorted from the newest
  // to the oldest.
  std::map<uint64_t, std::string> options_filenames;
  Status s;
  s = GetEnv()->GetChildren(GetName(), &filenames);
  if (!s.ok()) {
    return s;
  }
  for (auto& filename : filenames) {
    uint64_t file_number;
    FileType type;
    if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
      options_filenames.insert(
          {std::numeric_limits<uint64_t>::max() - file_number,
           GetName() + "/" + filename});
    }
  }

  // Keep the latest 2 options files
  const size_t kNumOptionsFilesKept = 2;
  DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
                           immutable_db_options_.info_log, GetEnv());
  return Status::OK();
#else
  return Status::OK();
#endif  // !ROCKSDB_LITE
}

Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
#ifndef ROCKSDB_LITE
  Status s;

  uint64_t options_file_number = versions_->NewFileNumber();
  std::string options_file_name =
      OptionsFileName(GetName(), options_file_number);
  // Retry if the file name happens to conflict with an existing one.
  s = GetEnv()->RenameFile(file_name, options_file_name);
  if (s.ok()) {
    InstrumentedMutexLock l(&mutex_);
    versions_->options_file_number_ = options_file_number;
  }

  if (0 == disable_delete_obsolete_files_) {
    DeleteObsoleteOptionsFiles();
  }
  return s;
#else
  (void)file_name;
  return Status::OK();
#endif  // !ROCKSDB_LITE
}
#ifdef ROCKSDB_USING_THREAD_STATUS

void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
                                          cfd->ioptions()->env);
  }
}

void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
  }
}

void DBImpl::EraseThreadStatusDbInfo() const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::EraseDatabaseInfo(this);
  }
}

#else
void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}

void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}

void DBImpl::EraseThreadStatusDbInfo() const {}
#endif  // ROCKSDB_USING_THREAD_STATUS

//
// A global method that can dump out the build version
void DumpRocksDBBuildVersion(Logger* log) {
#if !defined(IOS_CROSS_COMPILE)
  // if we compile with Xcode, we don't run build_detect_version, so we don't
  // generate util/build_version.cc
  ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR,
                   ROCKSDB_MINOR, ROCKSDB_PATCH);
  ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
  ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
#else
  (void)log;  // ignore "-Wunused-parameter"
#endif
}

#ifndef ROCKSDB_LITE
SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
                                                         bool include_history) {
  // Find the earliest sequence number that we know we can rely on reading
  // from the memtable without needing to check sst files.
  SequenceNumber earliest_seq =
      sv->imm->GetEarliestSequenceNumber(include_history);
  if (earliest_seq == kMaxSequenceNumber) {
    earliest_seq = sv->mem->GetEarliestSequenceNumber();
  }
  assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);

  return earliest_seq;
}
#endif  // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                       bool cache_only,
                                       SequenceNumber lower_bound_seq,
                                       SequenceNumber* seq,
                                       bool* found_record_for_key,
                                       bool* is_blob_index) {
  Status s;
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  ReadOptions read_options;
  SequenceNumber current_seq = versions_->LastSequence();
  LookupKey lkey(key, current_seq);

  *seq = kMaxSequenceNumber;
  *found_record_for_key = false;

  // Check if there is a record for this key in the latest memtable
  sv->mem->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
               seq, read_options, nullptr /*read_callback*/, is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTable::Get: %s\n",
                    s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check immutable memtables
    *found_record_for_key = true;
    return Status::OK();
  }

  SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
  if (lower_bound_in_mem != kMaxSequenceNumber &&
      lower_bound_in_mem < lower_bound_seq) {
    *found_record_for_key = false;
    return Status::OK();
  }

  // Check if there is a record for this key in the immutable memtables
  sv->imm->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
               seq, read_options, nullptr /*read_callback*/, is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTableList::Get: %s\n",
                    s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check memtable history
    *found_record_for_key = true;
    return Status::OK();
  }

  SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
  if (lower_bound_in_imm != kMaxSequenceNumber &&
      lower_bound_in_imm < lower_bound_seq) {
    *found_record_for_key = false;
    return Status::OK();
  }

  // Check if there is a record for this key in the memtable history
  // (immutable memtables that have been flushed but not yet freed)
  sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, seq, read_options,
                          is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(
        immutable_db_options_.info_log,
        "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
        s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check SST files
    *found_record_for_key = true;
    return Status::OK();
  }

  // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
  // check here to skip the history if possible. But currently the caller
  // already does that. Maybe we should move the logic here later.

  // TODO(agiardullo): possible optimization: consider checking cached
  // SST files if cache_only=true?
  if (!cache_only) {
    // Check tables
    sv->current->Get(read_options, lkey, nullptr, &s, &merge_context,
                     &max_covering_tombstone_seq, nullptr /* value_found */,
                     found_record_for_key, seq, nullptr /*read_callback*/,
                     is_blob_index);

    if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
      // unexpected error reading SST files
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Unexpected status returned from Version::Get: %s\n",
                      s.ToString().c_str());
    }
  }

  return s;
}
Status DBImpl::IngestExternalFile(
    ColumnFamilyHandle* column_family,
    const std::vector<std::string>& external_files,
    const IngestExternalFileOptions& ingestion_options) {
  IngestExternalFileArg arg;
  arg.column_family = column_family;
  arg.external_files = external_files;
  arg.options = ingestion_options;
  return IngestExternalFiles({arg});
}
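// Usage sketch (illustrative comment only; assumes an open DB* "db" and an
// SST file produced by SstFileWriter with keys sorted by the DB comparator):
//
//   IngestExternalFileOptions ifo;
//   ifo.move_files = true;  // link/move instead of copying the file
//   Status s = db->IngestExternalFile(db->DefaultColumnFamily(),
//                                     {"/path/to/file1.sst"}, ifo);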
  3574. Status DBImpl::IngestExternalFiles(
  3575. const std::vector<IngestExternalFileArg>& args) {
  3576. if (args.empty()) {
  3577. return Status::InvalidArgument("ingestion arg list is empty");
  3578. }
  3579. {
  3580. std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
  3581. for (const auto& arg : args) {
  3582. if (arg.column_family == nullptr) {
  3583. return Status::InvalidArgument("column family handle is null");
  3584. } else if (unique_cfhs.count(arg.column_family) > 0) {
  3585. return Status::InvalidArgument(
  3586. "ingestion args have duplicate column families");
  3587. }
  3588. unique_cfhs.insert(arg.column_family);
  3589. }
  3590. }
  3591. // Ingest multiple external SST files atomically.
  3592. size_t num_cfs = args.size();
  3593. for (size_t i = 0; i != num_cfs; ++i) {
  3594. if (args[i].external_files.empty()) {
  3595. char err_msg[128] = {0};
  3596. snprintf(err_msg, 128, "external_files[%zu] is empty", i);
  3597. return Status::InvalidArgument(err_msg);
  3598. }
  3599. }
  3600. for (const auto& arg : args) {
  3601. const IngestExternalFileOptions& ingest_opts = arg.options;
  3602. if (ingest_opts.ingest_behind &&
  3603. !immutable_db_options_.allow_ingest_behind) {
  3604. return Status::InvalidArgument(
  3605. "can't ingest_behind file in DB with allow_ingest_behind=false");
  3606. }
  3607. }
  3608. // TODO (yanqin) maybe handle the case in which column_families have
  3609. // duplicates
  3610. std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
  3611. size_t total = 0;
  3612. for (const auto& arg : args) {
  3613. total += arg.external_files.size();
  3614. }
  3615. uint64_t next_file_number = 0;
  3616. Status status = ReserveFileNumbersBeforeIngestion(
  3617. static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
  3618. pending_output_elem, &next_file_number);
  3619. if (!status.ok()) {
  3620. InstrumentedMutexLock l(&mutex_);
  3621. ReleaseFileNumberFromPendingOutputs(pending_output_elem);
  3622. return status;
  3623. }
  3624. std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
  3625. for (const auto& arg : args) {
  3626. auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
  3627. ingestion_jobs.emplace_back(
  3628. env_, versions_.get(), cfd, immutable_db_options_, file_options_,
  3629. &snapshots_, arg.options, &directories_, &event_logger_);
  3630. }
  3631. std::vector<std::pair<bool, Status>> exec_results;
  3632. for (size_t i = 0; i != num_cfs; ++i) {
  3633. exec_results.emplace_back(false, Status::OK());
  3634. }
  3635. // TODO(yanqin) maybe make jobs run in parallel
  3636. uint64_t start_file_number = next_file_number;
  3637. for (size_t i = 1; i != num_cfs; ++i) {
  3638. start_file_number += args[i - 1].external_files.size();
  3639. auto* cfd =
  3640. static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
  3641. SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  3642. exec_results[i].second = ingestion_jobs[i].Prepare(
  3643. args[i].external_files, start_file_number, super_version);
  3644. exec_results[i].first = true;
  3645. CleanupSuperVersion(super_version);
  3646. }
  3647. TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
  3648. TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
  3649. {
  3650. auto* cfd =
  3651. static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
  3652. SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  3653. exec_results[0].second = ingestion_jobs[0].Prepare(
  3654. args[0].external_files, next_file_number, super_version);
  3655. exec_results[0].first = true;
  3656. CleanupSuperVersion(super_version);
  3657. }
  3658. for (const auto& exec_result : exec_results) {
  3659. if (!exec_result.second.ok()) {
  3660. status = exec_result.second;
  3661. break;
  3662. }
  3663. }
  3664. if (!status.ok()) {
  3665. for (size_t i = 0; i != num_cfs; ++i) {
  3666. if (exec_results[i].first) {
  3667. ingestion_jobs[i].Cleanup(status);
  3668. }
  3669. }
  3670. InstrumentedMutexLock l(&mutex_);
  3671. ReleaseFileNumberFromPendingOutputs(pending_output_elem);
  3672. return status;
  3673. }
  3674. std::vector<SuperVersionContext> sv_ctxs;
  3675. for (size_t i = 0; i != num_cfs; ++i) {
  3676. sv_ctxs.emplace_back(true /* create_superversion */);
  3677. }
  3678. TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
  3679. TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
  3680. TEST_SYNC_POINT("DBImpl::AddFile:Start");
  {
    InstrumentedMutexLock l(&mutex_);
    TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");

    // Stop writes to the DB by entering both write threads
    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);
    WriteThread::Writer nonmem_w;
    if (two_write_queues_) {
      nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
    }

    // When unordered_write is enabled, keys are written to the memtable in
    // an unordered way. If the ingestion job checks the memtable's key range
    // before a pending key lands in the memtable, it may skip a necessary
    // memtable flush. So wait here until there are no pending memtable
    // writes.
    WaitForPendingWrites();
    num_running_ingest_file_ += static_cast<int>(num_cfs);
    TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");

    bool at_least_one_cf_need_flush = false;
    std::vector<bool> need_flush(num_cfs, false);
    for (size_t i = 0; i != num_cfs; ++i) {
      auto* cfd =
          static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
      if (cfd->IsDropped()) {
        // TODO (yanqin) investigate whether we should abort ingestion or
        // proceed with other non-dropped column families.
        status = Status::InvalidArgument(
            "cannot ingest an external file into a dropped CF");
        break;
      }
      bool tmp = false;
      status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
      need_flush[i] = tmp;
      at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
      if (!status.ok()) {
        break;
      }
    }
    TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
                             &at_least_one_cf_need_flush);
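    // A job needs a flush when its files overlap the memtable's key range;
    // flushing first keeps sequence-number ordering between existing entries
    // and the ingested files correct.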
    if (status.ok() && at_least_one_cf_need_flush) {
      FlushOptions flush_opts;
      flush_opts.allow_write_stall = true;
      if (immutable_db_options_.atomic_flush) {
        autovector<ColumnFamilyData*> cfds_to_flush;
        SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
        mutex_.Unlock();
        status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
                                      FlushReason::kExternalFileIngestion,
                                      true /* writes_stopped */);
        mutex_.Lock();
      } else {
        for (size_t i = 0; i != num_cfs; ++i) {
          if (need_flush[i]) {
            mutex_.Unlock();
            auto* cfd =
                static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
                    ->cfd();
            status = FlushMemTable(cfd, flush_opts,
                                   FlushReason::kExternalFileIngestion,
                                   true /* writes_stopped */);
            mutex_.Lock();
            if (!status.ok()) {
              break;
            }
          }
        }
      }
    }
    // Run ingestion jobs.
    if (status.ok()) {
      for (size_t i = 0; i != num_cfs; ++i) {
        status = ingestion_jobs[i].Run();
        if (!status.ok()) {
          break;
        }
      }
    }
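    // Each job reports how many sequence numbers it consumed assigning
    // global seqnos. All three sequence counters are advanced together so
    // that readers observe a consistent last sequence.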
    if (status.ok()) {
      int consumed_seqno_count =
          ingestion_jobs[0].ConsumedSequenceNumbersCount();
#ifndef NDEBUG
      for (size_t i = 1; i != num_cfs; ++i) {
        assert(!!consumed_seqno_count ==
               !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
        consumed_seqno_count +=
            ingestion_jobs[i].ConsumedSequenceNumbersCount();
      }
#endif
      if (consumed_seqno_count > 0) {
        const SequenceNumber last_seqno = versions_->LastSequence();
        versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
        versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
        versions_->SetLastSequence(last_seqno + consumed_seqno_count);
      }
      autovector<ColumnFamilyData*> cfds_to_commit;
      autovector<const MutableCFOptions*> mutable_cf_options_list;
      autovector<autovector<VersionEdit*>> edit_lists;
      uint32_t num_entries = 0;
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd =
            static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
        if (cfd->IsDropped()) {
          continue;
        }
        cfds_to_commit.push_back(cfd);
        mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
        autovector<VersionEdit*> edit_list;
        edit_list.push_back(ingestion_jobs[i].edit());
        edit_lists.push_back(edit_list);
        ++num_entries;
      }
      // Mark the version edits as an atomic group if the number of version
      // edits exceeds 1.
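      // Each edit records how many edits remain after it in the group, so
      // recovery can apply the whole group atomically or not at all.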
      if (cfds_to_commit.size() > 1) {
        for (auto& edits : edit_lists) {
          assert(edits.size() == 1);
          edits[0]->MarkAtomicGroup(--num_entries);
        }
        assert(0 == num_entries);
      }
      status =
          versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
                                 edit_lists, &mutex_, directories_.GetDbDir());
    }

    if (status.ok()) {
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd =
            static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
        if (!cfd->IsDropped()) {
          InstallSuperVersionAndScheduleWork(
              cfd, &sv_ctxs[i], *cfd->GetLatestMutableCFOptions());
#ifndef NDEBUG
          if (0 == i && num_cfs > 1) {
            TEST_SYNC_POINT(
                "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
            TEST_SYNC_POINT(
                "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
          }
#endif  // !NDEBUG
        }
      }
    }
    // Resume writes to the DB
    if (two_write_queues_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    write_thread_.ExitUnbatched(&w);

    if (status.ok()) {
      for (auto& job : ingestion_jobs) {
        job.UpdateStats();
      }
    }
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    num_running_ingest_file_ -= static_cast<int>(num_cfs);
    if (0 == num_running_ingest_file_) {
      bg_cv_.SignalAll();
    }
    TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
  }
  // mutex_ is unlocked here

  // Cleanup
  for (size_t i = 0; i != num_cfs; ++i) {
    sv_ctxs[i].Clean();
    // This may roll back jobs that have completed successfully. This is
    // intended for atomicity.
    ingestion_jobs[i].Cleanup(status);
  }
  if (status.ok()) {
    for (size_t i = 0; i != num_cfs; ++i) {
      auto* cfd =
          static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
      if (!cfd->IsDropped()) {
        NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
      }
    }
  }
  return status;
}

Status DBImpl::CreateColumnFamilyWithImport(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    const ImportColumnFamilyOptions& import_options,
    const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  assert(*handle == nullptr);
  std::string cf_comparator_name = options.comparator->Name();
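  // The exported SSTs are sorted by the source DB's comparator; importing
  // them under a different comparator would break their key ordering.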
  if (cf_comparator_name != metadata.db_comparator_name) {
    return Status::InvalidArgument("Comparator name mismatch");
  }

  // Create column family.
  auto status = CreateColumnFamily(options, column_family_name, handle);
  if (!status.ok()) {
    return status;
  }
  // Import sst files from metadata.
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
  auto cfd = cfh->cfd();
  ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
                                   immutable_db_options_, file_options_,
                                   import_options, metadata.files);

  SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
  VersionEdit dummy_edit;
  uint64_t next_file_number = 0;
  std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
  {
    // Lock db mutex
    InstrumentedMutexLock l(&mutex_);
    if (error_handler_.IsDBStopped()) {
      // Don't import files when there is a bg_error
      status = error_handler_.GetBGError();
    }

    // Make sure that bg cleanup won't delete the files that we are importing
    pending_output_elem.reset(new std::list<uint64_t>::iterator(
        CaptureCurrentFileNumberInPendingOutputs()));

    if (status.ok()) {
      // If a crash happens after a hard link is established, the Recover
      // function may reuse a file number that has already been assigned to
      // the internal file, which would overwrite the external file. To
      // protect the external file, we have to make sure the file number is
      // never reused.
      next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
      auto cf_options = cfd->GetLatestMutableCFOptions();
      status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
                                      directories_.GetDbDir());
      if (status.ok()) {
        InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
      }
    }
  }
  dummy_sv_ctx.Clean();

  if (status.ok()) {
    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
    status = import_job.Prepare(next_file_number, sv);
    CleanupSuperVersion(sv);
  }
  if (status.ok()) {
    SuperVersionContext sv_context(true /*create_superversion*/);
    {
      // Lock db mutex
      InstrumentedMutexLock l(&mutex_);

      // Stop writes to the DB by entering both write threads
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      WriteThread::Writer nonmem_w;
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }

      num_running_ingest_file_++;
      assert(!cfd->IsDropped());
      status = import_job.Run();

      // Install job edit [Mutex will be unlocked here]
      if (status.ok()) {
        auto cf_options = cfd->GetLatestMutableCFOptions();
        status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
                                        &mutex_, directories_.GetDbDir());
        if (status.ok()) {
          InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
        }
      }

      // Resume writes to the DB
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
      write_thread_.ExitUnbatched(&w);

      num_running_ingest_file_--;
      if (num_running_ingest_file_ == 0) {
        bg_cv_.SignalAll();
      }
    }
    // mutex_ is unlocked here
    sv_context.Clean();
  }
  {
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
  }
  import_job.Cleanup(status);
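  // On failure, undo the CreateColumnFamily above so a failed import leaves
  // no partially imported column family behind.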
  if (!status.ok()) {
    DropColumnFamily(*handle);
    DestroyColumnFamilyHandle(*handle);
    *handle = nullptr;
  }
  return status;
}

Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
  Status s;
  std::vector<ColumnFamilyData*> cfd_list;
  {
    InstrumentedMutexLock l(&mutex_);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->IsDropped() && cfd->initialized()) {
        cfd->Ref();
        cfd_list.push_back(cfd);
      }
    }
  }
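  // Referencing a super version per column family pins its current SST
  // files, so they cannot be deleted while the checksums are verified below
  // without holding the mutex.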
  std::vector<SuperVersion*> sv_list;
  for (auto cfd : cfd_list) {
    sv_list.push_back(cfd->GetReferencedSuperVersion(this));
  }
  for (auto& sv : sv_list) {
    VersionStorageInfo* vstorage = sv->current->storage_info();
    ColumnFamilyData* cfd = sv->current->cfd();
    Options opts;
    {
      InstrumentedMutexLock l(&mutex_);
      opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
                     cfd->GetLatestCFOptions());
    }
    for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
      for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
           j++) {
        const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd;
        std::string fname = TableFileName(cfd->ioptions()->cf_paths,
                                          fd.GetNumber(), fd.GetPathId());
        s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(opts, file_options_,
                                                     read_options, fname);
      }
    }
    if (!s.ok()) {
      break;
    }
  }
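  // With avoid_unnecessary_blocking_io set, freeing the super versions is
  // deferred to the background purge instead of being done inline here.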
  bool defer_purge = immutable_db_options().avoid_unnecessary_blocking_io;
  {
    InstrumentedMutexLock l(&mutex_);
    for (auto sv : sv_list) {
      if (sv && sv->Unref()) {
        sv->Cleanup();
        if (defer_purge) {
          AddSuperVersionsToFreeQueue(sv);
        } else {
          delete sv;
        }
      }
    }
    if (defer_purge) {
      SchedulePurge();
    }
    for (auto cfd : cfd_list) {
      cfd->UnrefAndTryDelete();
    }
  }
  return s;
}

void DBImpl::NotifyOnExternalFileIngested(
    ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
  if (immutable_db_options_.listeners.empty()) {
    return;
  }

  for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
    ExternalFileIngestionInfo info;
    info.cf_name = cfd->GetName();
    info.external_file_path = f.external_file_path;
    info.internal_file_path = f.internal_file_path;
    info.global_seqno = f.assigned_seqno;
    info.table_properties = f.table_properties;
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnExternalFileIngested(this, info);
    }
  }
}

void DBImpl::WaitForIngestFile() {
  mutex_.AssertHeld();
  while (num_running_ingest_file_ > 0) {
    bg_cv_.Wait();
  }
}

Status DBImpl::StartTrace(const TraceOptions& trace_options,
                          std::unique_ptr<TraceWriter>&& trace_writer) {
  InstrumentedMutexLock lock(&trace_mutex_);
  tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer)));
  return Status::OK();
}

Status DBImpl::EndTrace() {
  InstrumentedMutexLock lock(&trace_mutex_);
  Status s;
  if (tracer_ != nullptr) {
    s = tracer_->Close();
    tracer_.reset();
  } else {
    return Status::IOError("No trace file to close");
  }
  return s;
}

Status DBImpl::StartBlockCacheTrace(
    const TraceOptions& trace_options,
    std::unique_ptr<TraceWriter>&& trace_writer) {
  return block_cache_tracer_.StartTrace(env_, trace_options,
                                        std::move(trace_writer));
}

Status DBImpl::EndBlockCacheTrace() {
  block_cache_tracer_.EndTrace();
  return Status::OK();
}

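// The unlocked tracer_ check below keeps the common no-tracing path cheap;
// the check is repeated under trace_mutex_ before the tracer is used.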
Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
  Status s;
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      s = tracer_->IteratorSeek(cf_id, key);
    }
  }
  return s;
}

Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
                                        const Slice& key) {
  Status s;
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      s = tracer_->IteratorSeekForPrev(cf_id, key);
    }
  }
  return s;
}

Status DBImpl::ReserveFileNumbersBeforeIngestion(
    ColumnFamilyData* cfd, uint64_t num,
    std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
    uint64_t* next_file_number) {
  Status s;
  SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
  assert(nullptr != next_file_number);
  InstrumentedMutexLock l(&mutex_);
  if (error_handler_.IsDBStopped()) {
    // Do not ingest files when there is a bg_error
    return error_handler_.GetBGError();
  }
  pending_output_elem.reset(new std::list<uint64_t>::iterator(
      CaptureCurrentFileNumberInPendingOutputs()));
  *next_file_number =
      versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
  auto cf_options = cfd->GetLatestMutableCFOptions();
  VersionEdit dummy_edit;
  // If a crash happens after a hard link is established, the Recover
  // function may reuse a file number that has already been assigned to the
  // internal file, which would overwrite the external file. To protect the
  // external file, we have to make sure the file number is never reused.
  s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
                             directories_.GetDbDir());
  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
  }
  dummy_sv_ctx.Clean();
  return s;
}

Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
  if (mutable_db_options_.max_open_files == -1) {
    uint64_t oldest_time = port::kMaxUint64;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->IsDropped()) {
        uint64_t ctime;
        {
          SuperVersion* sv = GetAndRefSuperVersion(cfd);
          Version* version = sv->current;
          version->GetCreationTimeOfOldestFile(&ctime);
          ReturnAndCleanupSuperVersion(cfd, sv);
        }
        if (ctime < oldest_time) {
          oldest_time = ctime;
        }
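        // 0 is the smallest possible creation time; no remaining file can be
        // older, so stop scanning early.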
        if (oldest_time == 0) {
          break;
        }
      }
    }
    *creation_time = oldest_time;
    return Status::OK();
  } else {
    return Status::NotSupported("This API only works if max_open_files = -1");
  }
}

#endif  // ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE