db_impl_secondary.cc 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/db_impl/db_impl_secondary.h"
  6. #include <cinttypes>
  7. #include "db/arena_wrapped_db_iter.h"
  8. #include "db/log_reader.h"
  9. #include "db/log_writer.h"
  10. #include "db/merge_context.h"
  11. #include "db/version_edit.h"
  12. #include "file/filename.h"
  13. #include "file/writable_file_writer.h"
  14. #include "logging/auto_roll_logger.h"
  15. #include "logging/logging.h"
  16. #include "monitoring/perf_context_imp.h"
  17. #include "rocksdb/convenience.h"
  18. #include "rocksdb/utilities/options_util.h"
  19. #include "util/cast_util.h"
  20. #include "util/write_batch_util.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
  23. const std::string& dbname,
  24. std::string secondary_path)
  25. : DBImpl(db_options, dbname, false, true, true),
  26. secondary_path_(std::move(secondary_path)) {
  27. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  28. "Opening the db in secondary mode");
  29. LogFlush(immutable_db_options_.info_log);
  30. }
  31. DBImplSecondary::~DBImplSecondary() = default;
  32. Status DBImplSecondary::Recover(
  33. const std::vector<ColumnFamilyDescriptor>& column_families,
  34. bool /*readonly*/, bool /*error_if_wal_file_exists*/,
  35. bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*,
  36. RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) {
  37. mutex_.AssertHeld();
  38. JobContext job_context(0);
  39. Status s;
  40. s = static_cast<ReactiveVersionSet*>(versions_.get())
  41. ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
  42. &manifest_reader_status_);
  43. if (!s.ok()) {
  44. if (manifest_reader_status_) {
  45. manifest_reader_status_->PermitUncheckedError();
  46. }
  47. return s;
  48. }
  49. // Initial max_total_in_memory_state_ before recovery logs.
  50. max_total_in_memory_state_ = 0;
  51. for (auto cfd : *versions_->GetColumnFamilySet()) {
  52. const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
  53. max_total_in_memory_state_ += mutable_cf_options.write_buffer_size *
  54. mutable_cf_options.max_write_buffer_number;
  55. }
  56. if (s.ok()) {
  57. default_cf_handle_ = new ColumnFamilyHandleImpl(
  58. versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
  59. default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
  60. std::unordered_set<ColumnFamilyData*> cfds_changed;
  61. s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
  62. }
  63. if (s.IsPathNotFound()) {
  64. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  65. "Secondary tries to read WAL, but WAL file(s) have already "
  66. "been purged by primary.");
  67. s = Status::OK();
  68. }
  69. // TODO: update options_file_number_ needed?
  70. job_context.Clean();
  71. return s;
  72. }
  73. // find new WAL and apply them in order to the secondary instance
  74. Status DBImplSecondary::FindAndRecoverLogFiles(
  75. std::unordered_set<ColumnFamilyData*>* cfds_changed,
  76. JobContext* job_context) {
  77. assert(nullptr != cfds_changed);
  78. assert(nullptr != job_context);
  79. Status s;
  80. std::vector<uint64_t> logs;
  81. s = FindNewLogNumbers(&logs);
  82. if (s.ok() && !logs.empty()) {
  83. SequenceNumber next_sequence(kMaxSequenceNumber);
  84. s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
  85. }
  86. return s;
  87. }
  88. // List wal_dir and find all new WALs, return these log numbers
  89. Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
  90. assert(logs != nullptr);
  91. std::vector<std::string> filenames;
  92. Status s;
  93. IOOptions io_opts;
  94. io_opts.do_not_recurse = true;
  95. s = immutable_db_options_.fs->GetChildren(immutable_db_options_.GetWalDir(),
  96. io_opts, &filenames,
  97. /*IODebugContext*=*/nullptr);
  98. if (s.IsNotFound()) {
  99. return Status::InvalidArgument("Failed to open wal_dir",
  100. immutable_db_options_.GetWalDir());
  101. } else if (!s.ok()) {
  102. return s;
  103. }
  104. // if log_readers_ is non-empty, it means we have applied all logs with log
  105. // numbers smaller than the smallest log in log_readers_, so there is no
  106. // need to pass these logs to RecoverLogFiles
  107. uint64_t log_number_min = 0;
  108. if (!log_readers_.empty()) {
  109. log_number_min = log_readers_.begin()->first;
  110. }
  111. for (size_t i = 0; i < filenames.size(); i++) {
  112. uint64_t number;
  113. FileType type;
  114. if (ParseFileName(filenames[i], &number, &type) && type == kWalFile &&
  115. number >= log_number_min) {
  116. logs->push_back(number);
  117. }
  118. }
  119. // Recover logs in the order that they were generated
  120. if (!logs->empty()) {
  121. std::sort(logs->begin(), logs->end());
  122. }
  123. return s;
  124. }
  125. Status DBImplSecondary::MaybeInitLogReader(
  126. uint64_t log_number, log::FragmentBufferedReader** log_reader) {
  127. auto iter = log_readers_.find(log_number);
  128. // make sure the log file is still present
  129. if (iter == log_readers_.end() ||
  130. iter->second->reader_->GetLogNumber() != log_number) {
  131. // delete the obsolete log reader if log number mismatch
  132. if (iter != log_readers_.end()) {
  133. log_readers_.erase(iter);
  134. }
  135. // initialize log reader from log_number
  136. // TODO: min_log_number_to_keep_2pc check needed?
  137. // Open the log file
  138. std::string fname =
  139. LogFileName(immutable_db_options_.GetWalDir(), log_number);
  140. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  141. "Recovering log #%" PRIu64 " mode %d", log_number,
  142. static_cast<int>(immutable_db_options_.wal_recovery_mode));
  143. std::unique_ptr<SequentialFileReader> file_reader;
  144. {
  145. std::unique_ptr<FSSequentialFile> file;
  146. Status status = fs_->NewSequentialFile(
  147. fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
  148. if (!status.ok()) {
  149. *log_reader = nullptr;
  150. return status;
  151. }
  152. file_reader.reset(new SequentialFileReader(
  153. std::move(file), fname, immutable_db_options_.log_readahead_size,
  154. io_tracer_));
  155. }
  156. // Create the log reader.
  157. LogReaderContainer* log_reader_container = new LogReaderContainer(
  158. env_, immutable_db_options_.info_log, std::move(fname),
  159. std::move(file_reader), log_number);
  160. log_readers_.insert(std::make_pair(
  161. log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
  162. }
  163. iter = log_readers_.find(log_number);
  164. assert(iter != log_readers_.end());
  165. *log_reader = iter->second->reader_;
  166. return Status::OK();
  167. }
  168. // After manifest recovery, replay WALs and refresh log_readers_ if necessary
  169. // REQUIRES: log_numbers are sorted in ascending order
  170. Status DBImplSecondary::RecoverLogFiles(
  171. const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
  172. std::unordered_set<ColumnFamilyData*>* cfds_changed,
  173. JobContext* job_context) {
  174. assert(nullptr != cfds_changed);
  175. assert(nullptr != job_context);
  176. mutex_.AssertHeld();
  177. Status status;
  178. for (auto log_number : log_numbers) {
  179. log::FragmentBufferedReader* reader = nullptr;
  180. status = MaybeInitLogReader(log_number, &reader);
  181. if (!status.ok()) {
  182. return status;
  183. }
  184. assert(reader != nullptr);
  185. }
  186. const UnorderedMap<uint32_t, size_t>& running_ts_sz =
  187. versions_->GetRunningColumnFamiliesTimestampSize();
  188. for (auto log_number : log_numbers) {
  189. auto it = log_readers_.find(log_number);
  190. assert(it != log_readers_.end());
  191. log::FragmentBufferedReader* reader = it->second->reader_;
  192. Status* wal_read_status = it->second->status_;
  193. assert(wal_read_status);
  194. // Manually update the file number allocation counter in VersionSet.
  195. versions_->MarkFileNumberUsed(log_number);
  196. // Determine if we should tolerate incomplete records at the tail end of the
  197. // Read all the records and add to a memtable
  198. std::string scratch;
  199. Slice record;
  200. WriteBatch batch;
  201. while (reader->ReadRecord(&record, &scratch,
  202. immutable_db_options_.wal_recovery_mode) &&
  203. wal_read_status->ok() && status.ok()) {
  204. if (record.size() < WriteBatchInternal::kHeader) {
  205. reader->GetReporter()->Corruption(
  206. record.size(), Status::Corruption("log record too small"));
  207. continue;
  208. }
  209. status = WriteBatchInternal::SetContents(&batch, record);
  210. if (!status.ok()) {
  211. break;
  212. }
  213. const UnorderedMap<uint32_t, size_t>& record_ts_sz =
  214. reader->GetRecordedTimestampSize();
  215. status = HandleWriteBatchTimestampSizeDifference(
  216. &batch, running_ts_sz, record_ts_sz,
  217. TimestampSizeConsistencyMode::kVerifyConsistency, seq_per_batch_,
  218. batch_per_txn_);
  219. if (!status.ok()) {
  220. break;
  221. }
  222. SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
  223. std::vector<uint32_t> column_family_ids;
  224. status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
  225. if (status.ok()) {
  226. for (const auto id : column_family_ids) {
  227. ColumnFamilyData* cfd =
  228. versions_->GetColumnFamilySet()->GetColumnFamily(id);
  229. if (cfd == nullptr) {
  230. continue;
  231. }
  232. cfds_changed->insert(cfd);
  233. const std::vector<FileMetaData*>& l0_files =
  234. cfd->current()->storage_info()->LevelFiles(0);
  235. SequenceNumber seq =
  236. l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
  237. // If the write batch's sequence number is smaller than the last
  238. // sequence number of the largest sequence persisted for this column
  239. // family, then its data must reside in an SST that has already been
  240. // added in the prior MANIFEST replay.
  241. if (seq_of_batch <= seq) {
  242. continue;
  243. }
  244. auto curr_log_num = std::numeric_limits<uint64_t>::max();
  245. if (cfd_to_current_log_.count(cfd) > 0) {
  246. curr_log_num = cfd_to_current_log_[cfd];
  247. }
  248. // If the active memtable contains records added by replaying an
  249. // earlier WAL, then we need to seal the memtable, add it to the
  250. // immutable memtable list and create a new active memtable.
  251. if (!cfd->mem()->IsEmpty() &&
  252. (curr_log_num == std::numeric_limits<uint64_t>::max() ||
  253. curr_log_num != log_number)) {
  254. MemTable* new_mem = cfd->ConstructNewMemtable(
  255. cfd->GetLatestMutableCFOptions(), seq_of_batch);
  256. cfd->mem()->SetNextLogNumber(log_number);
  257. cfd->mem()->ConstructFragmentedRangeTombstones();
  258. cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
  259. new_mem->Ref();
  260. cfd->SetMemtable(new_mem);
  261. }
  262. }
  263. bool has_valid_writes = false;
  264. status = WriteBatchInternal::InsertInto(
  265. &batch, column_family_memtables_.get(),
  266. nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
  267. true, log_number, this, false /* concurrent_memtable_writes */,
  268. next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
  269. }
  270. // If column family was not found, it might mean that the WAL write
  271. // batch references to the column family that was dropped after the
  272. // insert. We don't want to fail the whole write batch in that case --
  273. // we just ignore the update.
  274. // That's why we set ignore missing column families to true
  275. // passing null flush_scheduler will disable memtable flushing which is
  276. // needed for secondary instances
  277. if (status.ok()) {
  278. for (const auto id : column_family_ids) {
  279. ColumnFamilyData* cfd =
  280. versions_->GetColumnFamilySet()->GetColumnFamily(id);
  281. if (cfd == nullptr) {
  282. continue;
  283. }
  284. std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
  285. cfd_to_current_log_.find(cfd);
  286. if (iter == cfd_to_current_log_.end()) {
  287. cfd_to_current_log_.insert({cfd, log_number});
  288. } else if (log_number > iter->second) {
  289. iter->second = log_number;
  290. }
  291. }
  292. auto last_sequence = *next_sequence - 1;
  293. if ((*next_sequence != kMaxSequenceNumber) &&
  294. (versions_->LastSequence() <= last_sequence)) {
  295. versions_->SetLastAllocatedSequence(last_sequence);
  296. versions_->SetLastPublishedSequence(last_sequence);
  297. versions_->SetLastSequence(last_sequence);
  298. }
  299. } else {
  300. // We are treating this as a failure while reading since we read valid
  301. // blocks that do not form coherent data
  302. reader->GetReporter()->Corruption(record.size(), status);
  303. }
  304. }
  305. if (status.ok() && !wal_read_status->ok()) {
  306. status = *wal_read_status;
  307. }
  308. if (!status.ok()) {
  309. wal_read_status->PermitUncheckedError();
  310. return status;
  311. }
  312. }
  313. // remove logreaders from map after successfully recovering the WAL
  314. if (log_readers_.size() > 1) {
  315. auto erase_iter = log_readers_.begin();
  316. std::advance(erase_iter, log_readers_.size() - 1);
  317. log_readers_.erase(log_readers_.begin(), erase_iter);
  318. }
  319. return status;
  320. }
  321. Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
  322. const Slice& key,
  323. GetImplOptions& get_impl_options) {
  324. assert(get_impl_options.value != nullptr ||
  325. get_impl_options.columns != nullptr ||
  326. get_impl_options.merge_operands != nullptr);
  327. assert(get_impl_options.column_family);
  328. Status s;
  329. if (read_options.timestamp) {
  330. s = FailIfTsMismatchCf(get_impl_options.column_family,
  331. *(read_options.timestamp));
  332. if (!s.ok()) {
  333. return s;
  334. }
  335. } else {
  336. s = FailIfCfHasTs(get_impl_options.column_family);
  337. if (!s.ok()) {
  338. return s;
  339. }
  340. }
  341. // Clear the timestamps for returning results so that we can distinguish
  342. // between tombstone or key that has never been written
  343. if (get_impl_options.timestamp) {
  344. get_impl_options.timestamp->clear();
  345. }
  346. PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
  347. StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
  348. PERF_TIMER_GUARD(get_snapshot_time);
  349. const Comparator* ucmp = get_impl_options.column_family->GetComparator();
  350. assert(ucmp);
  351. std::string* ts =
  352. ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
  353. SequenceNumber snapshot = versions_->LastSequence();
  354. GetWithTimestampReadCallback read_cb(snapshot);
  355. auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
  356. get_impl_options.column_family);
  357. auto cfd = cfh->cfd();
  358. if (tracer_) {
  359. InstrumentedMutexLock lock(&trace_mutex_);
  360. if (tracer_) {
  361. tracer_->Get(get_impl_options.column_family, key);
  362. }
  363. }
  364. // Acquire SuperVersion
  365. SuperVersion* super_version = GetAndRefSuperVersion(cfd);
  366. if (read_options.timestamp && read_options.timestamp->size() > 0) {
  367. s = FailIfReadCollapsedHistory(cfd, super_version,
  368. *(read_options.timestamp));
  369. if (!s.ok()) {
  370. ReturnAndCleanupSuperVersion(cfd, super_version);
  371. return s;
  372. }
  373. }
  374. MergeContext merge_context;
  375. // TODO - Large Result Optimization for Secondary DB
  376. // (https://github.com/facebook/rocksdb/pull/10458)
  377. SequenceNumber max_covering_tombstone_seq = 0;
  378. LookupKey lkey(key, snapshot, read_options.timestamp);
  379. PERF_TIMER_STOP(get_snapshot_time);
  380. bool done = false;
  381. // Look up starts here
  382. if (get_impl_options.get_value) {
  383. if (super_version->mem->Get(
  384. lkey,
  385. get_impl_options.value ? get_impl_options.value->GetSelf()
  386. : nullptr,
  387. get_impl_options.columns, ts, &s, &merge_context,
  388. &max_covering_tombstone_seq, read_options,
  389. false /* immutable_memtable */, &read_cb,
  390. /*is_blob_index=*/nullptr, /*do_merge=*/true)) {
  391. done = true;
  392. if (get_impl_options.value) {
  393. get_impl_options.value->PinSelf();
  394. }
  395. RecordTick(stats_, MEMTABLE_HIT);
  396. } else if ((s.ok() || s.IsMergeInProgress()) &&
  397. super_version->imm->Get(
  398. lkey,
  399. get_impl_options.value ? get_impl_options.value->GetSelf()
  400. : nullptr,
  401. get_impl_options.columns, ts, &s, &merge_context,
  402. &max_covering_tombstone_seq, read_options, &read_cb)) {
  403. done = true;
  404. if (get_impl_options.value) {
  405. get_impl_options.value->PinSelf();
  406. }
  407. RecordTick(stats_, MEMTABLE_HIT);
  408. }
  409. } else {
  410. // GetMergeOperands
  411. if (super_version->mem->Get(
  412. lkey,
  413. get_impl_options.value ? get_impl_options.value->GetSelf()
  414. : nullptr,
  415. get_impl_options.columns, ts, &s, &merge_context,
  416. &max_covering_tombstone_seq, read_options,
  417. false /* immutable_memtable */, &read_cb,
  418. /*is_blob_index=*/nullptr, /*do_merge=*/false)) {
  419. done = true;
  420. RecordTick(stats_, MEMTABLE_HIT);
  421. } else if ((s.ok() || s.IsMergeInProgress()) &&
  422. super_version->imm->GetMergeOperands(lkey, &s, &merge_context,
  423. &max_covering_tombstone_seq,
  424. read_options)) {
  425. done = true;
  426. RecordTick(stats_, MEMTABLE_HIT);
  427. }
  428. }
  429. if (!s.ok() && !s.IsMergeInProgress() && !s.IsNotFound()) {
  430. assert(done);
  431. ReturnAndCleanupSuperVersion(cfd, super_version);
  432. return s;
  433. }
  434. if (!done) {
  435. PERF_TIMER_GUARD(get_from_output_files_time);
  436. PinnedIteratorsManager pinned_iters_mgr;
  437. super_version->current->Get(
  438. read_options, lkey, get_impl_options.value, get_impl_options.columns,
  439. ts, &s, &merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
  440. /*value_found*/ nullptr,
  441. /*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb, /*is_blob*/ nullptr,
  442. /*do_merge*/ true);
  443. RecordTick(stats_, MEMTABLE_MISS);
  444. }
  445. {
  446. PERF_TIMER_GUARD(get_post_process_time);
  447. ReturnAndCleanupSuperVersion(cfd, super_version);
  448. RecordTick(stats_, NUMBER_KEYS_READ);
  449. size_t size = 0;
  450. if (get_impl_options.value) {
  451. size = get_impl_options.value->size();
  452. } else if (get_impl_options.columns) {
  453. size = get_impl_options.columns->serialized_size();
  454. } else if (get_impl_options.merge_operands) {
  455. *get_impl_options.number_of_operands =
  456. static_cast<int>(merge_context.GetNumOperands());
  457. for (const Slice& sl : merge_context.GetOperands()) {
  458. size += sl.size();
  459. get_impl_options.merge_operands->PinSelf(sl);
  460. get_impl_options.merge_operands++;
  461. }
  462. }
  463. RecordTick(stats_, BYTES_READ, size);
  464. RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
  465. PERF_COUNTER_ADD(get_read_bytes, size);
  466. }
  467. return s;
  468. }
  469. Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options,
  470. ColumnFamilyHandle* column_family) {
  471. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  472. _read_options.io_activity != Env::IOActivity::kDBIterator) {
  473. return NewErrorIterator(Status::InvalidArgument(
  474. "Can only call NewIterator with `ReadOptions::io_activity` is "
  475. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
  476. }
  477. ReadOptions read_options(_read_options);
  478. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  479. read_options.io_activity = Env::IOActivity::kDBIterator;
  480. }
  481. if (read_options.managed) {
  482. return NewErrorIterator(
  483. Status::NotSupported("Managed iterator is not supported anymore."));
  484. }
  485. if (read_options.read_tier == kPersistedTier) {
  486. return NewErrorIterator(Status::NotSupported(
  487. "ReadTier::kPersistedData is not yet supported in iterators."));
  488. }
  489. assert(column_family);
  490. if (read_options.timestamp) {
  491. const Status s =
  492. FailIfTsMismatchCf(column_family, *(read_options.timestamp));
  493. if (!s.ok()) {
  494. return NewErrorIterator(s);
  495. }
  496. } else {
  497. const Status s = FailIfCfHasTs(column_family);
  498. if (!s.ok()) {
  499. return NewErrorIterator(s);
  500. }
  501. }
  502. Iterator* result = nullptr;
  503. auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  504. assert(cfh != nullptr);
  505. auto cfd = cfh->cfd();
  506. if (read_options.tailing) {
  507. return NewErrorIterator(Status::NotSupported(
  508. "tailing iterator not supported in secondary mode"));
  509. } else if (read_options.snapshot != nullptr) {
  510. // TODO (yanqin) support snapshot.
  511. return NewErrorIterator(
  512. Status::NotSupported("snapshot not supported in secondary mode"));
  513. } else {
  514. SequenceNumber snapshot(kMaxSequenceNumber);
  515. SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
  516. if (read_options.timestamp && read_options.timestamp->size() > 0) {
  517. const Status s =
  518. FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
  519. if (!s.ok()) {
  520. CleanupSuperVersion(sv);
  521. return NewErrorIterator(s);
  522. }
  523. }
  524. result = NewIteratorImpl(read_options, cfh, sv, snapshot,
  525. nullptr /*read_callback*/);
  526. }
  527. return result;
  528. }
  529. ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
  530. const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
  531. SuperVersion* super_version, SequenceNumber snapshot,
  532. ReadCallback* read_callback, bool expose_blob_index, bool allow_refresh) {
  533. assert(nullptr != cfh);
  534. assert(snapshot == kMaxSequenceNumber);
  535. snapshot = versions_->LastSequence();
  536. assert(snapshot != kMaxSequenceNumber);
  537. return NewArenaWrappedDbIterator(env_, read_options, cfh, super_version,
  538. snapshot, read_callback, this,
  539. expose_blob_index, allow_refresh,
  540. /*allow_mark_memtable_for_flush=*/false);
  541. }
  542. Status DBImplSecondary::NewIterators(
  543. const ReadOptions& _read_options,
  544. const std::vector<ColumnFamilyHandle*>& column_families,
  545. std::vector<Iterator*>* iterators) {
  546. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  547. _read_options.io_activity != Env::IOActivity::kDBIterator) {
  548. return Status::InvalidArgument(
  549. "Can only call NewIterators with `ReadOptions::io_activity` is "
  550. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`");
  551. }
  552. ReadOptions read_options(_read_options);
  553. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  554. read_options.io_activity = Env::IOActivity::kDBIterator;
  555. }
  556. if (read_options.managed) {
  557. return Status::NotSupported("Managed iterator is not supported anymore.");
  558. }
  559. if (read_options.read_tier == kPersistedTier) {
  560. return Status::NotSupported(
  561. "ReadTier::kPersistedData is not yet supported in iterators.");
  562. }
  563. ReadCallback* read_callback = nullptr; // No read callback provided.
  564. if (iterators == nullptr) {
  565. return Status::InvalidArgument("iterators not allowed to be nullptr");
  566. }
  567. if (read_options.timestamp) {
  568. for (auto* cf : column_families) {
  569. assert(cf);
  570. const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
  571. if (!s.ok()) {
  572. return s;
  573. }
  574. }
  575. } else {
  576. for (auto* cf : column_families) {
  577. assert(cf);
  578. const Status s = FailIfCfHasTs(cf);
  579. if (!s.ok()) {
  580. return s;
  581. }
  582. }
  583. }
  584. iterators->clear();
  585. iterators->reserve(column_families.size());
  586. if (read_options.tailing) {
  587. return Status::NotSupported(
  588. "tailing iterator not supported in secondary mode");
  589. } else if (read_options.snapshot != nullptr) {
  590. // TODO (yanqin) support snapshot.
  591. return Status::NotSupported("snapshot not supported in secondary mode");
  592. } else {
  593. SequenceNumber read_seq(kMaxSequenceNumber);
  594. autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
  595. const bool check_read_ts =
  596. read_options.timestamp && read_options.timestamp->size() > 0;
  597. for (auto cf : column_families) {
  598. auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
  599. auto cfd = cfh->cfd();
  600. SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
  601. cfh_to_sv.emplace_back(cfh, sv);
  602. if (check_read_ts) {
  603. const Status s =
  604. FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
  605. if (!s.ok()) {
  606. for (auto prev_entry : cfh_to_sv) {
  607. CleanupSuperVersion(std::get<1>(prev_entry));
  608. }
  609. return s;
  610. }
  611. }
  612. }
  613. assert(cfh_to_sv.size() == column_families.size());
  614. for (auto [cfh, sv] : cfh_to_sv) {
  615. iterators->push_back(
  616. NewIteratorImpl(read_options, cfh, sv, read_seq, read_callback));
  617. }
  618. }
  619. return Status::OK();
  620. }
  621. Status DBImplSecondary::TryCatchUpWithPrimary() {
  622. assert(versions_.get() != nullptr);
  623. Status s;
  624. // read the manifest and apply new changes to the secondary instance
  625. std::unordered_set<ColumnFamilyData*> cfds_changed;
  626. JobContext job_context(0, true /*create_superversion*/);
  627. {
  628. InstrumentedMutexLock lock_guard(&mutex_);
  629. assert(manifest_reader_.get() != nullptr);
  630. s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
  631. ->ReadAndApply(&mutex_, &manifest_reader_,
  632. manifest_reader_status_.get(), &cfds_changed,
  633. /*files_to_delete=*/nullptr);
  634. ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
  635. static_cast<uint64_t>(versions_->LastSequence()));
  636. for (ColumnFamilyData* cfd : cfds_changed) {
  637. if (cfd->IsDropped()) {
  638. ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
  639. cfd->GetName().c_str());
  640. continue;
  641. }
  642. VersionStorageInfo::LevelSummaryStorage tmp;
  643. ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
  644. "[%s] Level summary: %s\n", cfd->GetName().c_str(),
  645. cfd->current()->storage_info()->LevelSummary(&tmp));
  646. }
  647. // list wal_dir to discover new WALs and apply new changes to the secondary
  648. // instance
  649. if (s.ok()) {
  650. s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
  651. if (s.IsPathNotFound()) {
  652. ROCKS_LOG_INFO(
  653. immutable_db_options_.info_log,
  654. "Secondary tries to read WAL, but WAL file(s) have already "
  655. "been purged by primary.");
  656. s = Status::OK();
  657. }
  658. }
  659. if (s.ok()) {
  660. for (auto cfd : cfds_changed) {
  661. cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
  662. &job_context.memtables_to_free);
  663. auto& sv_context = job_context.superversion_contexts.back();
  664. cfd->InstallSuperVersion(&sv_context, &mutex_);
  665. sv_context.NewSuperVersion();
  666. }
  667. }
  668. }
  669. job_context.Clean();
  670. // Cleanup unused, obsolete files.
  671. JobContext purge_files_job_context(0);
  672. {
  673. InstrumentedMutexLock lock_guard(&mutex_);
  674. // Currently, secondary instance does not own the database files, thus it
  675. // is unnecessary for the secondary to force full scan.
  676. FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
  677. }
  678. if (purge_files_job_context.HaveSomethingToDelete()) {
  679. PurgeObsoleteFiles(purge_files_job_context);
  680. }
  681. purge_files_job_context.Clean();
  682. return s;
  683. }
  684. Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
  685. const std::string& secondary_path,
  686. std::unique_ptr<DB>* dbptr) {
  687. *dbptr = nullptr;
  688. DBOptions db_options(options);
  689. ColumnFamilyOptions cf_options(options);
  690. std::vector<ColumnFamilyDescriptor> column_families;
  691. column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
  692. std::vector<ColumnFamilyHandle*> handles;
  693. Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
  694. column_families, &handles, dbptr);
  695. if (s.ok()) {
  696. assert(handles.size() == 1);
  697. delete handles[0];
  698. }
  699. return s;
  700. }
  701. Status DB::OpenAsSecondary(
  702. const DBOptions& db_options, const std::string& dbname,
  703. const std::string& secondary_path,
  704. const std::vector<ColumnFamilyDescriptor>& column_families,
  705. std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr) {
  706. *dbptr = nullptr;
  707. DBOptions tmp_opts(db_options);
  708. Status s;
  709. if (nullptr == tmp_opts.info_log) {
  710. s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log);
  711. if (!s.ok()) {
  712. tmp_opts.info_log = nullptr;
  713. return s;
  714. }
  715. }
  716. assert(tmp_opts.info_log != nullptr);
  717. if (db_options.max_open_files != -1) {
  718. std::ostringstream oss;
  719. oss << "The primary instance may delete all types of files after they "
  720. "become obsolete. The application can coordinate the primary and "
  721. "secondary so that primary does not delete/rename files that are "
  722. "currently being used by the secondary. Alternatively, a custom "
  723. "Env/FS can be provided such that files become inaccessible only "
  724. "after all primary and secondaries indicate that they are obsolete "
  725. "and deleted. If the above two are not possible, you can open the "
  726. "secondary instance with `max_open_files==-1` so that secondary "
  727. "will eagerly keep all table files open. Even if a file is deleted, "
  728. "its content can still be accessed via a prior open file "
  729. "descriptor. This is a hacky workaround for only table files. If "
  730. "none of the above is done, then point lookup or "
  731. "range scan via the secondary instance can result in IOError: file "
  732. "not found. This can be resolved by retrying "
  733. "TryCatchUpWithPrimary().";
  734. ROCKS_LOG_WARN(tmp_opts.info_log, "%s", oss.str().c_str());
  735. }
  736. handles->clear();
  737. DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path);
  738. impl->versions_.reset(new ReactiveVersionSet(
  739. dbname, &impl->immutable_db_options_, impl->file_options_,
  740. impl->table_cache_.get(), impl->write_buffer_manager_,
  741. &impl->write_controller_, impl->io_tracer_));
  742. impl->column_family_memtables_.reset(
  743. new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
  744. impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
  745. impl->mutex_.Lock();
  746. s = impl->Recover(column_families, true, false, false);
  747. if (s.ok()) {
  748. for (const auto& cf : column_families) {
  749. auto cfd =
  750. impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
  751. if (nullptr == cfd) {
  752. s = Status::InvalidArgument("Column family not found", cf.name);
  753. break;
  754. }
  755. handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
  756. }
  757. }
  758. SuperVersionContext sv_context(true /* create_superversion */);
  759. if (s.ok()) {
  760. for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
  761. sv_context.NewSuperVersion();
  762. cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
  763. }
  764. }
  765. impl->mutex_.Unlock();
  766. sv_context.Clean();
  767. if (s.ok()) {
  768. dbptr->reset(impl);
  769. for (auto h : *handles) {
  770. impl->NewThreadStatusCfInfo(
  771. static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
  772. }
  773. } else {
  774. for (auto h : *handles) {
  775. delete h;
  776. }
  777. handles->clear();
  778. delete impl;
  779. }
  780. return s;
  781. }
  782. Status DBImplSecondary::ScanCompactionProgressFiles(
  783. CompactionProgressFilesScan* scan_result) {
  784. assert(scan_result != nullptr);
  785. scan_result->Clear();
  786. WriteOptions write_options(Env::IOActivity::kCompaction);
  787. IOOptions opts;
  788. Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  789. if (!s.ok()) {
  790. return s;
  791. }
  792. std::vector<std::string> all_filenames;
  793. s = fs_->GetChildren(secondary_path_, opts, &all_filenames, nullptr /* dbg*/);
  794. if (!s.ok()) {
  795. return s;
  796. }
  797. for (const auto& filename : all_filenames) {
  798. if (filename == "." || filename == "..") {
  799. continue;
  800. }
  801. uint64_t number;
  802. FileType type;
  803. if (!ParseFileName(filename, &number, &type)) {
  804. continue;
  805. }
  806. // Categorize compaction progress files
  807. if (type == kCompactionProgressFile) {
  808. if (number > scan_result->latest_progress_timestamp) {
  809. // Found a newer progress file
  810. if (scan_result->HasLatestProgressFile()) {
  811. // Previous "latest" becomes "old"
  812. scan_result->old_progress_filenames.push_back(
  813. scan_result->latest_progress_filename.value());
  814. }
  815. scan_result->latest_progress_timestamp = number;
  816. scan_result->latest_progress_filename = filename;
  817. } else {
  818. // This is an older progress file
  819. scan_result->old_progress_filenames.push_back(filename);
  820. }
  821. } else if (type == kTempFile &&
  822. filename.find(kCompactionProgressFileNamePrefix) == 0) {
  823. // Temporary progress files
  824. scan_result->temp_progress_filenames.push_back(filename);
  825. } else if (type == kTableFile) {
  826. // Collect table file numbers for CleanupPhysicalCompactionOutputFiles
  827. scan_result->table_file_numbers.push_back(number);
  828. }
  829. }
  830. return Status::OK();
  831. }
  832. Status DBImplSecondary::DeleteCompactionProgressFiles(
  833. const std::vector<std::string>& filenames) {
  834. WriteOptions write_options(Env::IOActivity::kCompaction);
  835. IOOptions opts;
  836. Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  837. if (!s.ok()) {
  838. return s;
  839. }
  840. for (const auto& filename : filenames) {
  841. std::string file_path = secondary_path_ + "/" + filename;
  842. Status delete_status = fs_->DeleteFile(file_path, opts, nullptr /* dbg */);
  843. if (!delete_status.ok()) {
  844. return delete_status;
  845. }
  846. }
  847. return Status::OK();
  848. }
  849. Status DBImplSecondary::CleanupOldAndTemporaryCompactionProgressFiles(
  850. bool preserve_latest, const CompactionProgressFilesScan& scan_result) {
  851. std::vector<std::string> filenames_to_delete;
  852. // Always delete old progress files
  853. filenames_to_delete.insert(filenames_to_delete.end(),
  854. scan_result.old_progress_filenames.begin(),
  855. scan_result.old_progress_filenames.end());
  856. // Always delete temp files
  857. filenames_to_delete.insert(filenames_to_delete.end(),
  858. scan_result.temp_progress_filenames.begin(),
  859. scan_result.temp_progress_filenames.end());
  860. // Conditionally delete latest file
  861. if (!preserve_latest && scan_result.HasLatestProgressFile()) {
  862. filenames_to_delete.push_back(scan_result.latest_progress_filename.value());
  863. }
  864. return DeleteCompactionProgressFiles(filenames_to_delete);
  865. }
  866. // Loads compaction progress from a file and cleans up extra output
  867. // files. After loading the progress, this function identifies and deletes any
  868. // SST files in the output folder that are NOT tracked in the
  869. // progress. This ensures consistency between the progress file and
  870. // actual output files on disk.
  871. Status DBImplSecondary::LoadCompactionProgressAndCleanupExtraOutputFiles(
  872. const std::string& compaction_progress_file_path,
  873. const CompactionProgressFilesScan& scan_result) {
  874. Status s = ParseCompactionProgressFile(compaction_progress_file_path,
  875. &compaction_progress_);
  876. if (s.ok()) {
  877. s = CleanupPhysicalCompactionOutputFiles(true /* preserve_tracked_files */,
  878. scan_result);
  879. }
  880. return s;
  881. }
  882. Status DBImplSecondary::ParseCompactionProgressFile(
  883. const std::string& compaction_progress_file_path,
  884. CompactionProgress* compaction_progress) {
  885. std::unique_ptr<FSSequentialFile> file;
  886. Status s = fs_->NewSequentialFile(compaction_progress_file_path,
  887. FileOptions(), &file, nullptr /* dbg */);
  888. if (!s.ok()) {
  889. return s;
  890. }
  891. std::unique_ptr<SequentialFileReader> file_reader(new SequentialFileReader(
  892. std::move(file), compaction_progress_file_path,
  893. immutable_db_options_.log_readahead_size, io_tracer_, {} /* listeners */,
  894. immutable_db_options_.rate_limiter.get()));
  895. Status reader_status;
  896. struct CompactionProgressReaderReporter : public log::Reader::Reporter {
  897. Status* status;
  898. explicit CompactionProgressReaderReporter(Status* s) : status(s) {}
  899. void Corruption(size_t /*bytes*/, const Status& s,
  900. uint64_t /*log_number*/) override {
  901. if (status->ok()) {
  902. *status = s;
  903. }
  904. }
  905. void OldLogRecord(size_t /*bytes*/) override {
  906. // Ignore old records
  907. }
  908. } progress_reporter(&reader_status);
  909. log::Reader compaction_progress_reader(
  910. immutable_db_options_.info_log, std::move(file_reader),
  911. &progress_reporter, true /* checksum */, 0 /* log_num */);
  912. // LIMITATION: Only supports resuming single subcompaction
  913. SubcompactionProgressBuilder progress_builder;
  914. Slice slice;
  915. std::string record;
  916. while (compaction_progress_reader.ReadRecord(&slice, &record)) {
  917. if (!reader_status.ok()) {
  918. return reader_status;
  919. }
  920. VersionEdit edit;
  921. s = edit.DecodeFrom(slice);
  922. if (!s.ok()) {
  923. break;
  924. }
  925. bool res = progress_builder.ProcessVersionEdit(edit);
  926. if (!res) {
  927. break;
  928. }
  929. }
  930. if (!s.ok()) {
  931. return s;
  932. }
  933. if (progress_builder.HasAccumulatedSubcompactionProgress()) {
  934. compaction_progress->clear();
  935. compaction_progress->push_back(
  936. progress_builder.GetAccumulatedSubcompactionProgress());
  937. } else {
  938. s = Status::NotFound("No compaction progress was persisted yet");
  939. }
  940. return s;
  941. }
  942. Status DBImplSecondary::RenameCompactionProgressFile(
  943. const std::string& temp_file_path, std::string* final_file_path) {
  944. uint64_t current_time = env_->NowMicros();
  945. *final_file_path = CompactionProgressFileName(secondary_path_, current_time);
  946. WriteOptions write_options(Env::IOActivity::kCompaction);
  947. IOOptions opts;
  948. Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  949. if (!s.ok()) {
  950. return s;
  951. }
  952. s = fs_->RenameFile(temp_file_path, *final_file_path, opts,
  953. nullptr /* dbg */);
  954. return s;
  955. }
  956. Status DBImplSecondary::CleanupPhysicalCompactionOutputFiles(
  957. bool preserve_tracked_files,
  958. const CompactionProgressFilesScan& scan_result) {
  959. std::unordered_set<uint64_t> files_to_preserve;
  960. if (preserve_tracked_files) {
  961. for (const auto& subcompaction_progress : compaction_progress_) {
  962. for (const auto& file_metadata :
  963. subcompaction_progress.output_level_progress.GetOutputFiles()) {
  964. files_to_preserve.insert(file_metadata.fd.GetNumber());
  965. }
  966. for (const auto& file_metadata :
  967. subcompaction_progress.proximal_output_level_progress
  968. .GetOutputFiles()) {
  969. files_to_preserve.insert(file_metadata.fd.GetNumber());
  970. }
  971. }
  972. }
  973. WriteOptions write_options(Env::IOActivity::kCompaction);
  974. IOOptions opts;
  975. Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  976. if (!s.ok()) {
  977. return s;
  978. }
  979. for (uint64_t file_number : scan_result.table_file_numbers) {
  980. bool should_delete =
  981. !preserve_tracked_files ||
  982. (files_to_preserve.find(file_number) == files_to_preserve.end());
  983. if (should_delete) {
  984. std::string file_path = MakeTableFileName(secondary_path_, file_number);
  985. Status delete_status =
  986. fs_->DeleteFile(file_path, opts, nullptr /* dbg */);
  987. if (!delete_status.ok()) {
  988. return delete_status;
  989. }
  990. }
  991. }
  992. return Status::OK();
  993. }
  994. Status DBImplSecondary::InitializeCompactionWorkspace(
  995. bool allow_resumption, std::unique_ptr<FSDirectory>* output_dir,
  996. std::unique_ptr<log::Writer>* compaction_progress_writer) {
  997. // Create output directory if it doest exist yet
  998. Status s = CreateAndNewDirectory(fs_.get(), secondary_path_, output_dir);
  999. if (!s.ok() || !allow_resumption) {
  1000. return s;
  1001. }
  1002. s = PrepareCompactionProgressState();
  1003. if (!s.ok()) {
  1004. return s;
  1005. }
  1006. s = FinalizeCompactionProgressWriter(compaction_progress_writer);
  1007. if (!s.ok()) {
  1008. return s;
  1009. }
  1010. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1011. "Initialized compaction workspace with %zu subcompaction "
  1012. "progress to resume",
  1013. compaction_progress_.size());
  1014. return Status::OK();
  1015. }
  1016. // PrepareCompactionProgressState() manages compaction progress files and output
  1017. // files to ensure a clean, consistent state for resuming or starting fresh
  1018. // compaction.
  1019. //
  1020. // PRECONDITION:
  1021. // - This function is ONLY called when allow_resumption = true
  1022. // - The caller wants resumption support for this compaction attempt
  1023. //
  1024. // FILE SYSTEM STATE (before entering this function):
  1025. // - 0 or more compaction progress files may exist in `secondary_path_`:
  1026. // * Latest progress file (from the most recent compaction attempt)
  1027. // * Older progress files (left by crashing during a previous
  1028. // InitializeCompactionWorkspace() call)
  1029. // * Temporary progress files (left by crashing during a previous
  1030. // InitializeCompactionWorkspace() call)
  1031. // - 0 or more compaction output files may exist in `secondary_path_`
  1032. //
  1033. // POSTCONDITIONS (after this function):
  1034. // - IF the latest progress file exists AND it parses successfully AND
  1035. // actually contains valid compaction progress:
  1036. // * Exactly one latest progress file remains
  1037. // * All older and temporary compaction progress files are deleted
  1038. // * All corresponding compaction output files are preserved
  1039. // * All extra compaction output files are deleted (files left by
  1040. // compaction
  1041. // crashing before persisting the progress)
  1042. // * Result: Ready to resume compaction from the saved progress
  1043. // - OTHERWISE (no latest progress file OR it fails to parse OR it's
  1044. // invalid):
  1045. // * ALL compaction progress files are deleted (latest + older +
  1046. // temporary)
  1047. // * ALL compaction output files are deleted
  1048. // * Result: Ready to start fresh compaction (despite allow_resumption =
  1049. // true, we cannot resume because there's no valid progress to resume from)
  1050. //
  1051. // ERROR HANDLING:
  1052. // - ON ERROR (if any of the postconditions cannot be achieved):
  1053. // * Function returns error status
  1054. // * File system may be left in a partially modified state
  1055. // * Caller should manually clean up secondary_path_ before retrying
  1056. // * Subsequent OpenAndCompact() calls to this clean secondary_path_ will
  1057. // effectively start fresh compaction
  1058. Status DBImplSecondary::PrepareCompactionProgressState() {
  1059. Status s;
  1060. // STEP 1: Scan directory ONCE (includes progress files + table files)
  1061. CompactionProgressFilesScan scan_result;
  1062. s = ScanCompactionProgressFiles(&scan_result);
  1063. if (!s.ok()) {
  1064. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  1065. "Encountered error when scanning for compaction "
  1066. "progress files: %s",
  1067. s.ToString().c_str());
  1068. return s;
  1069. }
  1070. std::optional<std::string> latest_progress_file =
  1071. scan_result.latest_progress_filename;
  1072. // STEP 2: Determine if we should resume
  1073. bool should_resume = false;
  1074. if (latest_progress_file.has_value()) {
  1075. should_resume = true;
  1076. } else {
  1077. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  1078. "Did not find any latest compaction progress file. "
  1079. "Will perform clean up to start fresh compaction");
  1080. }
  1081. // STEP 3: Cleanup using pre-scanned results
  1082. if (should_resume) {
  1083. // Keep latest, delete old/temp
  1084. s = CleanupOldAndTemporaryCompactionProgressFiles(
  1085. true /* preserve_latest */, scan_result);
  1086. } else {
  1087. // Delete everything including latest
  1088. s = CleanupOldAndTemporaryCompactionProgressFiles(
  1089. false /* preserve_latest */, scan_result);
  1090. latest_progress_file.reset();
  1091. }
  1092. if (!s.ok()) {
  1093. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  1094. "Failed to clean up compaction progress file(s): %s. "
  1095. "Will fail the compaction",
  1096. s.ToString().c_str());
  1097. return s;
  1098. }
  1099. // STEP 4: Load progress if resuming
  1100. if (latest_progress_file.has_value()) {
  1101. uint64_t timestamp = scan_result.latest_progress_timestamp;
  1102. std::string compaction_progress_file_path =
  1103. CompactionProgressFileName(secondary_path_, timestamp);
  1104. s = LoadCompactionProgressAndCleanupExtraOutputFiles(
  1105. compaction_progress_file_path, scan_result);
  1106. if (!s.ok()) {
  1107. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  1108. "Failed to load the latest compaction "
  1109. "progress from %s: %s. Will perform clean up "
  1110. "to start fresh compaction",
  1111. latest_progress_file.value().c_str(),
  1112. s.ToString().c_str());
  1113. return HandleInvalidOrNoCompactionProgress(compaction_progress_file_path,
  1114. scan_result);
  1115. }
  1116. return s;
  1117. } else {
  1118. return HandleInvalidOrNoCompactionProgress(
  1119. std::nullopt /* compaction_progress_file_path */, scan_result);
  1120. }
  1121. }
  1122. uint64_t DBImplSecondary::CalculateResumedCompactionBytes(
  1123. const CompactionProgress& compaction_progress) const {
  1124. uint64_t total_resumed_bytes = 0;
  1125. for (const auto& subcompaction_progress : compaction_progress) {
  1126. for (const auto& file_meta :
  1127. subcompaction_progress.output_level_progress.GetOutputFiles()) {
  1128. total_resumed_bytes += file_meta.fd.file_size;
  1129. }
  1130. for (const auto& file_meta :
  1131. subcompaction_progress.proximal_output_level_progress
  1132. .GetOutputFiles()) {
  1133. total_resumed_bytes += file_meta.fd.file_size;
  1134. }
  1135. }
  1136. return total_resumed_bytes;
  1137. }
  1138. Status DBImplSecondary::HandleInvalidOrNoCompactionProgress(
  1139. const std::optional<std::string>& compaction_progress_file_path,
  1140. const CompactionProgressFilesScan& scan_result) {
  1141. compaction_progress_.clear();
  1142. Status s;
  1143. if (compaction_progress_file_path.has_value()) {
  1144. WriteOptions write_options(Env::IOActivity::kCompaction);
  1145. IOOptions opts;
  1146. s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  1147. if (s.ok()) {
  1148. s = fs_->DeleteFile(compaction_progress_file_path.value(), opts,
  1149. nullptr /* dbg */);
  1150. }
  1151. if (!s.ok()) {
  1152. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  1153. "Failed to remove invalid progress file: %s",
  1154. s.ToString().c_str());
  1155. return s;
  1156. }
  1157. }
  1158. s = CleanupPhysicalCompactionOutputFiles(false /* preserve_tracked_files */,
  1159. scan_result);
  1160. if (!s.ok()) {
  1161. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  1162. "Failed to cleanup existing compaction output files: %s",
  1163. s.ToString().c_str());
  1164. return s;
  1165. }
  1166. return Status::OK();
  1167. }
  1168. Status DBImplSecondary::CompactWithoutInstallation(
  1169. const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh,
  1170. const CompactionServiceInput& input, CompactionServiceResult* result) {
  1171. if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
  1172. return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  1173. }
  1174. std::unique_ptr<FSDirectory> output_dir;
  1175. std::unique_ptr<log::Writer> compaction_progress_writer;
  1176. InstrumentedMutexLock l(&mutex_);
  1177. auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
  1178. if (!cfd) {
  1179. return Status::InvalidArgument("Cannot find column family" +
  1180. cfh->GetName());
  1181. }
  1182. Status s;
  1183. // TODO(hx235): Resuming compaction is currently incompatible with
  1184. // paranoid_file_checks=true because OutputValidator hash verification would
  1185. // fail during compaction resumption. Before interruption, resuming
  1186. // compaction needs to persist the hash of each output file to enable
  1187. // validation after resumption. Alternatively and preferably, we could move
  1188. // the output verification to happen immediately after each output file is
  1189. // created. This workaround currently disables resuming compaction when
  1190. // paranoid_file_checks is enabled. Note that paranoid_file_checks is
  1191. // disabled by default.
  1192. bool allow_resumption =
  1193. options.allow_resumption &&
  1194. !cfd->GetLatestMutableCFOptions().paranoid_file_checks;
  1195. if (options.allow_resumption &&
  1196. cfd->GetLatestMutableCFOptions().paranoid_file_checks) {
  1197. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  1198. "Resume compaction configured but disabled due to "
  1199. "incompatible with paranoid_file_checks=true");
  1200. }
  1201. mutex_.Unlock();
  1202. s = InitializeCompactionWorkspace(allow_resumption, &output_dir,
  1203. &compaction_progress_writer);
  1204. mutex_.Lock();
  1205. if (!s.ok()) {
  1206. return s;
  1207. }
  1208. std::unordered_set<uint64_t> input_set;
  1209. for (const auto& file_name : input.input_files) {
  1210. input_set.insert(TableFileNameToNumber(file_name));
  1211. }
  1212. auto* version = cfd->current();
  1213. ColumnFamilyMetaData cf_meta;
  1214. version->GetColumnFamilyMetaData(&cf_meta);
  1215. VersionStorageInfo* vstorage = version->storage_info();
  1216. CompactionOptions comp_options;
  1217. comp_options.compression = kDisableCompressionOption;
  1218. comp_options.output_file_size_limit = MaxFileSizeForLevel(
  1219. cfd->GetLatestMutableCFOptions(), input.output_level,
  1220. cfd->ioptions().compaction_style, vstorage->base_level(),
  1221. cfd->ioptions().level_compaction_dynamic_level_bytes);
  1222. std::vector<CompactionInputFiles> input_files;
  1223. s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
  1224. &input_files, &input_set, vstorage, comp_options);
  1225. if (!s.ok()) {
  1226. ROCKS_LOG_ERROR(
  1227. immutable_db_options_.info_log,
  1228. "GetCompactionInputsFromFileNumbers() failed - %s.\n DebugString: %s",
  1229. s.ToString().c_str(), version->DebugString(/*hex=*/true).c_str());
  1230. return s;
  1231. }
  1232. const int job_id = next_job_id_.fetch_add(1);
  1233. JobContext job_context(job_id, true /*create_superversion*/);
  1234. std::vector<SequenceNumber> snapshots = input.snapshots;
  1235. // TODO - snapshot_checker support in Remote Compaction
  1236. job_context.InitSnapshotContext(/*checker=*/nullptr,
  1237. /*managed_snapshot=*/nullptr,
  1238. kMaxSequenceNumber, std::move(snapshots));
  1239. // TODO - consider serializing the entire Compaction object and using it as
  1240. // input instead of recreating it in the remote worker
  1241. std::unique_ptr<Compaction> c;
  1242. assert(cfd->compaction_picker());
  1243. std::optional<SequenceNumber> earliest_snapshot = std::nullopt;
  1244. // Standalone Range Deletion Optimization is only supported in Universal
  1245. // Compactions - https://github.com/facebook/rocksdb/pull/13078
  1246. if (cfd->GetLatestCFOptions().compaction_style ==
  1247. CompactionStyle::kCompactionStyleUniversal) {
  1248. earliest_snapshot = !job_context.snapshot_seqs.empty()
  1249. ? job_context.snapshot_seqs.front()
  1250. : kMaxSequenceNumber;
  1251. }
  1252. c.reset(cfd->compaction_picker()->PickCompactionForCompactFiles(
  1253. comp_options, input_files, input.output_level, vstorage,
  1254. cfd->GetLatestMutableCFOptions(), mutable_db_options_, 0,
  1255. earliest_snapshot, job_context.snapshot_checker));
  1256. assert(c != nullptr);
  1257. c->FinalizeInputInfo(version);
  1258. LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
  1259. immutable_db_options_.info_log.get());
  1260. // use primary host's db_id for running the compaction, but db_session_id is
  1261. // using the local one, which is to make sure the unique id is unique from
  1262. // the remote compactors. Because the id is generated from db_id,
  1263. // db_session_id and orig_file_number, unlike the local compaction, remote
  1264. // compaction cannot guarantee the uniqueness of orig_file_number, the file
  1265. // number is only assigned when compaction is done.
  1266. CompactionServiceCompactionJob compaction_job(
  1267. job_id, c.get(), immutable_db_options_, mutable_db_options_,
  1268. file_options_for_compaction_, versions_.get(), &shutting_down_,
  1269. &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
  1270. &job_context, table_cache_, &event_logger_, dbname_, io_tracer_,
  1271. options.canceled ? *options.canceled : kManualCompactionCanceledFalse_,
  1272. input.db_id, db_session_id_, secondary_path_, input, result);
  1273. compaction_job.Prepare(compaction_progress_,
  1274. compaction_progress_writer.get());
  1275. mutex_.Unlock();
  1276. s = compaction_job.Run();
  1277. mutex_.Lock();
  1278. // These cleanup functions handle metadata and state cleanup only and
  1279. // not the physical files
  1280. compaction_job.io_status().PermitUncheckedError();
  1281. compaction_job.CleanupCompaction();
  1282. c->ReleaseCompactionFiles(s);
  1283. c.reset();
  1284. TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End",
  1285. &s);
  1286. if (!compaction_progress_.empty() && s.ok()) {
  1287. uint64_t total_resumed_bytes =
  1288. CalculateResumedCompactionBytes(compaction_progress_);
  1289. if (total_resumed_bytes > 0 &&
  1290. immutable_db_options_.statistics != nullptr) {
  1291. RecordTick(immutable_db_options_.statistics.get(),
  1292. REMOTE_COMPACT_RESUMED_BYTES, total_resumed_bytes);
  1293. }
  1294. }
  1295. result->status = s;
  1296. return s;
  1297. }
  1298. Status DB::OpenAndCompact(
  1299. const OpenAndCompactOptions& options, const std::string& name,
  1300. const std::string& output_directory, const std::string& input,
  1301. std::string* output,
  1302. const CompactionServiceOptionsOverride& override_options) {
  1303. // Check for cancellation
  1304. if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
  1305. return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  1306. }
  1307. // 1. Deserialize Compaction Input
  1308. CompactionServiceInput compaction_input;
  1309. Status s = CompactionServiceInput::Read(input, &compaction_input);
  1310. if (!s.ok()) {
  1311. return s;
  1312. }
  1313. // 2. Load the options
  1314. DBOptions base_db_options;
  1315. ConfigOptions config_options;
  1316. config_options.env = override_options.env;
  1317. config_options.ignore_unknown_options = true;
  1318. std::vector<ColumnFamilyDescriptor> all_column_families;
  1319. TEST_SYNC_POINT_CALLBACK(
  1320. "DBImplSecondary::OpenAndCompact::BeforeLoadingOptions:0",
  1321. &compaction_input.options_file_number);
  1322. TEST_SYNC_POINT("DBImplSecondary::OpenAndCompact::BeforeLoadingOptions:1");
  1323. std::string options_file_name =
  1324. OptionsFileName(name, compaction_input.options_file_number);
  1325. s = LoadOptionsFromFile(config_options, options_file_name, &base_db_options,
  1326. &all_column_families);
  1327. if (!s.ok()) {
  1328. return s;
  1329. }
  1330. // 3. Options to Override
  1331. // Override serializable configurations from override_options.options_map
  1332. DBOptions db_options;
  1333. s = GetDBOptionsFromMap(config_options, base_db_options,
  1334. override_options.options_map, &db_options);
  1335. if (!s.ok()) {
  1336. return s;
  1337. }
  1338. // Override options that are directly set as shared ptrs in
  1339. // CompactionServiceOptionsOverride
  1340. db_options.env = override_options.env;
  1341. db_options.file_checksum_gen_factory =
  1342. override_options.file_checksum_gen_factory;
  1343. db_options.statistics = override_options.statistics;
  1344. db_options.listeners = override_options.listeners;
  1345. db_options.compaction_service = nullptr;
  1346. // We will close the DB after the compaction anyway.
  1347. // Open as many files as needed for the compaction.
  1348. db_options.max_open_files = -1;
  1349. db_options.info_log = override_options.info_log;
  1350. // 4. Filter CFs that are needed for OpenAndCompact()
  1351. // We do not need to open all column families for the remote compaction.
  1352. // Only open default CF + target CF. If target CF == default CF, we will open
  1353. // just the default CF (Due to current limitation, DB cannot open without the
  1354. // default CF)
  1355. std::vector<ColumnFamilyDescriptor> column_families;
  1356. for (auto& cf : all_column_families) {
  1357. if (cf.name == compaction_input.cf_name) {
  1358. ColumnFamilyOptions cf_options;
  1359. // Override serializable configurations from override_options.options_map
  1360. s = GetColumnFamilyOptionsFromMap(config_options, cf.options,
  1361. override_options.options_map,
  1362. &cf_options);
  1363. if (!s.ok()) {
  1364. return s;
  1365. }
  1366. cf.options = std::move(cf_options);
  1367. // Override options that are directly set as shared ptrs in
  1368. // CompactionServiceOptionsOverride
  1369. cf.options.comparator = override_options.comparator;
  1370. cf.options.merge_operator = override_options.merge_operator;
  1371. cf.options.compaction_filter = override_options.compaction_filter;
  1372. cf.options.compaction_filter_factory =
  1373. override_options.compaction_filter_factory;
  1374. cf.options.prefix_extractor = override_options.prefix_extractor;
  1375. cf.options.table_factory = override_options.table_factory;
  1376. cf.options.sst_partitioner_factory =
  1377. override_options.sst_partitioner_factory;
  1378. cf.options.table_properties_collector_factories =
  1379. override_options.table_properties_collector_factories;
  1380. column_families.emplace_back(cf);
  1381. } else if (cf.name == kDefaultColumnFamilyName) {
  1382. column_families.emplace_back(cf);
  1383. }
  1384. }
  1385. // 5. Open db As Secondary
  1386. DB* db;
  1387. std::vector<ColumnFamilyHandle*> handles;
  1388. s = DB::OpenAsSecondary(db_options, name, output_directory, column_families,
  1389. &handles, &db);
  1390. if (!s.ok()) {
  1391. return s;
  1392. }
  1393. assert(db);
  1394. TEST_SYNC_POINT_CALLBACK(
  1395. "DBImplSecondary::OpenAndCompact::AfterOpenAsSecondary:0", db);
  1396. // 6. Find the handle of the Column Family that this will compact
  1397. ColumnFamilyHandle* cfh = nullptr;
  1398. for (auto* handle : handles) {
  1399. if (compaction_input.cf_name == handle->GetName()) {
  1400. cfh = handle;
  1401. break;
  1402. }
  1403. }
  1404. assert(cfh);
  1405. // 7. Run the compaction without installation.
  1406. // Output will be stored in the directory specified by output_directory
  1407. CompactionServiceResult compaction_result;
  1408. DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
  1409. s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input,
  1410. &compaction_result);
  1411. // 8. Serialize the result
  1412. Status serialization_status = compaction_result.Write(output);
  1413. // 9. Close the db and return
  1414. for (auto& handle : handles) {
  1415. delete handle;
  1416. }
  1417. delete db;
  1418. if (s.ok()) {
  1419. return serialization_status;
  1420. } else {
  1421. serialization_status.PermitUncheckedError();
  1422. }
  1423. return s;
  1424. }
  1425. Status DB::OpenAndCompact(
  1426. const std::string& name, const std::string& output_directory,
  1427. const std::string& input, std::string* output,
  1428. const CompactionServiceOptionsOverride& override_options) {
  1429. return OpenAndCompact(OpenAndCompactOptions(), name, output_directory, input,
  1430. output, override_options);
  1431. }
  1432. Status DBImplSecondary::CreateCompactionProgressWriter(
  1433. const std::string& file_path,
  1434. std::unique_ptr<log::Writer>* compaction_progress_writer) {
  1435. std::unique_ptr<FSWritableFile> file;
  1436. Status s =
  1437. fs_->NewWritableFile(file_path, FileOptions(), &file, nullptr /* dbg */);
  1438. if (!s.ok()) {
  1439. return s;
  1440. }
  1441. std::unique_ptr<WritableFileWriter> file_writer(
  1442. new WritableFileWriter(std::move(file), file_path, FileOptions()));
  1443. compaction_progress_writer->reset(
  1444. new log::Writer(std::move(file_writer), 0 /* log_number */,
  1445. false /* recycle_log_files */));
  1446. return Status::OK();
  1447. }
  1448. Status DBImplSecondary::PersistInitialCompactionProgress(
  1449. log::Writer* compaction_progress_writer,
  1450. const CompactionProgress& compaction_progress) {
  1451. assert(compaction_progress_writer);
  1452. // LIMITATION: Only supports resuming single subcompaction
  1453. assert(compaction_progress.size() == 1);
  1454. const SubcompactionProgress& subcompaction_progress = compaction_progress[0];
  1455. VersionEdit edit;
  1456. edit.SetSubcompactionProgress(subcompaction_progress);
  1457. std::string record;
  1458. if (!edit.EncodeTo(&record)) {
  1459. return Status::IOError("Failed to encode the initial compaction progress");
  1460. }
  1461. WriteOptions write_options(Env::IOActivity::kCompaction);
  1462. Status s = compaction_progress_writer->AddRecord(write_options, record);
  1463. if (!s.ok()) {
  1464. return s;
  1465. }
  1466. IOOptions opts;
  1467. s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  1468. if (!s.ok()) {
  1469. return s;
  1470. }
  1471. s = compaction_progress_writer->file()->Sync(opts,
  1472. immutable_db_options_.use_fsync);
  1473. return s;
  1474. }
  1475. Status DBImplSecondary::HandleCompactionProgressWriterCreationFailure(
  1476. const std::string& temp_file_path, const std::string& final_file_path,
  1477. std::unique_ptr<log::Writer>* compaction_progress_writer) {
  1478. compaction_progress_writer->reset();
  1479. const std::vector<std::string> paths_to_delete = {final_file_path,
  1480. temp_file_path};
  1481. Status s;
  1482. for (const auto& file_path : paths_to_delete) {
  1483. WriteOptions write_options(Env::IOActivity::kCompaction);
  1484. IOOptions opts;
  1485. s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  1486. if (s.ok()) {
  1487. s = fs_->DeleteFile(file_path, opts, nullptr /* dbg */);
  1488. }
  1489. if (!s.ok()) {
  1490. ROCKS_LOG_ERROR(immutable_db_options_.info_log,
  1491. "Failed to cleanup the compaction progress file "
  1492. "during writer creation failure: %s",
  1493. s.ToString().c_str());
  1494. return s;
  1495. }
  1496. }
  1497. return s;
  1498. }
  1499. Status DBImplSecondary::FinalizeCompactionProgressWriter(
  1500. std::unique_ptr<log::Writer>* compaction_progress_writer) {
  1501. uint64_t timestamp = env_->NowMicros();
  1502. const std::string temp_file_path =
  1503. TempCompactionProgressFileName(secondary_path_, timestamp);
  1504. Status s = CreateCompactionProgressWriter(temp_file_path,
  1505. compaction_progress_writer);
  1506. if (!s.ok()) {
  1507. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  1508. "Failed to create compaction progress writer at "
  1509. "temp path %s: %s. Will perform clean up "
  1510. "to start compaction without progress persistence",
  1511. temp_file_path.c_str(), s.ToString().c_str());
  1512. return HandleCompactionProgressWriterCreationFailure(
  1513. temp_file_path, "" /* final_file_path */, compaction_progress_writer);
  1514. }
  1515. if (!compaction_progress_.empty()) {
  1516. s = PersistInitialCompactionProgress(compaction_progress_writer->get(),
  1517. compaction_progress_);
  1518. if (!s.ok()) {
  1519. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  1520. "Failed to persist the initial copmaction "
  1521. "progress: %s. Will perform clean up "
  1522. "to start compaction without progress persistence",
  1523. s.ToString().c_str());
  1524. return HandleCompactionProgressWriterCreationFailure(
  1525. temp_file_path, "" /* final_file_path */, compaction_progress_writer);
  1526. }
  1527. }
  1528. compaction_progress_writer->reset();
  1529. std::string final_file_path;
  1530. s = RenameCompactionProgressFile(temp_file_path, &final_file_path);
  1531. if (!s.ok()) {
  1532. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  1533. "Failed to rename temporary compaction progress "
  1534. "file from %s to %s: %s. Will perform clean up "
  1535. "to start compaction without progress persistence",
  1536. temp_file_path.c_str(), final_file_path.c_str(),
  1537. s.ToString().c_str());
  1538. return HandleCompactionProgressWriterCreationFailure(
  1539. temp_file_path, final_file_path, compaction_progress_writer);
  1540. }
  1541. s = CreateCompactionProgressWriter(final_file_path,
  1542. compaction_progress_writer);
  1543. if (!s.ok()) {
  1544. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  1545. "Failed to create the final compaction progress "
  1546. "writer: %s. Will attempt clean to start the compaction "
  1547. "without progress persistence",
  1548. s.ToString().c_str());
  1549. return HandleCompactionProgressWriterCreationFailure(
  1550. "" /* temp_file_path */, final_file_path, compaction_progress_writer);
  1551. }
  1552. return Status::OK();
  1553. }
  1554. } // namespace ROCKSDB_NAMESPACE