db_impl_compaction_flush.cc

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_impl/db_impl.h"
  10. #include <cinttypes>
  11. #include "db/builder.h"
  12. #include "db/error_handler.h"
  13. #include "db/event_helpers.h"
  14. #include "file/sst_file_manager_impl.h"
  15. #include "monitoring/iostats_context_imp.h"
  16. #include "monitoring/perf_context_imp.h"
  17. #include "monitoring/thread_status_updater.h"
  18. #include "monitoring/thread_status_util.h"
  19. #include "test_util/sync_point.h"
  20. #include "util/cast_util.h"
  21. #include "util/concurrent_task_limiter_impl.h"
  22. namespace ROCKSDB_NAMESPACE {
  23. bool DBImpl::EnoughRoomForCompaction(
  24. ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
  25. bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  26. // Check if we have enough room to do the compaction
  27. bool enough_room = true;
  28. #ifndef ROCKSDB_LITE
  29. auto sfm = static_cast<SstFileManagerImpl*>(
  30. immutable_db_options_.sst_file_manager.get());
  31. if (sfm) {
  32. // Pass the current bg_error_ to SFM so it can decide what checks to
  33. // perform. If this DB instance hasn't seen any error yet, the SFM can be
  34. // optimistic and not do disk space checks
  35. enough_room =
  36. sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
  37. if (enough_room) {
  38. *sfm_reserved_compact_space = true;
  39. }
  40. }
  41. #else
  42. (void)cfd;
  43. (void)inputs;
  44. (void)sfm_reserved_compact_space;
  45. #endif // ROCKSDB_LITE
  46. if (!enough_room) {
  47. // Just in case tests want to change the value of enough_room
  48. TEST_SYNC_POINT_CALLBACK(
  49. "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
  50. ROCKS_LOG_BUFFER(log_buffer,
  51. "Cancelled compaction because not enough room");
  52. RecordTick(stats_, COMPACTION_CANCELLED, 1);
  53. }
  54. return enough_room;
  55. }
  56. bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
  57. std::unique_ptr<TaskLimiterToken>* token,
  58. LogBuffer* log_buffer) {
  59. assert(*token == nullptr);
  60. auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
  61. cfd->ioptions()->compaction_thread_limiter.get());
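// No per-column-family compaction thread limiter configured; no token is
// needed and the compaction may proceed.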
  62. if (limiter == nullptr) {
  63. return true;
  64. }
  65. *token = limiter->GetToken(force);
  66. if (*token != nullptr) {
  67. ROCKS_LOG_BUFFER(log_buffer,
  68. "Thread limiter [%s] increase [%s] compaction task, "
  69. "force: %s, tasks after: %d",
  70. limiter->GetName().c_str(), cfd->GetName().c_str(),
  71. force ? "true" : "false", limiter->GetOutstandingTask());
  72. return true;
  73. }
  74. return false;
  75. }
  76. Status DBImpl::SyncClosedLogs(JobContext* job_context) {
  77. TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  78. mutex_.AssertHeld();
  79. autovector<log::Writer*, 1> logs_to_sync;
  80. uint64_t current_log_number = logfile_number_;
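// If another thread is already syncing one of the closed logs, wait for it
// to finish so that the same log is not claimed by two threads.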
  81. while (logs_.front().number < current_log_number &&
  82. logs_.front().getting_synced) {
  83. log_sync_cv_.Wait();
  84. }
  85. for (auto it = logs_.begin();
  86. it != logs_.end() && it->number < current_log_number; ++it) {
  87. auto& log = *it;
  88. assert(!log.getting_synced);
  89. log.getting_synced = true;
  90. logs_to_sync.push_back(log.writer);
  91. }
  92. Status s;
  93. if (!logs_to_sync.empty()) {
  94. mutex_.Unlock();
  95. for (log::Writer* log : logs_to_sync) {
  96. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  97. "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
  98. log->get_log_number());
  99. s = log->file()->Sync(immutable_db_options_.use_fsync);
  100. if (!s.ok()) {
  101. break;
  102. }
  103. if (immutable_db_options_.recycle_log_file_num > 0) {
  104. s = log->Close();
  105. if (!s.ok()) {
  106. break;
  107. }
  108. }
  109. }
  110. if (s.ok()) {
  111. s = directories_.GetWalDir()->Fsync();
  112. }
  113. mutex_.Lock();
  114. // "number <= current_log_number - 1" is equivalent to
  115. // "number < current_log_number".
  116. MarkLogsSynced(current_log_number - 1, true, s);
  117. if (!s.ok()) {
  118. error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
  119. TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
  120. return s;
  121. }
  122. }
  123. return s;
  124. }
  125. Status DBImpl::FlushMemTableToOutputFile(
  126. ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
  127. bool* made_progress, JobContext* job_context,
  128. SuperVersionContext* superversion_context,
  129. std::vector<SequenceNumber>& snapshot_seqs,
  130. SequenceNumber earliest_write_conflict_snapshot,
  131. SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
  132. Env::Priority thread_pri) {
  133. mutex_.AssertHeld();
  134. assert(cfd->imm()->NumNotFlushed() != 0);
  135. assert(cfd->imm()->IsFlushPending());
  136. FlushJob flush_job(
  137. dbname_, cfd, immutable_db_options_, mutable_cf_options,
  138. nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(),
  139. &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
  140. snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
  141. GetDataDir(cfd, 0U),
  142. GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
  143. &event_logger_, mutable_cf_options.report_bg_io_stats,
  144. true /* sync_output_directory */, true /* write_manifest */, thread_pri);
  145. FileMetaData file_meta;
  146. TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
  147. flush_job.PickMemTable();
  148. TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
  149. #ifndef ROCKSDB_LITE
  150. // may temporarily unlock and lock the mutex.
  151. NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
  152. #endif // ROCKSDB_LITE
  153. Status s;
  154. if (logfile_number_ > 0 &&
  155. versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
  156. // If there is more than one column family, we need to make sure that
  157. // all the log files except the most recent one are synced. Otherwise if
  158. // the host crashes after flushing and before WAL is persistent, the
  159. // flushed SST may contain data from write batches whose updates to
  160. // other column families are missing.
  161. // SyncClosedLogs() may unlock and re-lock the db_mutex.
  162. s = SyncClosedLogs(job_context);
  163. } else {
  164. TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
  165. }
  166. // Within flush_job.Run, rocksdb may call event listener to notify
  167. // file creation and deletion.
  168. //
  169. // Note that flush_job.Run will unlock and lock the db_mutex,
  170. // and EventListener callback will be called when the db_mutex
  171. // is unlocked by the current thread.
  172. if (s.ok()) {
  173. s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
  174. } else {
  175. flush_job.Cancel();
  176. }
  177. if (s.ok()) {
  178. InstallSuperVersionAndScheduleWork(cfd, superversion_context,
  179. mutable_cf_options);
  180. if (made_progress) {
  181. *made_progress = true;
  182. }
  183. VersionStorageInfo::LevelSummaryStorage tmp;
  184. ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
  185. cfd->GetName().c_str(),
  186. cfd->current()->storage_info()->LevelSummary(&tmp));
  187. }
  188. if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
  189. Status new_bg_error = s;
  190. error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
  191. }
  192. if (s.ok()) {
  193. #ifndef ROCKSDB_LITE
  194. // may temporarily unlock and lock the mutex.
  195. NotifyOnFlushCompleted(cfd, mutable_cf_options,
  196. flush_job.GetCommittedFlushJobsInfo());
  197. auto sfm = static_cast<SstFileManagerImpl*>(
  198. immutable_db_options_.sst_file_manager.get());
  199. if (sfm) {
  200. // Notify sst_file_manager that a new file was added
  201. std::string file_path = MakeTableFileName(
  202. cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
  203. sfm->OnAddFile(file_path);
  204. if (sfm->IsMaxAllowedSpaceReached()) {
  205. Status new_bg_error =
  206. Status::SpaceLimit("Max allowed space was reached");
  207. TEST_SYNC_POINT_CALLBACK(
  208. "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
  209. &new_bg_error);
  210. error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
  211. }
  212. }
  213. #endif // ROCKSDB_LITE
  214. }
  215. TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
  216. return s;
  217. }
  218. Status DBImpl::FlushMemTablesToOutputFiles(
  219. const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
  220. JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  221. if (immutable_db_options_.atomic_flush) {
  222. return AtomicFlushMemTablesToOutputFiles(
  223. bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
  224. }
  225. std::vector<SequenceNumber> snapshot_seqs;
  226. SequenceNumber earliest_write_conflict_snapshot;
  227. SnapshotChecker* snapshot_checker;
  228. GetSnapshotContext(job_context, &snapshot_seqs,
  229. &earliest_write_conflict_snapshot, &snapshot_checker);
  230. Status status;
  231. for (auto& arg : bg_flush_args) {
  232. ColumnFamilyData* cfd = arg.cfd_;
  233. MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  234. SuperVersionContext* superversion_context = arg.superversion_context_;
  235. Status s = FlushMemTableToOutputFile(
  236. cfd, mutable_cf_options, made_progress, job_context,
  237. superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
  238. snapshot_checker, log_buffer, thread_pri);
  239. if (!s.ok()) {
  240. status = s;
  241. if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
  242. // At this point, DB is not shutting down, nor is cfd dropped.
  243. // Something is wrong, thus we break out of the loop.
  244. break;
  245. }
  246. }
  247. }
  248. return status;
  249. }
  250. /*
  251. * Atomically flushes multiple column families.
  252. *
  253. * For each column family, all memtables with ID smaller than or equal to the
  254. * ID specified in bg_flush_args will be flushed. Only after all column
  255. * families finish flush will this function commit to MANIFEST. If any of the
  256. * column families are not flushed successfully, this function does not have
  257. * any side-effect on the state of the database.
  258. */
  259. Status DBImpl::AtomicFlushMemTablesToOutputFiles(
  260. const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
  261. JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  262. mutex_.AssertHeld();
  263. autovector<ColumnFamilyData*> cfds;
  264. for (const auto& arg : bg_flush_args) {
  265. cfds.emplace_back(arg.cfd_);
  266. }
  267. #ifndef NDEBUG
  268. for (const auto cfd : cfds) {
  269. assert(cfd->imm()->NumNotFlushed() != 0);
  270. assert(cfd->imm()->IsFlushPending());
  271. }
  272. #endif /* !NDEBUG */
  273. std::vector<SequenceNumber> snapshot_seqs;
  274. SequenceNumber earliest_write_conflict_snapshot;
  275. SnapshotChecker* snapshot_checker;
  276. GetSnapshotContext(job_context, &snapshot_seqs,
  277. &earliest_write_conflict_snapshot, &snapshot_checker);
  278. autovector<Directory*> distinct_output_dirs;
  279. autovector<std::string> distinct_output_dir_paths;
  280. std::vector<std::unique_ptr<FlushJob>> jobs;
  281. std::vector<MutableCFOptions> all_mutable_cf_options;
  282. int num_cfs = static_cast<int>(cfds.size());
  283. all_mutable_cf_options.reserve(num_cfs);
  284. for (int i = 0; i < num_cfs; ++i) {
  285. auto cfd = cfds[i];
  286. Directory* data_dir = GetDataDir(cfd, 0U);
  287. const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;
  288. // Add to distinct output directories if eligible. Use linear search. Since
  289. // the number of elements in the vector is not large, performance should be
  290. // tolerable.
  291. bool found = false;
  292. for (const auto& path : distinct_output_dir_paths) {
  293. if (path == curr_path) {
  294. found = true;
  295. break;
  296. }
  297. }
  298. if (!found) {
  299. distinct_output_dir_paths.emplace_back(curr_path);
  300. distinct_output_dirs.emplace_back(data_dir);
  301. }
  302. all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
  303. const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
  304. const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
  305. jobs.emplace_back(new FlushJob(
  306. dbname_, cfd, immutable_db_options_, mutable_cf_options,
  307. max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
  308. &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
  309. snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
  310. data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
  311. stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
  312. false /* sync_output_directory */, false /* write_manifest */,
  313. thread_pri));
  314. jobs.back()->PickMemTable();
  315. }
  316. std::vector<FileMetaData> file_meta(num_cfs);
  317. Status s;
  318. assert(num_cfs == static_cast<int>(jobs.size()));
  319. #ifndef ROCKSDB_LITE
  320. for (int i = 0; i != num_cfs; ++i) {
  321. const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
  322. // may temporarily unlock and lock the mutex.
  323. NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
  324. job_context->job_id);
  325. }
  326. #endif /* !ROCKSDB_LITE */
  327. if (logfile_number_ > 0) {
  328. // TODO (yanqin) investigate whether we should sync the closed logs for
  329. // single column family case.
  330. s = SyncClosedLogs(job_context);
  331. }
  332. // exec_status stores the execution status of flush_jobs as
  333. // <bool /* executed */, Status /* status code */>
  334. autovector<std::pair<bool, Status>> exec_status;
  335. for (int i = 0; i != num_cfs; ++i) {
  336. // Initially all jobs are not executed, with status OK.
  337. exec_status.emplace_back(false, Status::OK());
  338. }
  339. if (s.ok()) {
  340. // TODO (yanqin): parallelize jobs with threads.
  341. for (int i = 1; i != num_cfs; ++i) {
  342. exec_status[i].second =
  343. jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
  344. exec_status[i].first = true;
  345. }
  346. if (num_cfs > 1) {
  347. TEST_SYNC_POINT(
  348. "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
  349. TEST_SYNC_POINT(
  350. "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
  351. }
  352. assert(exec_status.size() > 0);
  353. assert(!file_meta.empty());
  354. exec_status[0].second =
  355. jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
  356. exec_status[0].first = true;
  357. Status error_status;
  358. for (const auto& e : exec_status) {
  359. if (!e.second.ok()) {
  360. s = e.second;
  361. if (!e.second.IsShutdownInProgress() &&
  362. !e.second.IsColumnFamilyDropped()) {
  363. // If a flush job did not return OK, and the CF is not dropped, and
  364. // the DB is not shutting down, then we have to return this result to
  365. // caller later.
  366. error_status = e.second;
  367. }
  368. }
  369. }
  370. s = error_status.ok() ? s : error_status;
  371. }
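// A dropped column family does not fail the atomic flush as a whole.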
  372. if (s.IsColumnFamilyDropped()) {
  373. s = Status::OK();
  374. }
  375. if (s.ok() || s.IsShutdownInProgress()) {
  376. // Sync on all distinct output directories.
  377. for (auto dir : distinct_output_dirs) {
  378. if (dir != nullptr) {
  379. Status error_status = dir->Fsync();
  380. if (!error_status.ok()) {
  381. s = error_status;
  382. break;
  383. }
  384. }
  385. }
  386. } else {
  387. // Need to undo atomic flush if something went wrong, i.e. s is not OK and
  388. // it is not because of CF drop.
  389. // Have to cancel the flush jobs that have NOT executed because we need to
  390. // unref the versions.
  391. for (int i = 0; i != num_cfs; ++i) {
  392. if (!exec_status[i].first) {
  393. jobs[i]->Cancel();
  394. }
  395. }
  396. for (int i = 0; i != num_cfs; ++i) {
  397. if (exec_status[i].first && exec_status[i].second.ok()) {
  398. auto& mems = jobs[i]->GetMemTables();
  399. cfds[i]->imm()->RollbackMemtableFlush(mems,
  400. file_meta[i].fd.GetNumber());
  401. }
  402. }
  403. }
  404. if (s.ok()) {
  405. auto wait_to_install_func = [&]() {
  406. bool ready = true;
  407. for (size_t i = 0; i != cfds.size(); ++i) {
  408. const auto& mems = jobs[i]->GetMemTables();
  409. if (cfds[i]->IsDropped()) {
  410. // If the column family is dropped, then do not wait.
  411. continue;
  412. } else if (!mems.empty() &&
  413. cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
  414. // If a flush job needs to install the flush result for mems and
  415. // mems[0] is not the earliest memtable, it means another thread must
  416. // be installing flush results for the same column family, then the
  417. // current thread needs to wait.
  418. ready = false;
  419. break;
  420. } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
  421. bg_flush_args[i].max_memtable_id_) {
  422. // If a flush job does not need to install flush results, then it has
  423. // to wait until all memtables up to max_memtable_id_ (inclusive) are
  424. // installed.
  425. ready = false;
  426. break;
  427. }
  428. }
  429. return ready;
  430. };
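// Wait until this job's flush results are the earliest outstanding ones for
// every involved column family. If the DB has already been stopped by a
// background error, the recovery status (rather than the regular background
// error) decides whether installation can proceed.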
  431. bool resuming_from_bg_err = error_handler_.IsDBStopped();
  432. while ((!error_handler_.IsDBStopped() ||
  433. error_handler_.GetRecoveryError().ok()) &&
  434. !wait_to_install_func()) {
  435. atomic_flush_install_cv_.Wait();
  436. }
  437. s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
  438. : error_handler_.GetBGError();
  439. }
  440. if (s.ok()) {
  441. autovector<ColumnFamilyData*> tmp_cfds;
  442. autovector<const autovector<MemTable*>*> mems_list;
  443. autovector<const MutableCFOptions*> mutable_cf_options_list;
  444. autovector<FileMetaData*> tmp_file_meta;
  445. for (int i = 0; i != num_cfs; ++i) {
  446. const auto& mems = jobs[i]->GetMemTables();
  447. if (!cfds[i]->IsDropped() && !mems.empty()) {
  448. tmp_cfds.emplace_back(cfds[i]);
  449. mems_list.emplace_back(&mems);
  450. mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
  451. tmp_file_meta.emplace_back(&file_meta[i]);
  452. }
  453. }
  454. s = InstallMemtableAtomicFlushResults(
  455. nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
  456. versions_.get(), &mutex_, tmp_file_meta,
  457. &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
  458. }
  459. if (s.ok()) {
  460. assert(num_cfs ==
  461. static_cast<int>(job_context->superversion_contexts.size()));
  462. for (int i = 0; i != num_cfs; ++i) {
  463. if (cfds[i]->IsDropped()) {
  464. continue;
  465. }
  466. InstallSuperVersionAndScheduleWork(cfds[i],
  467. &job_context->superversion_contexts[i],
  468. all_mutable_cf_options[i]);
  469. VersionStorageInfo::LevelSummaryStorage tmp;
  470. ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
  471. cfds[i]->GetName().c_str(),
  472. cfds[i]->current()->storage_info()->LevelSummary(&tmp));
  473. }
  474. if (made_progress) {
  475. *made_progress = true;
  476. }
  477. #ifndef ROCKSDB_LITE
  478. auto sfm = static_cast<SstFileManagerImpl*>(
  479. immutable_db_options_.sst_file_manager.get());
  480. assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
  481. for (int i = 0; i != num_cfs; ++i) {
  482. if (cfds[i]->IsDropped()) {
  483. continue;
  484. }
  485. NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
  486. jobs[i]->GetCommittedFlushJobsInfo());
  487. if (sfm) {
  488. std::string file_path = MakeTableFileName(
  489. cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
  490. sfm->OnAddFile(file_path);
  491. if (sfm->IsMaxAllowedSpaceReached() &&
  492. error_handler_.GetBGError().ok()) {
  493. Status new_bg_error =
  494. Status::SpaceLimit("Max allowed space was reached");
  495. error_handler_.SetBGError(new_bg_error,
  496. BackgroundErrorReason::kFlush);
  497. }
  498. }
  499. }
  500. #endif // ROCKSDB_LITE
  501. }
  502. if (!s.ok() && !s.IsShutdownInProgress()) {
  503. Status new_bg_error = s;
  504. error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
  505. }
  506. return s;
  507. }
  508. void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
  509. const MutableCFOptions& mutable_cf_options,
  510. int job_id) {
  511. #ifndef ROCKSDB_LITE
  512. if (immutable_db_options_.listeners.size() == 0U) {
  513. return;
  514. }
  515. mutex_.AssertHeld();
  516. if (shutting_down_.load(std::memory_order_acquire)) {
  517. return;
  518. }
  519. bool triggered_writes_slowdown =
  520. (cfd->current()->storage_info()->NumLevelFiles(0) >=
  521. mutable_cf_options.level0_slowdown_writes_trigger);
  522. bool triggered_writes_stop =
  523. (cfd->current()->storage_info()->NumLevelFiles(0) >=
  524. mutable_cf_options.level0_stop_writes_trigger);
  525. // release lock while notifying events
  526. mutex_.Unlock();
  527. {
  528. FlushJobInfo info{};
  529. info.cf_id = cfd->GetID();
  530. info.cf_name = cfd->GetName();
  531. // TODO(yhchiang): make db_paths dynamic in case flush does not
  532. // go to L0 in the future.
  533. const uint64_t file_number = file_meta->fd.GetNumber();
  534. info.file_path =
  535. MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
  536. info.file_number = file_number;
  537. info.thread_id = env_->GetThreadID();
  538. info.job_id = job_id;
  539. info.triggered_writes_slowdown = triggered_writes_slowdown;
  540. info.triggered_writes_stop = triggered_writes_stop;
  541. info.smallest_seqno = file_meta->fd.smallest_seqno;
  542. info.largest_seqno = file_meta->fd.largest_seqno;
  543. info.flush_reason = cfd->GetFlushReason();
  544. for (auto listener : immutable_db_options_.listeners) {
  545. listener->OnFlushBegin(this, info);
  546. }
  547. }
  548. mutex_.Lock();
  549. // no need to signal bg_cv_ as it will be signaled at the end of the
  550. // flush process.
  551. #else
  552. (void)cfd;
  553. (void)file_meta;
  554. (void)mutable_cf_options;
  555. (void)job_id;
  556. #endif // ROCKSDB_LITE
  557. }
  558. void DBImpl::NotifyOnFlushCompleted(
  559. ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
  560. std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
  561. #ifndef ROCKSDB_LITE
  562. assert(flush_jobs_info != nullptr);
  563. if (immutable_db_options_.listeners.size() == 0U) {
  564. return;
  565. }
  566. mutex_.AssertHeld();
  567. if (shutting_down_.load(std::memory_order_acquire)) {
  568. return;
  569. }
  570. bool triggered_writes_slowdown =
  571. (cfd->current()->storage_info()->NumLevelFiles(0) >=
  572. mutable_cf_options.level0_slowdown_writes_trigger);
  573. bool triggered_writes_stop =
  574. (cfd->current()->storage_info()->NumLevelFiles(0) >=
  575. mutable_cf_options.level0_stop_writes_trigger);
  576. // release lock while notifying events
  577. mutex_.Unlock();
  578. {
  579. for (auto& info : *flush_jobs_info) {
  580. info->triggered_writes_slowdown = triggered_writes_slowdown;
  581. info->triggered_writes_stop = triggered_writes_stop;
  582. for (auto listener : immutable_db_options_.listeners) {
  583. listener->OnFlushCompleted(this, *info);
  584. }
  585. }
  586. flush_jobs_info->clear();
  587. }
  588. mutex_.Lock();
  589. // no need to signal bg_cv_ as it will be signaled at the end of the
  590. // flush process.
  591. #else
  592. (void)cfd;
  593. (void)mutable_cf_options;
  594. (void)flush_jobs_info;
  595. #endif // ROCKSDB_LITE
  596. }
  597. Status DBImpl::CompactRange(const CompactRangeOptions& options,
  598. ColumnFamilyHandle* column_family,
  599. const Slice* begin, const Slice* end) {
  600. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  601. auto cfd = cfh->cfd();
  602. if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
  603. return Status::InvalidArgument("Invalid target path ID");
  604. }
  605. bool exclusive = options.exclusive_manual_compaction;
  606. bool flush_needed = true;
  607. if (begin != nullptr && end != nullptr) {
  608. // TODO(ajkr): We could also optimize away the flush in certain cases where
  609. // one/both sides of the interval are unbounded. But it requires more
  610. // changes to RangesOverlapWithMemtables.
  611. Range range(*begin, *end);
  612. SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  613. cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
  614. CleanupSuperVersion(super_version);
  615. }
  616. Status s;
  617. if (flush_needed) {
  618. FlushOptions fo;
  619. fo.allow_write_stall = options.allow_write_stall;
  620. if (immutable_db_options_.atomic_flush) {
  621. autovector<ColumnFamilyData*> cfds;
  622. mutex_.Lock();
  623. SelectColumnFamiliesForAtomicFlush(&cfds);
  624. mutex_.Unlock();
  625. s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
  626. false /* writes_stopped */);
  627. } else {
  628. s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
  629. false /* writes_stopped*/);
  630. }
  631. if (!s.ok()) {
  632. LogFlush(immutable_db_options_.info_log);
  633. return s;
  634. }
  635. }
  636. int max_level_with_files = 0;
  637. // max_file_num_to_ignore can be used to filter out newly created SST files,
  638. // useful for bottom level compaction in a manual compaction
  639. uint64_t max_file_num_to_ignore = port::kMaxUint64;
  640. uint64_t next_file_number = port::kMaxUint64;
  641. {
  642. InstrumentedMutexLock l(&mutex_);
  643. Version* base = cfd->current();
  644. for (int level = 1; level < base->storage_info()->num_non_empty_levels();
  645. level++) {
  646. if (base->storage_info()->OverlapInLevel(level, begin, end)) {
  647. max_level_with_files = level;
  648. }
  649. }
  650. next_file_number = versions_->current_next_file_number();
  651. }
  652. int final_output_level = 0;
  653. if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
  654. cfd->NumberLevels() > 1) {
  655. // Always compact all files together.
  656. final_output_level = cfd->NumberLevels() - 1;
  657. // If the bottommost level is reserved (allow_ingest_behind), output one level above it.
  658. if (immutable_db_options_.allow_ingest_behind) {
  659. final_output_level--;
  660. }
  661. s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
  662. final_output_level, options, begin, end, exclusive,
  663. false, max_file_num_to_ignore);
  664. } else {
  665. for (int level = 0; level <= max_level_with_files; level++) {
  666. int output_level;
  667. // in case the compaction is universal or if we're compacting the
  668. // bottom-most level, the output level will be the same as input one.
  669. // level 0 can never be the bottommost level (i.e. if all files are in
  670. // level 0, we will compact to level 1)
  671. if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
  672. cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
  673. output_level = level;
  674. } else if (level == max_level_with_files && level > 0) {
  675. if (options.bottommost_level_compaction ==
  676. BottommostLevelCompaction::kSkip) {
  677. // Skip bottommost level compaction
  678. continue;
  679. } else if (options.bottommost_level_compaction ==
  680. BottommostLevelCompaction::kIfHaveCompactionFilter &&
  681. cfd->ioptions()->compaction_filter == nullptr &&
  682. cfd->ioptions()->compaction_filter_factory == nullptr) {
  683. // Skip bottommost level compaction since we don't have a compaction
  684. // filter
  685. continue;
  686. }
  687. output_level = level;
  688. // update max_file_num_to_ignore only for bottom level compaction
  689. // because data in newly compacted files in middle levels may still need
  690. // to be pushed down
  691. max_file_num_to_ignore = next_file_number;
  692. } else {
  693. output_level = level + 1;
  694. if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
  695. cfd->ioptions()->level_compaction_dynamic_level_bytes &&
  696. level == 0) {
  697. output_level = ColumnFamilyData::kCompactToBaseLevel;
  698. }
  699. }
  700. s = RunManualCompaction(cfd, level, output_level, options, begin, end,
  701. exclusive, false, max_file_num_to_ignore);
  702. if (!s.ok()) {
  703. break;
  704. }
  705. if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
  706. final_output_level = cfd->NumberLevels() - 1;
  707. } else if (output_level > final_output_level) {
  708. final_output_level = output_level;
  709. }
  710. TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
  711. TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
  712. }
  713. }
  714. if (!s.ok()) {
  715. LogFlush(immutable_db_options_.info_log);
  716. return s;
  717. }
  718. if (options.change_level) {
  719. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  720. "[RefitLevel] waiting for background threads to stop");
  721. s = PauseBackgroundWork();
  722. if (s.ok()) {
  723. s = ReFitLevel(cfd, final_output_level, options.target_level);
  724. }
  725. ContinueBackgroundWork();
  726. }
  727. LogFlush(immutable_db_options_.info_log);
  728. {
  729. InstrumentedMutexLock l(&mutex_);
  730. // an automatic compaction that has been scheduled might have been
  731. // preempted by the manual compactions. Need to schedule it back.
  732. MaybeScheduleFlushOrCompaction();
  733. }
  734. return s;
  735. }
  736. Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
  737. ColumnFamilyHandle* column_family,
  738. const std::vector<std::string>& input_file_names,
  739. const int output_level, const int output_path_id,
  740. std::vector<std::string>* const output_file_names,
  741. CompactionJobInfo* compaction_job_info) {
  742. #ifdef ROCKSDB_LITE
  743. (void)compact_options;
  744. (void)column_family;
  745. (void)input_file_names;
  746. (void)output_level;
  747. (void)output_path_id;
  748. (void)output_file_names;
  749. (void)compaction_job_info;
  750. // not supported in lite version
  751. return Status::NotSupported("Not supported in ROCKSDB LITE");
  752. #else
  753. if (column_family == nullptr) {
  754. return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  755. }
  756. auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  757. assert(cfd);
  758. Status s;
  759. JobContext job_context(0, true);
  760. LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
  761. immutable_db_options_.info_log.get());
  762. // Perform CompactFiles
  763. TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
  764. {
  765. InstrumentedMutexLock l(&mutex_);
  766. // This call will unlock/lock the mutex to wait for current running
  767. // IngestExternalFile() calls to finish.
  768. WaitForIngestFile();
  769. // We need to get current after `WaitForIngestFile`, because
  770. // `IngestExternalFile` may add files that overlap with `input_file_names`
  771. auto* current = cfd->current();
  772. current->Ref();
  773. s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
  774. output_file_names, output_level, output_path_id,
  775. &job_context, &log_buffer, compaction_job_info);
  776. current->Unref();
  777. }
  778. // Find and delete obsolete files
  779. {
  780. InstrumentedMutexLock l(&mutex_);
  781. // If !s.ok(), this means that Compaction failed. In that case, we want
  782. // to delete all obsolete files we might have created and we force
  783. // FindObsoleteFiles(). This is because job_context does not
  784. // catch all created files if compaction failed.
  785. FindObsoleteFiles(&job_context, !s.ok());
  786. } // release the mutex
  787. // delete unnecessary files if any, this is done outside the mutex
  788. if (job_context.HaveSomethingToClean() ||
  789. job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
  790. // Have to flush the info logs before bg_compaction_scheduled_--
  791. // because if bg_flush_scheduled_ becomes 0 and the lock is
  793. // released, the destructor of DB can kick in and destroy all the
  793. // states of DB so info_log might not be available after that point.
  794. // It also applies to access other states that DB owns.
  795. log_buffer.FlushBufferToLog();
  796. if (job_context.HaveSomethingToDelete()) {
  797. // no mutex is locked here. No need to Unlock() and Lock() here.
  798. PurgeObsoleteFiles(job_context);
  799. }
  800. job_context.Clean();
  801. }
  802. return s;
  803. #endif // ROCKSDB_LITE
  804. }
  805. #ifndef ROCKSDB_LITE
  806. Status DBImpl::CompactFilesImpl(
  807. const CompactionOptions& compact_options, ColumnFamilyData* cfd,
  808. Version* version, const std::vector<std::string>& input_file_names,
  809. std::vector<std::string>* const output_file_names, const int output_level,
  810. int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
  811. CompactionJobInfo* compaction_job_info) {
  812. mutex_.AssertHeld();
  813. if (shutting_down_.load(std::memory_order_acquire)) {
  814. return Status::ShutdownInProgress();
  815. }
  816. if (manual_compaction_paused_.load(std::memory_order_acquire)) {
  817. return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  818. }
  819. std::unordered_set<uint64_t> input_set;
  820. for (const auto& file_name : input_file_names) {
  821. input_set.insert(TableFileNameToNumber(file_name));
  822. }
  823. ColumnFamilyMetaData cf_meta;
  824. // TODO(yhchiang): can directly use version here if none of the
  825. // following functions call is pluggable to external developers.
  826. version->GetColumnFamilyMetaData(&cf_meta);
  827. if (output_path_id < 0) {
  828. if (cfd->ioptions()->cf_paths.size() == 1U) {
  829. output_path_id = 0;
  830. } else {
  831. return Status::NotSupported(
  832. "Automatic output path selection is not "
  833. "yet supported in CompactFiles()");
  834. }
  835. }
  836. Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
  837. &input_set, cf_meta, output_level);
  838. if (!s.ok()) {
  839. return s;
  840. }
  841. std::vector<CompactionInputFiles> input_files;
  842. s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
  843. &input_files, &input_set, version->storage_info(), compact_options);
  844. if (!s.ok()) {
  845. return s;
  846. }
  847. for (const auto& inputs : input_files) {
  848. if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
  849. return Status::Aborted(
  850. "Some of the necessary compaction input "
  851. "files are already being compacted");
  852. }
  853. }
  854. bool sfm_reserved_compact_space = false;
  855. // First check if we have enough room to do the compaction
  856. bool enough_room = EnoughRoomForCompaction(
  857. cfd, input_files, &sfm_reserved_compact_space, log_buffer);
  858. if (!enough_room) {
  859. // m's vars will get set properly at the end of this function,
  860. // as long as status == CompactionTooLarge
  861. return Status::CompactionTooLarge();
  862. }
  863. // At this point, CompactFiles will be run.
  864. bg_compaction_scheduled_++;
  865. std::unique_ptr<Compaction> c;
  866. assert(cfd->compaction_picker());
  867. c.reset(cfd->compaction_picker()->CompactFiles(
  868. compact_options, input_files, output_level, version->storage_info(),
  869. *cfd->GetLatestMutableCFOptions(), output_path_id));
  870. // we already sanitized the set of input files and checked for conflicts
  871. // without releasing the lock, so we're guaranteed a compaction can be formed.
  872. assert(c != nullptr);
  873. c->SetInputVersion(version);
  874. // deletion compaction currently not allowed in CompactFiles.
  875. assert(!c->deletion_compaction());
  876. std::vector<SequenceNumber> snapshot_seqs;
  877. SequenceNumber earliest_write_conflict_snapshot;
  878. SnapshotChecker* snapshot_checker;
  879. GetSnapshotContext(job_context, &snapshot_seqs,
  880. &earliest_write_conflict_snapshot, &snapshot_checker);
  881. std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
  882. new std::list<uint64_t>::iterator(
  883. CaptureCurrentFileNumberInPendingOutputs()));
  884. assert(is_snapshot_supported_ || snapshots_.empty());
  885. CompactionJobStats compaction_job_stats;
  886. CompactionJob compaction_job(
  887. job_context->job_id, c.get(), immutable_db_options_,
  888. file_options_for_compaction_, versions_.get(), &shutting_down_,
  889. preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
  890. GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
  891. &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
  892. snapshot_checker, table_cache_, &event_logger_,
  893. c->mutable_cf_options()->paranoid_file_checks,
  894. c->mutable_cf_options()->report_bg_io_stats, dbname_,
  895. &compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);
  896. // Creating a compaction influences the compaction score because the score
  897. // takes running compactions into account (by skipping files that are already
  898. // being compacted). Since we just changed compaction score, we recalculate it
  899. // here.
  900. version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
  901. *c->mutable_cf_options());
  902. compaction_job.Prepare();
  903. mutex_.Unlock();
  904. TEST_SYNC_POINT("CompactFilesImpl:0");
  905. TEST_SYNC_POINT("CompactFilesImpl:1");
  906. compaction_job.Run();
  907. TEST_SYNC_POINT("CompactFilesImpl:2");
  908. TEST_SYNC_POINT("CompactFilesImpl:3");
  909. mutex_.Lock();
  910. Status status = compaction_job.Install(*c->mutable_cf_options());
  911. if (status.ok()) {
  912. InstallSuperVersionAndScheduleWork(c->column_family_data(),
  913. &job_context->superversion_contexts[0],
  914. *c->mutable_cf_options());
  915. }
  916. c->ReleaseCompactionFiles(s);
  917. #ifndef ROCKSDB_LITE
  918. // Need to make sure SstFileManager does its bookkeeping
  919. auto sfm = static_cast<SstFileManagerImpl*>(
  920. immutable_db_options_.sst_file_manager.get());
  921. if (sfm && sfm_reserved_compact_space) {
  922. sfm->OnCompactionCompletion(c.get());
  923. }
  924. #endif // ROCKSDB_LITE
  925. ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  926. if (compaction_job_info != nullptr) {
  927. BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
  928. job_context->job_id, version, compaction_job_info);
  929. }
  930. if (status.ok()) {
  931. // Done
  932. } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
  933. // Ignore compaction errors found during shutting down
  934. } else if (status.IsManualCompactionPaused()) {
  935. // Don't report stopping manual compaction as error
  936. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  937. "[%s] [JOB %d] Stopping manual compaction",
  938. c->column_family_data()->GetName().c_str(),
  939. job_context->job_id);
  940. } else {
  941. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  942. "[%s] [JOB %d] Compaction error: %s",
  943. c->column_family_data()->GetName().c_str(),
  944. job_context->job_id, status.ToString().c_str());
  945. error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
  946. }
  947. if (output_file_names != nullptr) {
  948. for (const auto newf : c->edit()->GetNewFiles()) {
  949. (*output_file_names)
  950. .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
  951. newf.second.fd.GetNumber(),
  952. newf.second.fd.GetPathId()));
  953. }
  954. }
  955. c.reset();
  956. bg_compaction_scheduled_--;
  957. if (bg_compaction_scheduled_ == 0) {
  958. bg_cv_.SignalAll();
  959. }
  960. MaybeScheduleFlushOrCompaction();
  961. TEST_SYNC_POINT("CompactFilesImpl:End");
  962. return status;
  963. }
  964. #endif // ROCKSDB_LITE
  965. Status DBImpl::PauseBackgroundWork() {
  966. InstrumentedMutexLock guard_lock(&mutex_);
  967. bg_compaction_paused_++;
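// Wait for all currently scheduled background flushes and compactions to
// drain before reporting background work as paused.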
  968. while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
  969. bg_flush_scheduled_ > 0) {
  970. bg_cv_.Wait();
  971. }
  972. bg_work_paused_++;
  973. return Status::OK();
  974. }
  975. Status DBImpl::ContinueBackgroundWork() {
  976. InstrumentedMutexLock guard_lock(&mutex_);
  977. if (bg_work_paused_ == 0) {
  978. return Status::InvalidArgument();
  979. }
  980. assert(bg_work_paused_ > 0);
  981. assert(bg_compaction_paused_ > 0);
  982. bg_compaction_paused_--;
  983. bg_work_paused_--;
  984. // It's sufficient to check just bg_work_paused_ here since
  985. // bg_work_paused_ is always no greater than bg_compaction_paused_
  986. if (bg_work_paused_ == 0) {
  987. MaybeScheduleFlushOrCompaction();
  988. }
  989. return Status::OK();
  990. }
  991. void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
  992. const Status& st,
  993. const CompactionJobStats& job_stats,
  994. int job_id) {
  995. #ifndef ROCKSDB_LITE
  996. if (immutable_db_options_.listeners.empty()) {
  997. return;
  998. }
  999. mutex_.AssertHeld();
  1000. if (shutting_down_.load(std::memory_order_acquire)) {
  1001. return;
  1002. }
  1003. if (c->is_manual_compaction() &&
  1004. manual_compaction_paused_.load(std::memory_order_acquire)) {
  1005. return;
  1006. }
  1007. Version* current = cfd->current();
  1008. current->Ref();
  1009. // release lock while notifying events
  1010. mutex_.Unlock();
  1011. TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
  1012. {
  1013. CompactionJobInfo info{};
  1014. info.cf_name = cfd->GetName();
  1015. info.status = st;
  1016. info.thread_id = env_->GetThreadID();
  1017. info.job_id = job_id;
  1018. info.base_input_level = c->start_level();
  1019. info.output_level = c->output_level();
  1020. info.stats = job_stats;
  1021. info.table_properties = c->GetOutputTableProperties();
  1022. info.compaction_reason = c->compaction_reason();
  1023. info.compression = c->output_compression();
  1024. for (size_t i = 0; i < c->num_input_levels(); ++i) {
  1025. for (const auto fmd : *c->inputs(i)) {
  1026. const FileDescriptor& desc = fmd->fd;
  1027. const uint64_t file_number = desc.GetNumber();
  1028. auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
  1029. file_number, desc.GetPathId());
  1030. info.input_files.push_back(fn);
  1031. info.input_file_infos.push_back(CompactionFileInfo{
  1032. static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
  1033. if (info.table_properties.count(fn) == 0) {
  1034. std::shared_ptr<const TableProperties> tp;
  1035. auto s = current->GetTableProperties(&tp, fmd, &fn);
  1036. if (s.ok()) {
  1037. info.table_properties[fn] = tp;
  1038. }
  1039. }
  1040. }
  1041. }
  1042. for (const auto newf : c->edit()->GetNewFiles()) {
  1043. const FileMetaData& meta = newf.second;
  1044. const FileDescriptor& desc = meta.fd;
  1045. const uint64_t file_number = desc.GetNumber();
  1046. info.output_files.push_back(TableFileName(
  1047. c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
  1048. info.output_file_infos.push_back(CompactionFileInfo{
  1049. newf.first, file_number, meta.oldest_blob_file_number});
  1050. }
  1051. for (auto listener : immutable_db_options_.listeners) {
  1052. listener->OnCompactionBegin(this, info);
  1053. }
  1054. }
  1055. mutex_.Lock();
  1056. current->Unref();
  1057. #else
  1058. (void)cfd;
  1059. (void)c;
  1060. (void)st;
  1061. (void)job_stats;
  1062. (void)job_id;
  1063. #endif // ROCKSDB_LITE
  1064. }
  1065. void DBImpl::NotifyOnCompactionCompleted(
  1066. ColumnFamilyData* cfd, Compaction* c, const Status& st,
  1067. const CompactionJobStats& compaction_job_stats, const int job_id) {
  1068. #ifndef ROCKSDB_LITE
  1069. if (immutable_db_options_.listeners.size() == 0U) {
  1070. return;
  1071. }
  1072. mutex_.AssertHeld();
  1073. if (shutting_down_.load(std::memory_order_acquire)) {
  1074. return;
  1075. }
  1076. if (c->is_manual_compaction() &&
  1077. manual_compaction_paused_.load(std::memory_order_acquire)) {
  1078. return;
  1079. }
  1080. Version* current = cfd->current();
  1081. current->Ref();
  1082. // release lock while notifying events
  1083. mutex_.Unlock();
  1084. TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  1085. {
  1086. CompactionJobInfo info{};
  1087. BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
  1088. &info);
  1089. for (auto listener : immutable_db_options_.listeners) {
  1090. listener->OnCompactionCompleted(this, info);
  1091. }
  1092. }
  1093. mutex_.Lock();
  1094. current->Unref();
  1095. // no need to signal bg_cv_ as it will be signaled at the end of the
1096. // compaction process.
  1097. #else
  1098. (void)cfd;
  1099. (void)c;
  1100. (void)st;
  1101. (void)compaction_job_stats;
  1102. (void)job_id;
  1103. #endif // ROCKSDB_LITE
  1104. }
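// Illustrative only: a minimal sketch (not part of this file's logic) of a
// user-side EventListener that consumes the CompactionJobInfo assembled by the
// two Notify* methods above. The class name MyCompactionLogger is hypothetical;
// EventListener, CompactionJobInfo and DBOptions::listeners are the public API
// these callbacks feed.
//
//   class MyCompactionLogger : public EventListener {
//    public:
//     void OnCompactionCompleted(DB* /*db*/,
//                                const CompactionJobInfo& info) override {
//       fprintf(stderr, "[%s] job %d: L%d -> L%d, %zu input files\n",
//               info.cf_name.c_str(), info.job_id, info.base_input_level,
//               info.output_level, info.input_files.size());
//     }
//   };
//   // db_options.listeners.emplace_back(std::make_shared<MyCompactionLogger>());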
  1105. // REQUIREMENT: block all background work by calling PauseBackgroundWork()
  1106. // before calling this function
  1107. Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  1108. assert(level < cfd->NumberLevels());
  1109. if (target_level >= cfd->NumberLevels()) {
  1110. return Status::InvalidArgument("Target level exceeds number of levels");
  1111. }
  1112. SuperVersionContext sv_context(/* create_superversion */ true);
  1113. Status status;
  1114. InstrumentedMutexLock guard_lock(&mutex_);
  1115. // only allow one thread refitting
  1116. if (refitting_level_) {
  1117. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1118. "[ReFitLevel] another thread is refitting");
  1119. return Status::NotSupported("another thread is refitting");
  1120. }
  1121. refitting_level_ = true;
  1122. const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  1123. // move to a smaller level
  1124. int to_level = target_level;
  1125. if (target_level < 0) {
  1126. to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
  1127. }
  1128. auto* vstorage = cfd->current()->storage_info();
  1129. if (to_level > level) {
  1130. if (level == 0) {
  1131. return Status::NotSupported(
  1132. "Cannot change from level 0 to other levels.");
  1133. }
  1134. // Check levels are empty for a trivial move
  1135. for (int l = level + 1; l <= to_level; l++) {
  1136. if (vstorage->NumLevelFiles(l) > 0) {
  1137. return Status::NotSupported(
  1138. "Levels between source and target are not empty for a move.");
  1139. }
  1140. }
  1141. }
  1142. if (to_level != level) {
  1143. ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
  1144. "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
  1145. cfd->current()->DebugString().data());
  1146. VersionEdit edit;
  1147. edit.SetColumnFamily(cfd->GetID());
  1148. for (const auto& f : vstorage->LevelFiles(level)) {
  1149. edit.DeleteFile(level, f->fd.GetNumber());
  1150. edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
  1151. f->fd.GetFileSize(), f->smallest, f->largest,
  1152. f->fd.smallest_seqno, f->fd.largest_seqno,
  1153. f->marked_for_compaction, f->oldest_blob_file_number,
  1154. f->oldest_ancester_time, f->file_creation_time,
  1155. f->file_checksum, f->file_checksum_func_name);
  1156. }
  1157. ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
  1158. "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
  1159. edit.DebugString().data());
  1160. status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
  1161. directories_.GetDbDir());
  1162. InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
  1163. ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
  1164. cfd->GetName().c_str(), status.ToString().data());
  1165. if (status.ok()) {
  1166. ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
  1167. "[%s] After refitting:\n%s", cfd->GetName().c_str(),
  1168. cfd->current()->DebugString().data());
  1169. }
  1170. }
  1171. sv_context.Clean();
  1172. refitting_level_ = false;
  1173. return status;
  1174. }
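// Hedged usage note: ReFitLevel() is internal; the usual external route to it
// is CompactRange() with CompactRangeOptions::change_level set, which is
// expected to pause background work before refitting. A direct call would have
// to follow the REQUIREMENT above, roughly (sketch only, cfd and the level
// numbers are placeholders):
//
//   // PauseBackgroundWork();
//   // Status s = ReFitLevel(cfd, /*level=*/1, /*target_level=*/3);
//   // ContinueBackgroundWork();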
  1175. int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  1176. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  1177. return cfh->cfd()->NumberLevels();
  1178. }
  1179. int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
  1180. return 0;
  1181. }
  1182. int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  1183. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  1184. InstrumentedMutexLock l(&mutex_);
  1185. return cfh->cfd()
  1186. ->GetSuperVersion()
  1187. ->mutable_cf_options.level0_stop_writes_trigger;
  1188. }
  1189. Status DBImpl::Flush(const FlushOptions& flush_options,
  1190. ColumnFamilyHandle* column_family) {
  1191. auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  1192. ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
  1193. cfh->GetName().c_str());
  1194. Status s;
  1195. if (immutable_db_options_.atomic_flush) {
  1196. s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
  1197. FlushReason::kManualFlush);
  1198. } else {
  1199. s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
  1200. }
  1201. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1202. "[%s] Manual flush finished, status: %s\n",
  1203. cfh->GetName().c_str(), s.ToString().c_str());
  1204. return s;
  1205. }
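// Illustrative only: a minimal caller-side sketch of the manual flush path
// above (assumes an open DB* named db).
//
//   FlushOptions fo;
//   fo.wait = true;                // block until the memtable flush finishes
//   fo.allow_write_stall = false;  // wait out stall conditions first (see below)
//   Status s = db->Flush(fo, db->DefaultColumnFamily());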
  1206. Status DBImpl::Flush(const FlushOptions& flush_options,
  1207. const std::vector<ColumnFamilyHandle*>& column_families) {
  1208. Status s;
  1209. if (!immutable_db_options_.atomic_flush) {
  1210. for (auto cfh : column_families) {
  1211. s = Flush(flush_options, cfh);
  1212. if (!s.ok()) {
  1213. break;
  1214. }
  1215. }
  1216. } else {
  1217. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1218. "Manual atomic flush start.\n"
  1219. "=====Column families:=====");
  1220. for (auto cfh : column_families) {
  1221. auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
  1222. ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
  1223. cfhi->GetName().c_str());
  1224. }
  1225. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1226. "=====End of column families list=====");
  1227. autovector<ColumnFamilyData*> cfds;
  1228. std::for_each(column_families.begin(), column_families.end(),
  1229. [&cfds](ColumnFamilyHandle* elem) {
  1230. auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
  1231. cfds.emplace_back(cfh->cfd());
  1232. });
  1233. s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
  1234. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1235. "Manual atomic flush finished, status: %s\n"
  1236. "=====Column families:=====",
  1237. s.ToString().c_str());
  1238. for (auto cfh : column_families) {
  1239. auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
  1240. ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
  1241. cfhi->GetName().c_str());
  1242. }
  1243. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1244. "=====End of column families list=====");
  1245. }
  1246. return s;
  1247. }
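// Illustrative only: the multi-CF overload above is atomic across column
// families only when DBOptions::atomic_flush is set; otherwise the column
// families are simply flushed one by one. Sketch (cf1/cf2 are assumed handles):
//
//   // db_options.atomic_flush = true;               // chosen at DB open time
//   // Status s = db->Flush(FlushOptions(), {cf1, cf2});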
  1248. Status DBImpl::RunManualCompaction(
  1249. ColumnFamilyData* cfd, int input_level, int output_level,
  1250. const CompactRangeOptions& compact_range_options, const Slice* begin,
  1251. const Slice* end, bool exclusive, bool disallow_trivial_move,
  1252. uint64_t max_file_num_to_ignore) {
  1253. assert(input_level == ColumnFamilyData::kCompactAllLevels ||
  1254. input_level >= 0);
  1255. InternalKey begin_storage, end_storage;
  1256. CompactionArg* ca;
  1257. bool scheduled = false;
  1258. bool manual_conflict = false;
  1259. ManualCompactionState manual;
  1260. manual.cfd = cfd;
  1261. manual.input_level = input_level;
  1262. manual.output_level = output_level;
  1263. manual.output_path_id = compact_range_options.target_path_id;
  1264. manual.done = false;
  1265. manual.in_progress = false;
  1266. manual.incomplete = false;
  1267. manual.exclusive = exclusive;
  1268. manual.disallow_trivial_move = disallow_trivial_move;
1269. // For universal and FIFO compaction styles, we enforce every manual
1270. // compaction to compact all files.
  1271. if (begin == nullptr ||
  1272. cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
  1273. cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
  1274. manual.begin = nullptr;
  1275. } else {
  1276. begin_storage.SetMinPossibleForUserKey(*begin);
  1277. manual.begin = &begin_storage;
  1278. }
  1279. if (end == nullptr ||
  1280. cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
  1281. cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
  1282. manual.end = nullptr;
  1283. } else {
  1284. end_storage.SetMaxPossibleForUserKey(*end);
  1285. manual.end = &end_storage;
  1286. }
  1287. TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
  1288. TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
  1289. InstrumentedMutexLock l(&mutex_);
1290. // When a manual compaction arrives, temporarily disable scheduling of
1291. // non-manual compactions and wait until the number of scheduled compaction
1292. // jobs drops to zero. This is needed to ensure that this manual compaction
1293. // can compact any range of keys/files.
1294. //
1295. // HasPendingManualCompaction() is true when at least one thread is inside
1296. // RunManualCompaction(), i.e. during that time no other compaction will
1297. // get scheduled (see MaybeScheduleFlushOrCompaction).
1298. //
1299. // Note that the following loop doesn't stop more than one thread calling
1300. // RunManualCompaction() from getting to the second while loop below.
1301. // However, only one of them will actually schedule compaction, while
1302. // others will wait on a condition variable until it completes.
  1303. AddManualCompaction(&manual);
  1304. TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
  1305. if (exclusive) {
  1306. while (bg_bottom_compaction_scheduled_ > 0 ||
  1307. bg_compaction_scheduled_ > 0) {
  1308. TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
  1309. ROCKS_LOG_INFO(
  1310. immutable_db_options_.info_log,
  1311. "[%s] Manual compaction waiting for all other scheduled background "
  1312. "compactions to finish",
  1313. cfd->GetName().c_str());
  1314. bg_cv_.Wait();
  1315. }
  1316. }
  1317. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1318. "[%s] Manual compaction starting", cfd->GetName().c_str());
  1319. LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
  1320. immutable_db_options_.info_log.get());
  1321. // We don't check bg_error_ here, because if we get the error in compaction,
1322. // the compaction will set manual.status to bg_error_ and set manual.done to
1323. // true.
  1324. while (!manual.done) {
  1325. assert(HasPendingManualCompaction());
  1326. manual_conflict = false;
  1327. Compaction* compaction = nullptr;
  1328. if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
  1329. scheduled ||
  1330. (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
  1331. ((compaction = manual.cfd->CompactRange(
  1332. *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
  1333. manual.output_level, compact_range_options, manual.begin,
  1334. manual.end, &manual.manual_end, &manual_conflict,
  1335. max_file_num_to_ignore)) == nullptr &&
  1336. manual_conflict))) {
  1337. // exclusive manual compactions should not see a conflict during
1338. // CompactRange
  1339. assert(!exclusive || !manual_conflict);
  1340. // Running either this or some other manual compaction
  1341. bg_cv_.Wait();
  1342. if (scheduled && manual.incomplete == true) {
  1343. assert(!manual.in_progress);
  1344. scheduled = false;
  1345. manual.incomplete = false;
  1346. }
  1347. } else if (!scheduled) {
  1348. if (compaction == nullptr) {
  1349. manual.done = true;
  1350. bg_cv_.SignalAll();
  1351. continue;
  1352. }
  1353. ca = new CompactionArg;
  1354. ca->db = this;
  1355. ca->prepicked_compaction = new PrepickedCompaction;
  1356. ca->prepicked_compaction->manual_compaction_state = &manual;
  1357. ca->prepicked_compaction->compaction = compaction;
  1358. if (!RequestCompactionToken(
  1359. cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
  1360. // Don't throttle manual compaction, only count outstanding tasks.
  1361. assert(false);
  1362. }
  1363. manual.incomplete = false;
  1364. bg_compaction_scheduled_++;
  1365. env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
  1366. &DBImpl::UnscheduleCompactionCallback);
  1367. scheduled = true;
  1368. }
  1369. }
  1370. log_buffer.FlushBufferToLog();
  1371. assert(!manual.in_progress);
  1372. assert(HasPendingManualCompaction());
  1373. RemoveManualCompaction(&manual);
  1374. bg_cv_.SignalAll();
  1375. return manual.status;
  1376. }
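// Illustrative only: RunManualCompaction() is normally reached through the
// public CompactRange() API. A minimal caller-side sketch (db and cf are
// assumed to exist):
//
//   CompactRangeOptions cro;
//   cro.exclusive_manual_compaction = true;  // maps to 'exclusive' above
//   Slice begin("a"), end("z");
//   Status s = db->CompactRange(cro, cf, &begin, &end);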
  1377. void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
  1378. FlushRequest* req) {
  1379. assert(req != nullptr);
  1380. req->reserve(cfds.size());
  1381. for (const auto cfd : cfds) {
  1382. if (nullptr == cfd) {
  1383. // cfd may be null, see DBImpl::ScheduleFlushes
  1384. continue;
  1385. }
  1386. uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
  1387. req->emplace_back(cfd, max_memtable_id);
  1388. }
  1389. }
  1390. Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
  1391. const FlushOptions& flush_options,
  1392. FlushReason flush_reason, bool writes_stopped) {
  1393. Status s;
  1394. uint64_t flush_memtable_id = 0;
  1395. if (!flush_options.allow_write_stall) {
  1396. bool flush_needed = true;
  1397. s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
  1398. TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
  1399. if (!s.ok() || !flush_needed) {
  1400. return s;
  1401. }
  1402. }
  1403. FlushRequest flush_req;
  1404. {
  1405. WriteContext context;
  1406. InstrumentedMutexLock guard_lock(&mutex_);
  1407. WriteThread::Writer w;
  1408. WriteThread::Writer nonmem_w;
  1409. if (!writes_stopped) {
  1410. write_thread_.EnterUnbatched(&w, &mutex_);
  1411. if (two_write_queues_) {
  1412. nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  1413. }
  1414. }
  1415. WaitForPendingWrites();
  1416. if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
  1417. s = SwitchMemtable(cfd, &context);
  1418. }
  1419. if (s.ok()) {
  1420. if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
  1421. !cached_recoverable_state_empty_.load()) {
  1422. flush_memtable_id = cfd->imm()->GetLatestMemTableID();
  1423. flush_req.emplace_back(cfd, flush_memtable_id);
  1424. }
  1425. if (immutable_db_options_.persist_stats_to_disk) {
  1426. ColumnFamilyData* cfd_stats =
  1427. versions_->GetColumnFamilySet()->GetColumnFamily(
  1428. kPersistentStatsColumnFamilyName);
  1429. if (cfd_stats != nullptr && cfd_stats != cfd &&
  1430. !cfd_stats->mem()->IsEmpty()) {
  1431. // only force flush stats CF when it will be the only CF lagging
1432. // behind after the current flush
  1433. bool stats_cf_flush_needed = true;
  1434. for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
  1435. if (loop_cfd == cfd_stats || loop_cfd == cfd) {
  1436. continue;
  1437. }
  1438. if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
  1439. stats_cf_flush_needed = false;
  1440. }
  1441. }
  1442. if (stats_cf_flush_needed) {
  1443. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1444. "Force flushing stats CF with manual flush of %s "
  1445. "to avoid holding old logs",
  1446. cfd->GetName().c_str());
  1447. s = SwitchMemtable(cfd_stats, &context);
  1448. flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
  1449. flush_req.emplace_back(cfd_stats, flush_memtable_id);
  1450. }
  1451. }
  1452. }
  1453. }
  1454. if (s.ok() && !flush_req.empty()) {
  1455. for (auto& elem : flush_req) {
  1456. ColumnFamilyData* loop_cfd = elem.first;
  1457. loop_cfd->imm()->FlushRequested();
  1458. }
  1459. // If the caller wants to wait for this flush to complete, it indicates
1460. // that the caller expects the ColumnFamilyData not to be freed by
1461. // other threads which may drop the column family concurrently.
1462. // Therefore, we increase the cfd's ref count.
  1463. if (flush_options.wait) {
  1464. for (auto& elem : flush_req) {
  1465. ColumnFamilyData* loop_cfd = elem.first;
  1466. loop_cfd->Ref();
  1467. }
  1468. }
  1469. SchedulePendingFlush(flush_req, flush_reason);
  1470. MaybeScheduleFlushOrCompaction();
  1471. }
  1472. if (!writes_stopped) {
  1473. write_thread_.ExitUnbatched(&w);
  1474. if (two_write_queues_) {
  1475. nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  1476. }
  1477. }
  1478. }
  1479. TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
  1480. TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
  1481. if (s.ok() && flush_options.wait) {
  1482. autovector<ColumnFamilyData*> cfds;
  1483. autovector<const uint64_t*> flush_memtable_ids;
  1484. for (auto& iter : flush_req) {
  1485. cfds.push_back(iter.first);
  1486. flush_memtable_ids.push_back(&(iter.second));
  1487. }
  1488. s = WaitForFlushMemTables(cfds, flush_memtable_ids,
  1489. (flush_reason == FlushReason::kErrorRecovery));
  1490. InstrumentedMutexLock lock_guard(&mutex_);
  1491. for (auto* tmp_cfd : cfds) {
  1492. tmp_cfd->UnrefAndTryDelete();
  1493. }
  1494. }
  1495. TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
  1496. return s;
  1497. }
  1498. // Flush all elements in 'column_family_datas'
1499. // and atomically record the result to the MANIFEST.
  1500. Status DBImpl::AtomicFlushMemTables(
  1501. const autovector<ColumnFamilyData*>& column_family_datas,
  1502. const FlushOptions& flush_options, FlushReason flush_reason,
  1503. bool writes_stopped) {
  1504. Status s;
  1505. if (!flush_options.allow_write_stall) {
  1506. int num_cfs_to_flush = 0;
  1507. for (auto cfd : column_family_datas) {
  1508. bool flush_needed = true;
  1509. s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
  1510. if (!s.ok()) {
  1511. return s;
  1512. } else if (flush_needed) {
  1513. ++num_cfs_to_flush;
  1514. }
  1515. }
  1516. if (0 == num_cfs_to_flush) {
  1517. return s;
  1518. }
  1519. }
  1520. FlushRequest flush_req;
  1521. autovector<ColumnFamilyData*> cfds;
  1522. {
  1523. WriteContext context;
  1524. InstrumentedMutexLock guard_lock(&mutex_);
  1525. WriteThread::Writer w;
  1526. WriteThread::Writer nonmem_w;
  1527. if (!writes_stopped) {
  1528. write_thread_.EnterUnbatched(&w, &mutex_);
  1529. if (two_write_queues_) {
  1530. nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  1531. }
  1532. }
  1533. WaitForPendingWrites();
  1534. for (auto cfd : column_family_datas) {
  1535. if (cfd->IsDropped()) {
  1536. continue;
  1537. }
  1538. if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
  1539. !cached_recoverable_state_empty_.load()) {
  1540. cfds.emplace_back(cfd);
  1541. }
  1542. }
  1543. for (auto cfd : cfds) {
  1544. if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
  1545. continue;
  1546. }
  1547. cfd->Ref();
  1548. s = SwitchMemtable(cfd, &context);
  1549. cfd->UnrefAndTryDelete();
  1550. if (!s.ok()) {
  1551. break;
  1552. }
  1553. }
  1554. if (s.ok()) {
  1555. AssignAtomicFlushSeq(cfds);
  1556. for (auto cfd : cfds) {
  1557. cfd->imm()->FlushRequested();
  1558. }
  1559. // If the caller wants to wait for this flush to complete, it indicates
1560. // that the caller expects the ColumnFamilyData not to be freed by
1561. // other threads which may drop the column family concurrently.
1562. // Therefore, we increase the cfd's ref count.
  1563. if (flush_options.wait) {
  1564. for (auto cfd : cfds) {
  1565. cfd->Ref();
  1566. }
  1567. }
  1568. GenerateFlushRequest(cfds, &flush_req);
  1569. SchedulePendingFlush(flush_req, flush_reason);
  1570. MaybeScheduleFlushOrCompaction();
  1571. }
  1572. if (!writes_stopped) {
  1573. write_thread_.ExitUnbatched(&w);
  1574. if (two_write_queues_) {
  1575. nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  1576. }
  1577. }
  1578. }
  1579. TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
  1580. TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
  1581. if (s.ok() && flush_options.wait) {
  1582. autovector<const uint64_t*> flush_memtable_ids;
  1583. for (auto& iter : flush_req) {
  1584. flush_memtable_ids.push_back(&(iter.second));
  1585. }
  1586. s = WaitForFlushMemTables(cfds, flush_memtable_ids,
  1587. (flush_reason == FlushReason::kErrorRecovery));
  1588. InstrumentedMutexLock lock_guard(&mutex_);
  1589. for (auto* cfd : cfds) {
  1590. cfd->UnrefAndTryDelete();
  1591. }
  1592. }
  1593. return s;
  1594. }
  1595. // Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
1596. // cause a write stall, for example if one memtable is already being flushed.
1597. // This method tries to avoid a write stall (similar to CompactRange() behavior):
1598. // it emulates how the SuperVersion / LSM would change if the flush happened,
1599. // checks that against various constraints, and delays the flush if it would cause a write stall.
1600. // The caller should check status and flush_needed to see if the flush already happened.
  1601. Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
  1602. bool* flush_needed) {
  1603. {
  1604. *flush_needed = true;
  1605. InstrumentedMutexLock l(&mutex_);
  1606. uint64_t orig_active_memtable_id = cfd->mem()->GetID();
  1607. WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
  1608. do {
  1609. if (write_stall_condition != WriteStallCondition::kNormal) {
  1610. // Same error handling as user writes: Don't wait if there's a
1611. // background error, even if it's a soft error. We might wait here
1612. // indefinitely as the pending flushes/compactions may never finish
1613. // successfully, resulting in the stall condition lasting indefinitely.
  1614. if (error_handler_.IsBGWorkStopped()) {
  1615. return error_handler_.GetBGError();
  1616. }
  1617. TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
  1618. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1619. "[%s] WaitUntilFlushWouldNotStallWrites"
  1620. " waiting on stall conditions to clear",
  1621. cfd->GetName().c_str());
  1622. bg_cv_.Wait();
  1623. }
  1624. if (cfd->IsDropped()) {
  1625. return Status::ColumnFamilyDropped();
  1626. }
  1627. if (shutting_down_.load(std::memory_order_acquire)) {
  1628. return Status::ShutdownInProgress();
  1629. }
  1630. uint64_t earliest_memtable_id =
  1631. std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
  1632. if (earliest_memtable_id > orig_active_memtable_id) {
  1633. // We waited so long that the memtable we were originally waiting on was
1634. // flushed.
  1635. *flush_needed = false;
  1636. return Status::OK();
  1637. }
  1638. const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  1639. const auto* vstorage = cfd->current()->storage_info();
  1640. // Skip stalling check if we're below auto-flush and auto-compaction
1641. // triggers. If it stalled in these conditions, that'd mean the stall
1642. // triggers are so low that stalling is needed for any background work. In
1643. // that case we shouldn't wait since background work won't be scheduled.
  1644. if (cfd->imm()->NumNotFlushed() <
  1645. cfd->ioptions()->min_write_buffer_number_to_merge &&
  1646. vstorage->l0_delay_trigger_count() <
  1647. mutable_cf_options.level0_file_num_compaction_trigger) {
  1648. break;
  1649. }
  1650. // check whether one extra immutable memtable or an extra L0 file would
1651. // cause write stalling mode to be entered. It could still enter stall
1652. // mode due to pending compaction bytes, but that's less common.
  1653. write_stall_condition =
  1654. ColumnFamilyData::GetWriteStallConditionAndCause(
  1655. cfd->imm()->NumNotFlushed() + 1,
  1656. vstorage->l0_delay_trigger_count() + 1,
  1657. vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
  1658. .first;
  1659. } while (write_stall_condition != WriteStallCondition::kNormal);
  1660. }
  1661. return Status::OK();
  1662. }
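// Worked example of the simulated check above (numbers are illustrative only):
// with level0_slowdown_writes_trigger = 20 and 19 L0 files already counted by
// l0_delay_trigger_count(), GetWriteStallConditionAndCause() is asked about
// 19 + 1 = 20 files plus one extra immutable memtable. 20 reaches the slowdown
// trigger, so the condition is not kNormal and the loop keeps waiting until
// enough background work completes that the hypothetical extra flush output
// would no longer stall writes.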
  1663. // Wait for memtables to be flushed for multiple column families.
1664. // let N = cfds.size()
1665. // for i in [0, N),
1666. // 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
1667. //    have to be flushed for THIS column family;
1668. // 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
1669. //    family have to be flushed.
1670. // Finish waiting when ALL column families finish flushing memtables.
1671. // resuming_from_bg_err indicates whether the caller is trying to resume from
1672. // background error or in normal processing.
  1673. Status DBImpl::WaitForFlushMemTables(
  1674. const autovector<ColumnFamilyData*>& cfds,
  1675. const autovector<const uint64_t*>& flush_memtable_ids,
  1676. bool resuming_from_bg_err) {
  1677. int num = static_cast<int>(cfds.size());
1678. // Wait until the flushes complete
  1679. InstrumentedMutexLock l(&mutex_);
  1680. // If the caller is trying to resume from bg error, then
1681. // error_handler_.IsDBStopped() is true.
  1682. while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
  1683. if (shutting_down_.load(std::memory_order_acquire)) {
  1684. return Status::ShutdownInProgress();
  1685. }
  1686. // If an error has occurred during resumption, then no need to wait.
  1687. if (!error_handler_.GetRecoveryError().ok()) {
  1688. break;
  1689. }
  1690. // Number of column families that have been dropped.
  1691. int num_dropped = 0;
  1692. // Number of column families that have finished flush.
  1693. int num_finished = 0;
  1694. for (int i = 0; i < num; ++i) {
  1695. if (cfds[i]->IsDropped()) {
  1696. ++num_dropped;
  1697. } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
  1698. (flush_memtable_ids[i] != nullptr &&
  1699. cfds[i]->imm()->GetEarliestMemTableID() >
  1700. *flush_memtable_ids[i])) {
  1701. ++num_finished;
  1702. }
  1703. }
  1704. if (1 == num_dropped && 1 == num) {
  1705. return Status::InvalidArgument("Cannot flush a dropped CF");
  1706. }
  1707. // Column families involved in this flush request have either been dropped
1708. // or finished flush. Then it's time to finish waiting.
  1709. if (num_dropped + num_finished == num) {
  1710. break;
  1711. }
  1712. bg_cv_.Wait();
  1713. }
  1714. Status s;
  1715. // If not resuming from bg error, and an error has caused the DB to stop,
1716. // then report the bg error to caller.
  1717. if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
  1718. s = error_handler_.GetBGError();
  1719. }
  1720. return s;
  1721. }
  1722. Status DBImpl::EnableAutoCompaction(
  1723. const std::vector<ColumnFamilyHandle*>& column_family_handles) {
  1724. Status s;
  1725. for (auto cf_ptr : column_family_handles) {
  1726. Status status =
  1727. this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
  1728. if (!status.ok()) {
  1729. s = status;
  1730. }
  1731. }
  1732. return s;
  1733. }
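// Illustrative only: the usual pairing is to configure a column family with
// disable_auto_compactions = true for a bulk load and re-enable compactions
// afterwards. Sketch (names are placeholders):
//
//   // cf_options.disable_auto_compactions = true;  // during bulk load
//   // ... bulk load ...
//   // Status s = db->EnableAutoCompaction({cf_handle});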
  1734. void DBImpl::DisableManualCompaction() {
  1735. manual_compaction_paused_.store(true, std::memory_order_release);
  1736. }
  1737. void DBImpl::EnableManualCompaction() {
  1738. manual_compaction_paused_.store(false, std::memory_order_release);
  1739. }
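// Hedged note: DisableManualCompaction() only sets the pause flag. Manual
// compactions that are already running observe manual_compaction_paused_ and
// finish with Status::Incomplete(Status::SubCode::kManualCompactionPaused)
// (see BackgroundCompaction below), and the Notify* callbacks above are
// skipped while the flag is set, until EnableManualCompaction() clears it.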
  1740. void DBImpl::MaybeScheduleFlushOrCompaction() {
  1741. mutex_.AssertHeld();
  1742. if (!opened_successfully_) {
1743. // Compaction may introduce a data race with DB open
  1744. return;
  1745. }
  1746. if (bg_work_paused_ > 0) {
  1747. // we paused the background work
  1748. return;
  1749. } else if (error_handler_.IsBGWorkStopped() &&
  1750. !error_handler_.IsRecoveryInProgress()) {
  1751. // There has been a hard error and this call is not part of the recovery
1752. // sequence. Bail out here so we don't get into an endless loop of
1753. // scheduling BG work which will again call this function
  1754. return;
  1755. } else if (shutting_down_.load(std::memory_order_acquire)) {
  1756. // DB is being deleted; no more background compactions
  1757. return;
  1758. }
  1759. auto bg_job_limits = GetBGJobLimits();
  1760. bool is_flush_pool_empty =
  1761. env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  1762. while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
  1763. bg_flush_scheduled_ < bg_job_limits.max_flushes) {
  1764. bg_flush_scheduled_++;
  1765. FlushThreadArg* fta = new FlushThreadArg;
  1766. fta->db_ = this;
  1767. fta->thread_pri_ = Env::Priority::HIGH;
  1768. env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
  1769. &DBImpl::UnscheduleFlushCallback);
  1770. --unscheduled_flushes_;
  1771. TEST_SYNC_POINT_CALLBACK(
  1772. "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
  1773. &unscheduled_flushes_);
  1774. }
  1775. // special case -- if high-pri (flush) thread pool is empty, then schedule
1776. // flushes in low-pri (compaction) thread pool.
  1777. if (is_flush_pool_empty) {
  1778. while (unscheduled_flushes_ > 0 &&
  1779. bg_flush_scheduled_ + bg_compaction_scheduled_ <
  1780. bg_job_limits.max_flushes) {
  1781. bg_flush_scheduled_++;
  1782. FlushThreadArg* fta = new FlushThreadArg;
  1783. fta->db_ = this;
  1784. fta->thread_pri_ = Env::Priority::LOW;
  1785. env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
  1786. &DBImpl::UnscheduleFlushCallback);
  1787. --unscheduled_flushes_;
  1788. }
  1789. }
  1790. if (bg_compaction_paused_ > 0) {
  1791. // we paused the background compaction
  1792. return;
  1793. } else if (error_handler_.IsBGWorkStopped()) {
  1794. // Compaction is not part of the recovery sequence from a hard error. We
1795. // might get here because recovery might do a flush and install a new
1796. // super version, which will try to schedule pending compactions. Bail
1797. // out here and let the higher level recovery handle compactions
  1798. return;
  1799. }
  1800. if (HasExclusiveManualCompaction()) {
  1801. // only manual compactions are allowed to run. don't schedule automatic
1802. // compactions
  1803. TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
  1804. return;
  1805. }
  1806. while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
  1807. unscheduled_compactions_ > 0) {
  1808. CompactionArg* ca = new CompactionArg;
  1809. ca->db = this;
  1810. ca->prepicked_compaction = nullptr;
  1811. bg_compaction_scheduled_++;
  1812. unscheduled_compactions_--;
  1813. env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
  1814. &DBImpl::UnscheduleCompactionCallback);
  1815. }
  1816. }
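// Illustrative only: whether the "flush pool empty" fallback above is taken
// depends on how the HIGH-priority pool is sized. A minimal configuration
// sketch using the public API (values are placeholders):
//
//   // db_options.env->SetBackgroundThreads(2, Env::Priority::HIGH);  // flushes
//   // db_options.env->SetBackgroundThreads(4, Env::Priority::LOW);   // compactions
//   // db_options.max_background_jobs = 6;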
  1817. DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
  1818. mutex_.AssertHeld();
  1819. return GetBGJobLimits(immutable_db_options_.max_background_flushes,
  1820. mutable_db_options_.max_background_compactions,
  1821. mutable_db_options_.max_background_jobs,
  1822. write_controller_.NeedSpeedupCompaction());
  1823. }
  1824. DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
  1825. int max_background_compactions,
  1826. int max_background_jobs,
  1827. bool parallelize_compactions) {
  1828. BGJobLimits res;
  1829. if (max_background_flushes == -1 && max_background_compactions == -1) {
  1830. // for our first stab implementing max_background_jobs, simply allocate a
1831. // quarter of the threads to flushes.
  1832. res.max_flushes = std::max(1, max_background_jobs / 4);
  1833. res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
  1834. } else {
  1835. // compatibility code in case users haven't migrated to max_background_jobs,
1836. // which automatically computes flush/compaction limits
  1837. res.max_flushes = std::max(1, max_background_flushes);
  1838. res.max_compactions = std::max(1, max_background_compactions);
  1839. }
  1840. if (!parallelize_compactions) {
1841. // throttle background compactions until we deem it necessary
  1842. res.max_compactions = 1;
  1843. }
  1844. return res;
  1845. }
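// Worked example of the allocation above: with max_background_jobs = 8 and
// both legacy options left at -1, max_flushes = std::max(1, 8 / 4) = 2 and
// max_compactions = std::max(1, 8 - 2) = 6. If parallelize_compactions is
// false (the write controller sees no need to speed up compactions),
// max_compactions is clamped down to 1.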
  1846. void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
  1847. assert(!cfd->queued_for_compaction());
  1848. cfd->Ref();
  1849. compaction_queue_.push_back(cfd);
  1850. cfd->set_queued_for_compaction(true);
  1851. }
  1852. ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
  1853. assert(!compaction_queue_.empty());
  1854. auto cfd = *compaction_queue_.begin();
  1855. compaction_queue_.pop_front();
  1856. assert(cfd->queued_for_compaction());
  1857. cfd->set_queued_for_compaction(false);
  1858. return cfd;
  1859. }
  1860. DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
  1861. assert(!flush_queue_.empty());
  1862. FlushRequest flush_req = flush_queue_.front();
  1863. flush_queue_.pop_front();
  1864. // TODO: need to unset flush reason?
  1865. return flush_req;
  1866. }
  1867. ColumnFamilyData* DBImpl::PickCompactionFromQueue(
  1868. std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
  1869. assert(!compaction_queue_.empty());
  1870. assert(*token == nullptr);
  1871. autovector<ColumnFamilyData*> throttled_candidates;
  1872. ColumnFamilyData* cfd = nullptr;
  1873. while (!compaction_queue_.empty()) {
  1874. auto first_cfd = *compaction_queue_.begin();
  1875. compaction_queue_.pop_front();
  1876. assert(first_cfd->queued_for_compaction());
  1877. if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
  1878. throttled_candidates.push_back(first_cfd);
  1879. continue;
  1880. }
  1881. cfd = first_cfd;
  1882. cfd->set_queued_for_compaction(false);
  1883. break;
  1884. }
  1885. // Add throttled compaction candidates back to queue in the original order.
  1886. for (auto iter = throttled_candidates.rbegin();
  1887. iter != throttled_candidates.rend(); ++iter) {
  1888. compaction_queue_.push_front(*iter);
  1889. }
  1890. return cfd;
  1891. }
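// Illustrative only: the throttling seen above comes from the per-CF
// compaction thread limiter. A minimal configuration sketch (assuming the
// ConcurrentTaskLimiter public API):
//
//   // std::shared_ptr<ConcurrentTaskLimiter> limiter(
//   //     NewConcurrentTaskLimiter("shared_limiter", 4 /* max outstanding */));
//   // cf_options.compaction_thread_limiter = limiter;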
  1892. void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
  1893. FlushReason flush_reason) {
  1894. if (flush_req.empty()) {
  1895. return;
  1896. }
  1897. for (auto& iter : flush_req) {
  1898. ColumnFamilyData* cfd = iter.first;
  1899. cfd->Ref();
  1900. cfd->SetFlushReason(flush_reason);
  1901. }
  1902. ++unscheduled_flushes_;
  1903. flush_queue_.push_back(flush_req);
  1904. }
  1905. void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
  1906. if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
  1907. AddToCompactionQueue(cfd);
  1908. ++unscheduled_compactions_;
  1909. }
  1910. }
  1911. void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
  1912. FileType type, uint64_t number, int job_id) {
  1913. mutex_.AssertHeld();
  1914. PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
  1915. purge_files_.insert({{number, std::move(file_info)}});
  1916. }
  1917. void DBImpl::BGWorkFlush(void* arg) {
  1918. FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
  1919. delete reinterpret_cast<FlushThreadArg*>(arg);
  1920. IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
  1921. TEST_SYNC_POINT("DBImpl::BGWorkFlush");
  1922. static_cast_with_check<DBImpl, DB>(fta.db_)->BackgroundCallFlush(
  1923. fta.thread_pri_);
  1924. TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
  1925. }
  1926. void DBImpl::BGWorkCompaction(void* arg) {
  1927. CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
  1928. delete reinterpret_cast<CompactionArg*>(arg);
  1929. IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
  1930. TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
  1931. auto prepicked_compaction =
  1932. static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
  1933. static_cast_with_check<DBImpl, DB>(ca.db)->BackgroundCallCompaction(
  1934. prepicked_compaction, Env::Priority::LOW);
  1935. delete prepicked_compaction;
  1936. }
  1937. void DBImpl::BGWorkBottomCompaction(void* arg) {
  1938. CompactionArg ca = *(static_cast<CompactionArg*>(arg));
  1939. delete static_cast<CompactionArg*>(arg);
  1940. IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
  1941. TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
  1942. auto* prepicked_compaction = ca.prepicked_compaction;
  1943. assert(prepicked_compaction && prepicked_compaction->compaction &&
  1944. !prepicked_compaction->manual_compaction_state);
  1945. ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
  1946. delete prepicked_compaction;
  1947. }
  1948. void DBImpl::BGWorkPurge(void* db) {
  1949. IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
  1950. TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
  1951. reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
  1952. TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
  1953. }
  1954. void DBImpl::UnscheduleCompactionCallback(void* arg) {
  1955. CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
  1956. delete reinterpret_cast<CompactionArg*>(arg);
  1957. if (ca.prepicked_compaction != nullptr) {
  1958. if (ca.prepicked_compaction->compaction != nullptr) {
  1959. delete ca.prepicked_compaction->compaction;
  1960. }
  1961. delete ca.prepicked_compaction;
  1962. }
  1963. TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
  1964. }
  1965. void DBImpl::UnscheduleFlushCallback(void* arg) {
  1966. delete reinterpret_cast<FlushThreadArg*>(arg);
  1967. TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
  1968. }
  1969. Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
  1970. LogBuffer* log_buffer, FlushReason* reason,
  1971. Env::Priority thread_pri) {
  1972. mutex_.AssertHeld();
  1973. Status status;
  1974. *reason = FlushReason::kOthers;
  1975. // If BG work is stopped due to an error, but a recovery is in progress,
1976. // that means this flush is part of the recovery. So allow it to go through
  1977. if (!error_handler_.IsBGWorkStopped()) {
  1978. if (shutting_down_.load(std::memory_order_acquire)) {
  1979. status = Status::ShutdownInProgress();
  1980. }
  1981. } else if (!error_handler_.IsRecoveryInProgress()) {
  1982. status = error_handler_.GetBGError();
  1983. }
  1984. if (!status.ok()) {
  1985. return status;
  1986. }
  1987. autovector<BGFlushArg> bg_flush_args;
  1988. std::vector<SuperVersionContext>& superversion_contexts =
  1989. job_context->superversion_contexts;
  1990. autovector<ColumnFamilyData*> column_families_not_to_flush;
  1991. while (!flush_queue_.empty()) {
  1992. // This cfd is already referenced
  1993. const FlushRequest& flush_req = PopFirstFromFlushQueue();
  1994. superversion_contexts.clear();
  1995. superversion_contexts.reserve(flush_req.size());
  1996. for (const auto& iter : flush_req) {
  1997. ColumnFamilyData* cfd = iter.first;
  1998. if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
  1999. // can't flush this CF, try next one
  2000. column_families_not_to_flush.push_back(cfd);
  2001. continue;
  2002. }
  2003. superversion_contexts.emplace_back(SuperVersionContext(true));
  2004. bg_flush_args.emplace_back(cfd, iter.second,
  2005. &(superversion_contexts.back()));
  2006. }
  2007. if (!bg_flush_args.empty()) {
  2008. break;
  2009. }
  2010. }
  2011. if (!bg_flush_args.empty()) {
  2012. auto bg_job_limits = GetBGJobLimits();
  2013. for (const auto& arg : bg_flush_args) {
  2014. ColumnFamilyData* cfd = arg.cfd_;
  2015. ROCKS_LOG_BUFFER(
  2016. log_buffer,
  2017. "Calling FlushMemTableToOutputFile with column "
  2018. "family [%s], flush slots available %d, compaction slots available "
  2019. "%d, "
  2020. "flush slots scheduled %d, compaction slots scheduled %d",
  2021. cfd->GetName().c_str(), bg_job_limits.max_flushes,
  2022. bg_job_limits.max_compactions, bg_flush_scheduled_,
  2023. bg_compaction_scheduled_);
  2024. }
  2025. status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
  2026. job_context, log_buffer, thread_pri);
  2027. TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
  2028. // All the CFDs in the FlushReq must have the same flush reason, so just
2029. // grab the first one
  2030. *reason = bg_flush_args[0].cfd_->GetFlushReason();
  2031. for (auto& arg : bg_flush_args) {
  2032. ColumnFamilyData* cfd = arg.cfd_;
  2033. if (cfd->UnrefAndTryDelete()) {
  2034. arg.cfd_ = nullptr;
  2035. }
  2036. }
  2037. }
  2038. for (auto cfd : column_families_not_to_flush) {
  2039. cfd->UnrefAndTryDelete();
  2040. }
  2041. return status;
  2042. }
  2043. void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
  2044. bool made_progress = false;
  2045. JobContext job_context(next_job_id_.fetch_add(1), true);
  2046. TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
  2047. LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
  2048. immutable_db_options_.info_log.get());
  2049. {
  2050. InstrumentedMutexLock l(&mutex_);
  2051. assert(bg_flush_scheduled_);
  2052. num_running_flushes_++;
  2053. std::unique_ptr<std::list<uint64_t>::iterator>
  2054. pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
  2055. CaptureCurrentFileNumberInPendingOutputs()));
  2056. FlushReason reason;
  2057. Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
  2058. &reason, thread_pri);
  2059. if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
  2060. reason != FlushReason::kErrorRecovery) {
  2061. // Wait a little bit before retrying background flush in
2062. // case this is an environmental problem and we do not want to
2063. // chew up resources for failed flushes for the duration of
2064. // the problem.
  2065. uint64_t error_cnt =
  2066. default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
  2067. bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
  2068. mutex_.Unlock();
  2069. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  2070. "Waiting after background flush error: %s"
  2071. "Accumulated background error counts: %" PRIu64,
  2072. s.ToString().c_str(), error_cnt);
  2073. log_buffer.FlushBufferToLog();
  2074. LogFlush(immutable_db_options_.info_log);
  2075. env_->SleepForMicroseconds(1000000);
  2076. mutex_.Lock();
  2077. }
  2078. TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
  2079. ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  2080. // If flush failed, we want to delete all temporary files that we might have
2081. // created. Thus, we force full scan in FindObsoleteFiles()
  2082. FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
  2083. !s.IsColumnFamilyDropped());
  2084. // delete unnecessary files if any, this is done outside the mutex
  2085. if (job_context.HaveSomethingToClean() ||
  2086. job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
  2087. mutex_.Unlock();
  2088. TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
  2089. // Have to flush the info logs before bg_flush_scheduled_--
2090. // because if bg_flush_scheduled_ becomes 0 and the lock is
2091. // released, the destructor of DB can kick in and destroy all the
2092. // states of DB so info_log might not be available after that point.
2093. // The same applies to accessing other state that the DB owns.
  2094. log_buffer.FlushBufferToLog();
  2095. if (job_context.HaveSomethingToDelete()) {
  2096. PurgeObsoleteFiles(job_context);
  2097. }
  2098. job_context.Clean();
  2099. mutex_.Lock();
  2100. }
  2101. TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
  2102. assert(num_running_flushes_ > 0);
  2103. num_running_flushes_--;
  2104. bg_flush_scheduled_--;
  2105. // See if there's more work to be done
  2106. MaybeScheduleFlushOrCompaction();
  2107. atomic_flush_install_cv_.SignalAll();
  2108. bg_cv_.SignalAll();
  2109. // IMPORTANT: there should be no code after calling SignalAll. This call may
2110. // signal the DB destructor that it's OK to proceed with destruction. In
2111. // that case, all DB variables will be deallocated and referencing them
2112. // will cause trouble.
  2113. }
  2114. }
  2115. void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
  2116. Env::Priority bg_thread_pri) {
  2117. bool made_progress = false;
  2118. JobContext job_context(next_job_id_.fetch_add(1), true);
  2119. TEST_SYNC_POINT("BackgroundCallCompaction:0");
  2120. LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
  2121. immutable_db_options_.info_log.get());
  2122. {
  2123. InstrumentedMutexLock l(&mutex_);
  2124. // This call will unlock/lock the mutex to wait for current running
2125. // IngestExternalFile() calls to finish.
  2126. WaitForIngestFile();
  2127. num_running_compactions_++;
  2128. std::unique_ptr<std::list<uint64_t>::iterator>
  2129. pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
  2130. CaptureCurrentFileNumberInPendingOutputs()));
  2131. assert((bg_thread_pri == Env::Priority::BOTTOM &&
  2132. bg_bottom_compaction_scheduled_) ||
  2133. (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
  2134. Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
  2135. prepicked_compaction, bg_thread_pri);
  2136. TEST_SYNC_POINT("BackgroundCallCompaction:1");
  2137. if (s.IsBusy()) {
  2138. bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
  2139. mutex_.Unlock();
  2140. env_->SleepForMicroseconds(10000); // prevent hot loop
  2141. mutex_.Lock();
  2142. } else if (!s.ok() && !s.IsShutdownInProgress() &&
  2143. !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
  2144. // Wait a little bit before retrying background compaction in
2145. // case this is an environmental problem and we do not want to
2146. // chew up resources for failed compactions for the duration of
2147. // the problem.
  2148. uint64_t error_cnt =
  2149. default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
  2150. bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
  2151. mutex_.Unlock();
  2152. log_buffer.FlushBufferToLog();
  2153. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  2154. "Waiting after background compaction error: %s, "
  2155. "Accumulated background error counts: %" PRIu64,
  2156. s.ToString().c_str(), error_cnt);
  2157. LogFlush(immutable_db_options_.info_log);
  2158. env_->SleepForMicroseconds(1000000);
  2159. mutex_.Lock();
  2160. } else if (s.IsManualCompactionPaused()) {
  2161. ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
  2162. assert(m);
  2163. ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
  2164. m->cfd->GetName().c_str(), job_context.job_id);
  2165. }
  2166. ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  2167. // If compaction failed, we want to delete all temporary files that we might
2168. // have created (they might not be all recorded in job_context in case of a
2169. // failure). Thus, we force full scan in FindObsoleteFiles()
  2170. FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
  2171. !s.IsManualCompactionPaused() &&
  2172. !s.IsColumnFamilyDropped());
  2173. TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
  2174. // delete unnecessary files if any, this is done outside the mutex
  2175. if (job_context.HaveSomethingToClean() ||
  2176. job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
  2177. mutex_.Unlock();
  2178. // Have to flush the info logs before bg_compaction_scheduled_--
2179. // because if bg_compaction_scheduled_ becomes 0 and the lock is
2180. // released, the destructor of DB can kick in and destroy all the
2181. // states of DB so info_log might not be available after that point.
2182. // The same applies to accessing other state that the DB owns.
  2183. log_buffer.FlushBufferToLog();
  2184. if (job_context.HaveSomethingToDelete()) {
  2185. PurgeObsoleteFiles(job_context);
  2186. TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
  2187. }
  2188. job_context.Clean();
  2189. mutex_.Lock();
  2190. }
  2191. assert(num_running_compactions_ > 0);
  2192. num_running_compactions_--;
  2193. if (bg_thread_pri == Env::Priority::LOW) {
  2194. bg_compaction_scheduled_--;
  2195. } else {
  2196. assert(bg_thread_pri == Env::Priority::BOTTOM);
  2197. bg_bottom_compaction_scheduled_--;
  2198. }
  2199. versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  2200. // See if there's more work to be done
  2201. MaybeScheduleFlushOrCompaction();
  2202. if (made_progress ||
  2203. (bg_compaction_scheduled_ == 0 &&
  2204. bg_bottom_compaction_scheduled_ == 0) ||
  2205. HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
  2206. // signal if
  2207. // * made_progress -- need to wakeup DelayWrite
  2208. // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
  2209. // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
  2210. // If none of this is true, there is no need to signal since nobody is
2211. // waiting for it
  2212. bg_cv_.SignalAll();
  2213. }
  2214. // IMPORTANT: there should be no code after calling SignalAll. This call may
2215. // signal the DB destructor that it's OK to proceed with destruction. In
2216. // that case, all DB variables will be deallocated and referencing them
2217. // will cause trouble.
  2218. }
  2219. }
  2220. Status DBImpl::BackgroundCompaction(bool* made_progress,
  2221. JobContext* job_context,
  2222. LogBuffer* log_buffer,
  2223. PrepickedCompaction* prepicked_compaction,
  2224. Env::Priority thread_pri) {
  2225. ManualCompactionState* manual_compaction =
  2226. prepicked_compaction == nullptr
  2227. ? nullptr
  2228. : prepicked_compaction->manual_compaction_state;
  2229. *made_progress = false;
  2230. mutex_.AssertHeld();
  2231. TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
  2232. bool is_manual = (manual_compaction != nullptr);
  2233. std::unique_ptr<Compaction> c;
  2234. if (prepicked_compaction != nullptr &&
  2235. prepicked_compaction->compaction != nullptr) {
  2236. c.reset(prepicked_compaction->compaction);
  2237. }
  2238. bool is_prepicked = is_manual || c;
  2239. // (manual_compaction->in_progress == false);
  2240. bool trivial_move_disallowed =
  2241. is_manual && manual_compaction->disallow_trivial_move;
  2242. CompactionJobStats compaction_job_stats;
  2243. Status status;
  2244. if (!error_handler_.IsBGWorkStopped()) {
  2245. if (shutting_down_.load(std::memory_order_acquire)) {
  2246. status = Status::ShutdownInProgress();
  2247. } else if (is_manual &&
  2248. manual_compaction_paused_.load(std::memory_order_acquire)) {
  2249. status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  2250. }
  2251. } else {
  2252. status = error_handler_.GetBGError();
  2253. // If we get here, it means a hard error happened after this compaction
2254. // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
2255. // a chance to execute. Since we didn't pop a cfd from the compaction
2256. // queue, increment unscheduled_compactions_
  2257. unscheduled_compactions_++;
  2258. }
  2259. if (!status.ok()) {
  2260. if (is_manual) {
  2261. manual_compaction->status = status;
  2262. manual_compaction->done = true;
  2263. manual_compaction->in_progress = false;
  2264. manual_compaction = nullptr;
  2265. }
  2266. if (c) {
  2267. c->ReleaseCompactionFiles(status);
  2268. c.reset();
  2269. }
  2270. return status;
  2271. }
  2272. if (is_manual) {
  2273. // another thread cannot pick up the same work
  2274. manual_compaction->in_progress = true;
  2275. }
  2276. std::unique_ptr<TaskLimiterToken> task_token;
  2277. // InternalKey manual_end_storage;
  2278. // InternalKey* manual_end = &manual_end_storage;
  2279. bool sfm_reserved_compact_space = false;
  2280. if (is_manual) {
  2281. ManualCompactionState* m = manual_compaction;
  2282. assert(m->in_progress);
  2283. if (!c) {
  2284. m->done = true;
  2285. m->manual_end = nullptr;
  2286. ROCKS_LOG_BUFFER(log_buffer,
  2287. "[%s] Manual compaction from level-%d from %s .. "
  2288. "%s; nothing to do\n",
  2289. m->cfd->GetName().c_str(), m->input_level,
  2290. (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
  2291. (m->end ? m->end->DebugString().c_str() : "(end)"));
  2292. } else {
  2293. // First check if we have enough room to do the compaction
  2294. bool enough_room = EnoughRoomForCompaction(
  2295. m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
  2296. if (!enough_room) {
  2297. // Then don't do the compaction
  2298. c->ReleaseCompactionFiles(status);
  2299. c.reset();
  2300. // m's vars will get set properly at the end of this function,
2301. // as long as status == CompactionTooLarge
  2302. status = Status::CompactionTooLarge();
  2303. } else {
  2304. ROCKS_LOG_BUFFER(
  2305. log_buffer,
  2306. "[%s] Manual compaction from level-%d to level-%d from %s .. "
  2307. "%s; will stop at %s\n",
  2308. m->cfd->GetName().c_str(), m->input_level, c->output_level(),
  2309. (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
  2310. (m->end ? m->end->DebugString().c_str() : "(end)"),
  2311. ((m->done || m->manual_end == nullptr)
  2312. ? "(end)"
  2313. : m->manual_end->DebugString().c_str()));
  2314. }
  2315. }
  2316. } else if (!is_prepicked && !compaction_queue_.empty()) {
  2317. if (HasExclusiveManualCompaction()) {
  2318. // Can't compact right now, but try again later
  2319. TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
  2320. // Stay in the compaction queue.
  2321. unscheduled_compactions_++;
  2322. return Status::OK();
  2323. }
  2324. auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
  2325. if (cfd == nullptr) {
  2326. // Can't find any executable task from the compaction queue.
2327. // All tasks have been throttled by compaction thread limiter.
  2328. ++unscheduled_compactions_;
  2329. return Status::Busy();
  2330. }
  2331. // We unreference here because the following code will take a Ref() on
2332. // this cfd if it is going to use it (Compaction class holds a
2333. // reference).
2334. // This will all happen under a mutex so we don't have to be afraid of
2335. // somebody else deleting it.
  2336. if (cfd->UnrefAndTryDelete()) {
  2337. // This was the last reference of the column family, so no need to
2338. // compact.
  2339. return Status::OK();
  2340. }
  2341. // Pick up latest mutable CF Options and use it throughout the
2342. // compaction job.
2343. // Compaction makes a copy of the latest MutableCFOptions. It should be used
2344. // throughout the compaction procedure to ensure consistency. It will
2345. // eventually be installed into SuperVersion.
  2346. auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
  2347. if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
  2348. // NOTE: try to avoid unnecessary copy of MutableCFOptions if
2349. // compaction is not necessary. Need to make sure mutex is held
2350. // until we make a copy in the following code
  2351. TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
  2352. c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
  2353. TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
  2354. if (c != nullptr) {
  2355. bool enough_room = EnoughRoomForCompaction(
  2356. cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
  2357. if (!enough_room) {
  2358. // Then don't do the compaction
  2359. c->ReleaseCompactionFiles(status);
  2360. c->column_family_data()
  2361. ->current()
  2362. ->storage_info()
  2363. ->ComputeCompactionScore(*(c->immutable_cf_options()),
  2364. *(c->mutable_cf_options()));
  2365. AddToCompactionQueue(cfd);
  2366. ++unscheduled_compactions_;
  2367. c.reset();
  2368. // Don't need to sleep here, because BackgroundCallCompaction
2369. // will sleep if !s.ok()
  2370. status = Status::CompactionTooLarge();
  2371. } else {
  2372. // update statistics
  2373. RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
  2374. c->inputs(0)->size());
  2375. // There are three things that can change compaction score:
2376. // 1) When flush or compaction finish. This case is covered by
2377. // InstallSuperVersionAndScheduleWork
2378. // 2) When MutableCFOptions changes. This case is also covered by
2379. // InstallSuperVersionAndScheduleWork, because this is when the new
2380. // options take effect.
2381. // 3) When we Pick a new compaction, we "remove" those files being
2382. // compacted from the calculation, which then influences compaction
2383. // score. Here we check if we need the new compaction even without the
2384. // files that are currently being compacted. If we need another
2385. // compaction, we might be able to execute it in parallel, so we add
2386. // it to the queue and schedule a new thread.
  2387. if (cfd->NeedsCompaction()) {
  2388. // Yes, we need more compactions!
  2389. AddToCompactionQueue(cfd);
  2390. ++unscheduled_compactions_;
  2391. MaybeScheduleFlushOrCompaction();
  2392. }
  2393. }
  2394. }
  2395. }
  2396. }
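  // At this point `c` is either empty (nothing worth compacting was found)
  // or holds the picked compaction, which is dispatched by one of the
  // branches below: a FIFO deletion-only compaction, a trivial file move,
  // a hand-off to the bottom-priority thread pool, or a full CompactionJob.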
  if (!c) {
    // Nothing to do
    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
  } else if (c->deletion_compaction()) {
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is an alive snapshot pointing to it
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                     c->column_family_data()->GetName().c_str(),
                     c->num_input_files(0));
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        immutable_db_options_.enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    // Move files to next level
    int32_t moved_files = 0;
    int64_t moved_bytes = 0;
    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
      if (c->level(l) == c->output_level()) {
        continue;
      }
      for (size_t i = 0; i < c->num_input_files(l); i++) {
        FileMetaData* f = c->input(l, i);
        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
                           f->largest, f->fd.smallest_seqno,
                           f->fd.largest_seqno, f->marked_for_compaction,
                           f->oldest_blob_file_number, f->oldest_ancester_time,
                           f->file_creation_time, f->file_checksum,
                           f->file_checksum_func_name);
        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
            c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
            c->output_level(), f->fd.GetFileSize());
        ++moved_files;
        moved_bytes += f->fd.GetFileSize();
      }
    }

    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    // Use latest MutableCFOptions
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());

    VersionStorageInfo::LevelSummaryStorage tmp;
    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
                                                             moved_bytes);
    {
      event_logger_.LogToBuffer(log_buffer)
          << "job" << job_context->job_id << "event"
          << "trivial_move"
          << "destination_level" << c->output_level() << "files" << moved_files
          << "total_files_size" << moved_bytes;
    }
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
        c->column_family_data()->GetName().c_str(), moved_files,
        c->output_level(), moved_bytes, status.ToString().c_str(),
        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;

    // Clear Instrument
    ThreadStatusUtil::ResetThreadStatus();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!is_prepicked && c->output_level() > 0 &&
             c->output_level() ==
                 c->column_family_data()
                     ->current()
                     ->storage_info()
                     ->MaxOutputLevel(
                         immutable_db_options_.allow_ingest_behind) &&
             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
    // Forward compactions involving last level to the bottom pool if it
    // exists, such that compactions unlikely to contribute to write stalls
    // can be delayed or deprioritized.
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = new PrepickedCompaction;
    ca->prepicked_compaction->compaction = c.release();
    ca->prepicked_compaction->manual_compaction_state = nullptr;
    // Transfer requested token, so it doesn't need to do it again.
    ca->prepicked_compaction->task_token = std::move(task_token);
    ++bg_bottom_compaction_scheduled_;
    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
                   this, &DBImpl::UnscheduleCompactionCallback);
  } else {
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    int output_level __attribute__((__unused__));
    output_level = c->output_level();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                             &output_level);
    std::vector<SequenceNumber> snapshot_seqs;
    SequenceNumber earliest_write_conflict_snapshot;
    SnapshotChecker* snapshot_checker;
    GetSnapshotContext(job_context, &snapshot_seqs,
                       &earliest_write_conflict_snapshot, &snapshot_checker);
    assert(is_snapshot_supported_ || snapshots_.empty());
    CompactionJob compaction_job(
        job_context->job_id, c.get(), immutable_db_options_,
        file_options_for_compaction_, versions_.get(), &shutting_down_,
        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
        GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
        &mutex_, &error_handler_, snapshot_seqs,
        earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
        &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
        &compaction_job_stats, thread_pri,
        is_manual ? &manual_compaction_paused_ : nullptr);
    compaction_job.Prepare();

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    mutex_.Unlock();
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
    compaction_job.Run();
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          c->column_family_data(), &job_context->superversion_contexts[0],
          *c->mutable_cf_options());
    }
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  }
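  // Common post-processing for all of the branches above: release the
  // compaction's input files, let the SstFileManager update its reserved
  // space bookkeeping, and notify listeners that the compaction completed.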
  if (c != nullptr) {
    c->ReleaseCompactionFiles(status);
    *made_progress = true;

#ifndef ROCKSDB_LITE
    // Need to make sure SstFileManager does its bookkeeping
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm && sfm_reserved_compact_space) {
      sfm->OnCompactionCompletion(c.get());
    }
#endif  // ROCKSDB_LITE

    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
                                compaction_job_stats, job_context->job_id);
  }

  if (status.ok() || status.IsCompactionTooLarge() ||
      status.IsManualCompactionPaused()) {
    // Done
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
      // Put this cfd back in the compaction queue so we can retry after some
      // time
      auto cfd = c->column_family_data();
      assert(cfd != nullptr);
      // Since this compaction failed, we need to recompute the score so it
      // takes the original input files into account
      c->column_family_data()
          ->current()
          ->storage_info()
          ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                   *(c->mutable_cf_options()));
      if (!cfd->queued_for_compaction()) {
        AddToCompactionQueue(cfd);
        ++unscheduled_compactions_;
      }
    }
  }
  // this will unref its input_version and column_family_data
  c.reset();
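  // For a manual compaction, record the outcome and, if only part of the
  // requested range was compacted, remember the remaining range so a
  // follow-up compaction can be scheduled for it.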
  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    // Because universal compaction always happens at level 0, one compaction
    // will pick up all overlapping files. No files will be filtered out due
    // to size limits and left for a successive compaction, so we can safely
    // conclude the current compaction.
    //
    // Also note that, if we don't stop here, then the current compaction
    // writes a new file back to level 0, which will be used in successive
    // compactions. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction.
    if (m->manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range. Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range.
      assert(m->cfd->ioptions()->compaction_style !=
                 kCompactionStyleUniversal ||
             m->cfd->ioptions()->num_levels > 1);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *m->manual_end;
      m->begin = &m->tmp_storage;
      m->incomplete = true;
    }
    m->in_progress = false;  // not being processed anymore
  }
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
  return status;
}

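// The helpers below maintain manual_compaction_dequeue_, the queue of pending
// and in-progress manual compaction requests. They only touch in-memory state
// and are assumed to be called with the db mutex (mutex_) held.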
bool DBImpl::HasPendingManualCompaction() {
  return (!manual_compaction_dequeue_.empty());
}

void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
  manual_compaction_dequeue_.push_back(m);
}

void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
  // Remove from queue
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      it = manual_compaction_dequeue_.erase(it);
      return;
    }
    ++it;
  }
  assert(false);
  return;
}

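// Returns true if the manual compaction `m` cannot run right now: an
// IngestExternalFile() call is in flight, `m` is exclusive while background
// compactions are still scheduled, or an overlapping manual compaction is
// queued ahead of `m` and has not started yet.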
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
  if (num_running_ingest_file_ > 0) {
    // We need to wait for other IngestExternalFile() calls to finish
    // before running a manual compaction.
    return true;
  }
  if (m->exclusive) {
    return (bg_bottom_compaction_scheduled_ > 0 ||
            bg_compaction_scheduled_ > 0);
  }
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  bool seen = false;
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      ++it;
      seen = true;
      continue;
    } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
      // The other manual compaction *it conflicts with m if it overlaps with
      // m, is ahead of m in the queue, and is not yet in progress.
      return true;
    }
    ++it;
  }
  return false;
}

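// Returns true if an exclusive manual compaction is queued, or if a manual
// compaction for `cfd` is waiting in the queue (neither in progress nor
// done); in that case automatic compactions for `cfd` should hold off.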
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  // Scan the manual compaction queue
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
      // A manual compaction for this column family is queued but has not
      // started yet; automatic compaction is only allowed once it is in
      // progress or done.
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::HasExclusiveManualCompaction() {
  // Scan the manual compaction queue for an exclusive request
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    ++it;
  }
  return false;
}

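// Two manual compactions "overlap" (i.e. conflict) if either one is exclusive
// or if they target the same column family; key ranges are not compared here.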
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
  if ((m->exclusive) || (m1->exclusive)) {
    return true;
  }
  if (m->cfd != m1->cfd) {
    return false;
  }
  return true;
}

#ifndef ROCKSDB_LITE
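// Fills a CompactionJobInfo for listener callbacks: column family identity,
// job id/status/stats, input and output levels, the input and output file
// lists, and any table properties that can be looked up for the input files.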
void DBImpl::BuildCompactionJobInfo(
    const ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id,
    const Version* current, CompactionJobInfo* compaction_job_info) const {
  assert(compaction_job_info != nullptr);
  compaction_job_info->cf_id = cfd->GetID();
  compaction_job_info->cf_name = cfd->GetName();
  compaction_job_info->status = st;
  compaction_job_info->thread_id = env_->GetThreadID();
  compaction_job_info->job_id = job_id;
  compaction_job_info->base_input_level = c->start_level();
  compaction_job_info->output_level = c->output_level();
  compaction_job_info->stats = compaction_job_stats;
  compaction_job_info->table_properties = c->GetOutputTableProperties();
  compaction_job_info->compaction_reason = c->compaction_reason();
  compaction_job_info->compression = c->output_compression();
  for (size_t i = 0; i < c->num_input_levels(); ++i) {
    for (const auto fmd : *c->inputs(i)) {
      const FileDescriptor& desc = fmd->fd;
      const uint64_t file_number = desc.GetNumber();
      auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
                              desc.GetPathId());
      compaction_job_info->input_files.push_back(fn);
      compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
          static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
      if (compaction_job_info->table_properties.count(fn) == 0) {
        std::shared_ptr<const TableProperties> tp;
        auto s = current->GetTableProperties(&tp, fmd, &fn);
        if (s.ok()) {
          compaction_job_info->table_properties[fn] = tp;
        }
      }
    }
  }
  for (const auto& newf : c->edit()->GetNewFiles()) {
    const FileMetaData& meta = newf.second;
    const FileDescriptor& desc = meta.fd;
    const uint64_t file_number = desc.GetNumber();
    compaction_job_info->output_files.push_back(TableFileName(
        c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
    compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
        newf.first, file_number, meta.oldest_blob_file_number});
  }
}
#endif

// SuperVersionContext gets created and destructed outside of the lock --
// we use this conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
// same sv_context, we can't reuse the SuperVersion() that got malloced,
// because the first call already used it. In that rare case, we take a hit
// and create a new SuperVersion() inside of the mutex. We do a similar thing
// for superversions_to_free.
void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();

  // Update max_total_in_memory_state_
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

  // This branch is rarely taken.
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
    sv_context->NewSuperVersion();
  }
  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);

  // There may be a small data race here. The snapshot triggering bottommost
  // compaction may already be released here. But assuming there will always
  // be newer snapshots created and released frequently, the compaction will
  // be triggered soon anyway.
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
    bottommost_files_mark_threshold_ = std::min(
        bottommost_files_mark_threshold_,
        my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
  }

  // Whenever we install a new SuperVersion, we might need to issue new flushes
  // or compactions.
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_
  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
                               mutable_cf_options.write_buffer_size *
                                   mutable_cf_options.max_write_buffer_number;
}

// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
// and db mutex (mutex_) should already be held.
// Actually, the current implementation of FindObsoleteFiles with
// full_scan=true can issue I/O requests to obtain the list of files in
// directories, e.g. env_->GetChildren() while holding the db mutex.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
  return files_grabbed_for_purge_.find(file_number) ==
             files_grabbed_for_purge_.end() &&
         purge_files_.find(file_number) == purge_files_.end();
}

// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
// (mutex_) should already be held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
  files_grabbed_for_purge_.insert(file_number);
}

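// Installs the SnapshotChecker handed to flush/compaction jobs via
// GetSnapshotContext() below. It may only be installed once.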
void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  InstrumentedMutexLock l(&mutex_);
  // snapshot_checker_ should only be set once. If we need to set it multiple
  // times, we need to make sure the old one is not deleted while it is still
  // in use by a compaction job.
  assert(!snapshot_checker_);
  snapshot_checker_.reset(snapshot_checker);
}

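// Gathers the snapshot-related state a flush or compaction job needs: the
// list of live snapshots, the earliest write-conflict snapshot, and the
// SnapshotChecker (if any). When a snapshot checker is in use, a job
// snapshot is also taken and stored in the JobContext so that the compaction
// iterator considers it.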
void DBImpl::GetSnapshotContext(
    JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
    SequenceNumber* earliest_write_conflict_snapshot,
    SnapshotChecker** snapshot_checker_ptr) {
  mutex_.AssertHeld();
  assert(job_context != nullptr);
  assert(snapshot_seqs != nullptr);
  assert(earliest_write_conflict_snapshot != nullptr);
  assert(snapshot_checker_ptr != nullptr);

  *snapshot_checker_ptr = snapshot_checker_.get();
  if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
    *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
  }
  if (*snapshot_checker_ptr != nullptr) {
    // If snapshot_checker is used, that means the flush/compaction may
    // contain values not visible to a snapshot taken after the
    // flush/compaction job starts. Take a snapshot and it will appear
    // in snapshot_seqs and force the compaction iterator to consider
    // such snapshots.
    const Snapshot* job_snapshot =
        GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
    job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
  }
  *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}

}  // namespace ROCKSDB_NAMESPACE