blob_db_impl.cc 73 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/blob_db/blob_db_impl.h"
  6. #include <algorithm>
  7. #include <cinttypes>
  8. #include <iomanip>
  9. #include <memory>
  10. #include <sstream>
  11. #include "db/blob/blob_index.h"
  12. #include "db/db_impl/db_impl.h"
  13. #include "db/write_batch_internal.h"
  14. #include "file/file_util.h"
  15. #include "file/filename.h"
  16. #include "file/random_access_file_reader.h"
  17. #include "file/sst_file_manager_impl.h"
  18. #include "file/writable_file_writer.h"
  19. #include "logging/logging.h"
  20. #include "monitoring/instrumented_mutex.h"
  21. #include "monitoring/statistics_impl.h"
  22. #include "monitoring/thread_status_util.h"
  23. #include "rocksdb/convenience.h"
  24. #include "rocksdb/env.h"
  25. #include "rocksdb/iterator.h"
  26. #include "rocksdb/utilities/stackable_db.h"
  27. #include "rocksdb/utilities/transaction.h"
  28. #include "table/meta_blocks.h"
  29. #include "test_util/sync_point.h"
  30. #include "util/cast_util.h"
  31. #include "util/crc32c.h"
  32. #include "util/mutexlock.h"
  33. #include "util/random.h"
  34. #include "util/stop_watch.h"
  35. #include "util/timer_queue.h"
  36. #include "utilities/blob_db/blob_compaction_filter.h"
  37. #include "utilities/blob_db/blob_db_iterator.h"
  38. #include "utilities/blob_db/blob_db_listener.h"
  39. namespace {
  40. int kBlockBasedTableVersionFormat = 2;
  41. } // end namespace
  42. namespace ROCKSDB_NAMESPACE::blob_db {
  43. bool BlobFileComparator::operator()(
  44. const std::shared_ptr<BlobFile>& lhs,
  45. const std::shared_ptr<BlobFile>& rhs) const {
  46. return lhs->BlobFileNumber() > rhs->BlobFileNumber();
  47. }
  48. bool BlobFileComparatorTTL::operator()(
  49. const std::shared_ptr<BlobFile>& lhs,
  50. const std::shared_ptr<BlobFile>& rhs) const {
  51. assert(lhs->HasTTL() && rhs->HasTTL());
  52. if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
  53. return true;
  54. }
  55. if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
  56. return false;
  57. }
  58. return lhs->BlobFileNumber() < rhs->BlobFileNumber();
  59. }
  60. BlobDBImpl::BlobDBImpl(const std::string& dbname,
  61. const BlobDBOptions& blob_db_options,
  62. const DBOptions& db_options,
  63. const ColumnFamilyOptions& cf_options)
  64. : BlobDB(),
  65. dbname_(dbname),
  66. db_impl_(nullptr),
  67. env_(db_options.env),
  68. bdb_options_(blob_db_options),
  69. db_options_(db_options),
  70. cf_options_(cf_options),
  71. file_options_(db_options),
  72. statistics_(db_options_.statistics.get()),
  73. next_file_number_(1),
  74. flush_sequence_(0),
  75. closed_(true),
  76. open_file_count_(0),
  77. total_blob_size_(0),
  78. live_sst_size_(0),
  79. fifo_eviction_seq_(0),
  80. evict_expiration_up_to_(0),
  81. debug_level_(0) {
  82. clock_ = env_->GetSystemClock().get();
  83. blob_dir_ = (bdb_options_.path_relative)
  84. ? dbname + "/" + bdb_options_.blob_dir
  85. : bdb_options_.blob_dir;
  86. file_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
  87. }
  88. BlobDBImpl::~BlobDBImpl() {
  89. tqueue_.shutdown();
  90. // CancelAllBackgroundWork(db_, true);
  91. Status s __attribute__((__unused__)) = Close();
  92. assert(s.ok());
  93. }
  94. Status BlobDBImpl::Close() {
  95. ThreadStatus::OperationType cur_op_type =
  96. ThreadStatusUtil::GetThreadOperation();
  97. ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);
  98. Status s = CloseImpl();
  99. ThreadStatusUtil::SetThreadOperation(cur_op_type);
  100. return s;
  101. }
  102. Status BlobDBImpl::CloseImpl() {
  103. if (closed_) {
  104. return Status::OK();
  105. }
  106. closed_ = true;
  107. // Close base DB before BlobDBImpl destructs to stop event listener and
  108. // compaction filter call.
  109. Status s = db_->Close();
  110. // delete db_ anyway even if close failed.
  111. delete db_;
  112. // Reset pointers to avoid StackableDB delete the pointer again.
  113. db_ = nullptr;
  114. db_impl_ = nullptr;
  115. if (!s.ok()) {
  116. return s;
  117. }
  118. // TODO: plumb Env::IOActivity, Env::IOPriority
  119. s = SyncBlobFiles(WriteOptions());
  120. return s;
  121. }
  122. BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
  123. Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
  124. assert(handles != nullptr);
  125. assert(db_ == nullptr);
  126. if (blob_dir_.empty()) {
  127. return Status::NotSupported("No blob directory in options");
  128. }
  129. if (bdb_options_.garbage_collection_cutoff < 0.0 ||
  130. bdb_options_.garbage_collection_cutoff > 1.0) {
  131. return Status::InvalidArgument(
  132. "Garbage collection cutoff must be in the interval [0.0, 1.0]");
  133. }
  134. // Temporarily disable compactions in the base DB during open; save the user
  135. // defined value beforehand so we can restore it once BlobDB is initialized.
  136. // Note: this is only needed if garbage collection is enabled.
  137. const bool disable_auto_compactions = cf_options_.disable_auto_compactions;
  138. if (bdb_options_.enable_garbage_collection) {
  139. cf_options_.disable_auto_compactions = true;
  140. }
  141. Status s;
  142. // Create info log.
  143. if (db_options_.info_log == nullptr) {
  144. s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
  145. if (!s.ok()) {
  146. return s;
  147. }
  148. }
  149. ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
  150. if ((cf_options_.compaction_filter != nullptr ||
  151. cf_options_.compaction_filter_factory != nullptr)) {
  152. ROCKS_LOG_INFO(db_options_.info_log,
  153. "BlobDB only support compaction filter on non-TTL values.");
  154. }
  155. // Open blob directory.
  156. s = env_->CreateDirIfMissing(blob_dir_);
  157. if (!s.ok()) {
  158. ROCKS_LOG_ERROR(db_options_.info_log,
  159. "Failed to create blob_dir %s, status: %s",
  160. blob_dir_.c_str(), s.ToString().c_str());
  161. }
  162. s = env_->GetFileSystem()->NewDirectory(blob_dir_, IOOptions(), &dir_ent_,
  163. nullptr);
  164. if (!s.ok()) {
  165. ROCKS_LOG_ERROR(db_options_.info_log,
  166. "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
  167. s.ToString().c_str());
  168. return s;
  169. }
  170. // Open blob files.
  171. s = OpenAllBlobFiles();
  172. if (!s.ok()) {
  173. return s;
  174. }
  175. // Update options
  176. if (bdb_options_.enable_garbage_collection) {
  177. db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
  178. cf_options_.compaction_filter_factory =
  179. std::make_shared<BlobIndexCompactionFilterFactoryGC>(
  180. this, clock_, cf_options_, statistics_);
  181. } else {
  182. db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
  183. cf_options_.compaction_filter_factory =
  184. std::make_shared<BlobIndexCompactionFilterFactory>(
  185. this, clock_, cf_options_, statistics_);
  186. }
  187. // Reset user compaction filter after building into compaction factory.
  188. cf_options_.compaction_filter = nullptr;
  189. // Open base db.
  190. ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
  191. s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
  192. if (!s.ok()) {
  193. return s;
  194. }
  195. db_impl_ = static_cast_with_check<DBImpl>(db_->GetRootDB());
  196. // Sanitize the blob_dir provided. Using a directory where the
  197. // base DB stores its files for the default CF is not supported.
  198. const ColumnFamilyData* const cfd =
  199. static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
  200. assert(cfd);
  201. assert(env_);
  202. for (const auto& cf_path : cfd->ioptions().cf_paths) {
  203. bool blob_dir_same_as_cf_dir = false;
  204. s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir);
  205. if (!s.ok()) {
  206. ROCKS_LOG_ERROR(db_options_.info_log,
  207. "Error while sanitizing blob_dir %s, status: %s",
  208. blob_dir_.c_str(), s.ToString().c_str());
  209. return s;
  210. }
  211. if (blob_dir_same_as_cf_dir) {
  212. return Status::NotSupported(
  213. "Using the base DB's storage directories for BlobDB files is not "
  214. "supported.");
  215. }
  216. }
  217. // Initialize SST file <-> oldest blob file mapping if garbage collection
  218. // is enabled.
  219. if (bdb_options_.enable_garbage_collection) {
  220. std::vector<LiveFileMetaData> live_files;
  221. db_->GetLiveFilesMetaData(&live_files);
  222. InitializeBlobFileToSstMapping(live_files);
  223. MarkUnreferencedBlobFilesObsoleteDuringOpen();
  224. if (!disable_auto_compactions) {
  225. s = db_->EnableAutoCompaction(*handles);
  226. if (!s.ok()) {
  227. ROCKS_LOG_ERROR(
  228. db_options_.info_log,
  229. "Failed to enable automatic compactions during open, status: %s",
  230. s.ToString().c_str());
  231. return s;
  232. }
  233. }
  234. }
  235. // Add trash files in blob dir to file delete scheduler.
  236. SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
  237. db_impl_->immutable_db_options().sst_file_manager.get());
  238. s = DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
  239. if (!s.ok()) {
  240. ROCKS_LOG_ERROR(db_options_.info_log,
  241. "Failed to clean up directory %s, status: %s",
  242. blob_dir_.c_str(), s.ToString().c_str());
  243. return s;
  244. }
  245. UpdateLiveSSTSize(WriteOptions(Env::IOActivity::kDBOpen));
  246. // Start background jobs.
  247. if (!bdb_options_.disable_background_tasks) {
  248. StartBackgroundTasks();
  249. }
  250. ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
  251. bdb_options_.Dump(db_options_.info_log.get());
  252. closed_ = false;
  253. return s;
  254. }
  255. void BlobDBImpl::StartBackgroundTasks() {
  256. // store a call to a member function and object
  257. tqueue_.add(
  258. kReclaimOpenFilesPeriodMillisecs,
  259. std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
  260. tqueue_.add(
  261. kDeleteObsoleteFilesPeriodMillisecs,
  262. std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
  263. tqueue_.add(kSanityCheckPeriodMillisecs,
  264. std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
  265. tqueue_.add(
  266. kEvictExpiredFilesPeriodMillisecs,
  267. std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
  268. }
  269. Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
  270. assert(file_numbers != nullptr);
  271. std::vector<std::string> all_files;
  272. Status s = env_->GetChildren(blob_dir_, &all_files);
  273. if (!s.ok()) {
  274. ROCKS_LOG_ERROR(db_options_.info_log,
  275. "Failed to get list of blob files, status: %s",
  276. s.ToString().c_str());
  277. return s;
  278. }
  279. for (const auto& file_name : all_files) {
  280. uint64_t file_number;
  281. FileType type;
  282. bool success = ParseFileName(file_name, &file_number, &type);
  283. if (success && type == kBlobFile) {
  284. file_numbers->insert(file_number);
  285. } else {
  286. ROCKS_LOG_WARN(db_options_.info_log,
  287. "Skipping file in blob directory: %s", file_name.c_str());
  288. }
  289. }
  290. return s;
  291. }
  292. Status BlobDBImpl::OpenAllBlobFiles() {
  293. std::set<uint64_t> file_numbers;
  294. Status s = GetAllBlobFiles(&file_numbers);
  295. if (!s.ok()) {
  296. return s;
  297. }
  298. if (!file_numbers.empty()) {
  299. next_file_number_.store(*file_numbers.rbegin() + 1);
  300. }
  301. std::ostringstream blob_file_oss;
  302. std::ostringstream live_imm_oss;
  303. std::ostringstream obsolete_file_oss;
  304. for (auto& file_number : file_numbers) {
  305. std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
  306. this, blob_dir_, file_number, db_options_.info_log.get());
  307. blob_file->MarkImmutable(/* sequence */ 0);
  308. // Read file header and footer
  309. Status read_metadata_status =
  310. blob_file->ReadMetadata(env_->GetFileSystem(), file_options_);
  311. if (read_metadata_status.IsCorruption()) {
  312. // Remove incomplete file.
  313. if (!obsolete_files_.empty()) {
  314. obsolete_file_oss << ", ";
  315. }
  316. obsolete_file_oss << file_number;
  317. ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
  318. continue;
  319. } else if (!read_metadata_status.ok()) {
  320. ROCKS_LOG_ERROR(db_options_.info_log,
  321. "Unable to read metadata of blob file %" PRIu64
  322. ", status: '%s'",
  323. file_number, read_metadata_status.ToString().c_str());
  324. return read_metadata_status;
  325. }
  326. total_blob_size_ += blob_file->GetFileSize();
  327. if (!blob_files_.empty()) {
  328. blob_file_oss << ", ";
  329. }
  330. blob_file_oss << file_number;
  331. blob_files_[file_number] = blob_file;
  332. if (!blob_file->HasTTL()) {
  333. if (!live_imm_non_ttl_blob_files_.empty()) {
  334. live_imm_oss << ", ";
  335. }
  336. live_imm_oss << file_number;
  337. live_imm_non_ttl_blob_files_[file_number] = blob_file;
  338. }
  339. }
  340. ROCKS_LOG_INFO(db_options_.info_log,
  341. "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
  342. blob_file_oss.str().c_str());
  343. ROCKS_LOG_INFO(
  344. db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
  345. live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
  346. ROCKS_LOG_INFO(db_options_.info_log,
  347. "Found %" ROCKSDB_PRIszt
  348. " incomplete or corrupted blob files: %s",
  349. obsolete_files_.size(), obsolete_file_oss.str().c_str());
  350. return s;
  351. }
  352. template <typename Linker>
  353. void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
  354. uint64_t blob_file_number,
  355. Linker linker) {
  356. assert(bdb_options_.enable_garbage_collection);
  357. assert(blob_file_number != kInvalidBlobFileNumber);
  358. auto it = blob_files_.find(blob_file_number);
  359. if (it == blob_files_.end()) {
  360. ROCKS_LOG_WARN(db_options_.info_log,
  361. "Blob file %" PRIu64
  362. " not found while trying to link "
  363. "SST file %" PRIu64,
  364. blob_file_number, sst_file_number);
  365. return;
  366. }
  367. BlobFile* const blob_file = it->second.get();
  368. assert(blob_file);
  369. linker(blob_file, sst_file_number);
  370. ROCKS_LOG_INFO(db_options_.info_log,
  371. "Blob file %" PRIu64 " linked to SST file %" PRIu64,
  372. blob_file_number, sst_file_number);
  373. }
  374. void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
  375. uint64_t blob_file_number) {
  376. auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
  377. WriteLock file_lock(&blob_file->mutex_);
  378. blob_file->LinkSstFile(sst_file);
  379. };
  380. LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
  381. }
  382. void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
  383. uint64_t blob_file_number) {
  384. auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
  385. blob_file->LinkSstFile(sst_file);
  386. };
  387. LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
  388. }
  389. void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
  390. uint64_t blob_file_number) {
  391. assert(bdb_options_.enable_garbage_collection);
  392. assert(blob_file_number != kInvalidBlobFileNumber);
  393. auto it = blob_files_.find(blob_file_number);
  394. if (it == blob_files_.end()) {
  395. ROCKS_LOG_WARN(db_options_.info_log,
  396. "Blob file %" PRIu64
  397. " not found while trying to unlink "
  398. "SST file %" PRIu64,
  399. blob_file_number, sst_file_number);
  400. return;
  401. }
  402. BlobFile* const blob_file = it->second.get();
  403. assert(blob_file);
  404. {
  405. WriteLock file_lock(&blob_file->mutex_);
  406. blob_file->UnlinkSstFile(sst_file_number);
  407. }
  408. ROCKS_LOG_INFO(db_options_.info_log,
  409. "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
  410. blob_file_number, sst_file_number);
  411. }
  412. void BlobDBImpl::InitializeBlobFileToSstMapping(
  413. const std::vector<LiveFileMetaData>& live_files) {
  414. assert(bdb_options_.enable_garbage_collection);
  415. for (const auto& live_file : live_files) {
  416. const uint64_t sst_file_number = live_file.file_number;
  417. const uint64_t blob_file_number = live_file.oldest_blob_file_number;
  418. if (blob_file_number == kInvalidBlobFileNumber) {
  419. continue;
  420. }
  421. LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
  422. }
  423. }
  424. void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
  425. assert(bdb_options_.enable_garbage_collection);
  426. WriteLock lock(&mutex_);
  427. if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
  428. LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
  429. }
  430. assert(flush_sequence_ < info.largest_seqno);
  431. flush_sequence_ = info.largest_seqno;
  432. MarkUnreferencedBlobFilesObsolete();
  433. }
  434. void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
  435. assert(bdb_options_.enable_garbage_collection);
  436. if (!info.status.ok()) {
  437. return;
  438. }
  439. // Note: the same SST file may appear in both the input and the output
  440. // file list in case of a trivial move. We walk through the two lists
  441. // below in a fashion that's similar to merge sort to detect this.
  442. auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
  443. return lhs.file_number < rhs.file_number;
  444. };
  445. auto inputs = info.input_file_infos;
  446. auto iit = inputs.begin();
  447. const auto iit_end = inputs.end();
  448. std::sort(iit, iit_end, cmp);
  449. auto outputs = info.output_file_infos;
  450. auto oit = outputs.begin();
  451. const auto oit_end = outputs.end();
  452. std::sort(oit, oit_end, cmp);
  453. WriteLock lock(&mutex_);
  454. while (iit != iit_end && oit != oit_end) {
  455. const auto& input = *iit;
  456. const auto& output = *oit;
  457. if (input.file_number == output.file_number) {
  458. ++iit;
  459. ++oit;
  460. } else if (input.file_number < output.file_number) {
  461. if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
  462. UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
  463. }
  464. ++iit;
  465. } else {
  466. assert(output.file_number < input.file_number);
  467. if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
  468. LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
  469. }
  470. ++oit;
  471. }
  472. }
  473. while (iit != iit_end) {
  474. const auto& input = *iit;
  475. if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
  476. UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
  477. }
  478. ++iit;
  479. }
  480. while (oit != oit_end) {
  481. const auto& output = *oit;
  482. if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
  483. LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
  484. }
  485. ++oit;
  486. }
  487. MarkUnreferencedBlobFilesObsolete();
  488. }
  489. bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
  490. const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
  491. assert(blob_file);
  492. assert(!blob_file->HasTTL());
  493. assert(blob_file->Immutable());
  494. assert(bdb_options_.enable_garbage_collection);
  495. // Note: FIFO eviction could have marked this file obsolete already.
  496. if (blob_file->Obsolete()) {
  497. return true;
  498. }
  499. // We cannot mark this file (or any higher-numbered files for that matter)
  500. // obsolete if it is referenced by any memtables or SSTs. We keep track of
  501. // the SSTs explicitly. To account for memtables, we keep track of the highest
  502. // sequence number received in flush notifications, and we do not mark the
  503. // blob file obsolete if there are still unflushed memtables from before
  504. // the time the blob file was closed.
  505. if (blob_file->GetImmutableSequence() > flush_sequence_ ||
  506. !blob_file->GetLinkedSstFiles().empty()) {
  507. return false;
  508. }
  509. ROCKS_LOG_INFO(db_options_.info_log,
  510. "Blob file %" PRIu64 " is no longer needed, marking obsolete",
  511. blob_file->BlobFileNumber());
  512. ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
  513. return true;
  514. }
  515. template <class Functor>
  516. void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
  517. assert(bdb_options_.enable_garbage_collection);
  518. // Iterate through all live immutable non-TTL blob files, and mark them
  519. // obsolete assuming no SST files or memtables rely on the blobs in them.
  520. // Note: we need to stop as soon as we find a blob file that has any
  521. // linked SSTs (or one potentially referenced by memtables).
  522. uint64_t obsoleted_files = 0;
  523. auto it = live_imm_non_ttl_blob_files_.begin();
  524. while (it != live_imm_non_ttl_blob_files_.end()) {
  525. const auto& blob_file = it->second;
  526. assert(blob_file);
  527. assert(blob_file->BlobFileNumber() == it->first);
  528. assert(!blob_file->HasTTL());
  529. assert(blob_file->Immutable());
  530. // Small optimization: Obsolete() does an atomic read, so we can do
  531. // this check without taking a lock on the blob file's mutex.
  532. if (blob_file->Obsolete()) {
  533. it = live_imm_non_ttl_blob_files_.erase(it);
  534. continue;
  535. }
  536. if (!mark_if_needed(blob_file)) {
  537. break;
  538. }
  539. it = live_imm_non_ttl_blob_files_.erase(it);
  540. ++obsoleted_files;
  541. }
  542. if (obsoleted_files > 0) {
  543. ROCKS_LOG_INFO(db_options_.info_log,
  544. "%" PRIu64 " blob file(s) marked obsolete by GC",
  545. obsoleted_files);
  546. RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
  547. }
  548. }
  549. void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
  550. const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
  551. MarkUnreferencedBlobFilesObsoleteImpl(
  552. [this, obsolete_seq](const std::shared_ptr<BlobFile>& blob_file) {
  553. WriteLock file_lock(&blob_file->mutex_);
  554. return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
  555. });
  556. }
  557. void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
  558. MarkUnreferencedBlobFilesObsoleteImpl(
  559. [this](const std::shared_ptr<BlobFile>& blob_file) {
  560. return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
  561. });
  562. }
  563. void BlobDBImpl::CloseRandomAccessLocked(
  564. const std::shared_ptr<BlobFile>& bfile) {
  565. bfile->CloseRandomAccessLocked();
  566. open_file_count_--;
  567. }
  568. Status BlobDBImpl::GetBlobFileReader(
  569. const std::shared_ptr<BlobFile>& blob_file,
  570. std::shared_ptr<RandomAccessFileReader>* reader) {
  571. assert(reader != nullptr);
  572. bool fresh_open = false;
  573. Status s = blob_file->GetReader(env_, file_options_, reader, &fresh_open);
  574. if (s.ok() && fresh_open) {
  575. assert(*reader != nullptr);
  576. open_file_count_++;
  577. }
  578. return s;
  579. }
  580. std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
  581. bool has_ttl, const ExpirationRange& expiration_range,
  582. const std::string& reason) {
  583. assert(has_ttl == (expiration_range.first || expiration_range.second));
  584. uint64_t file_num = next_file_number_++;
  585. const uint32_t column_family_id =
  586. static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  587. auto blob_file = std::make_shared<BlobFile>(
  588. this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
  589. bdb_options_.compression, has_ttl, expiration_range);
  590. ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
  591. blob_file->PathName().c_str(), reason.c_str());
  592. LogFlush(db_options_.info_log);
  593. return blob_file;
  594. }
  595. void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
  596. const uint64_t blob_file_number = blob_file->BlobFileNumber();
  597. auto it = blob_files_.lower_bound(blob_file_number);
  598. assert(it == blob_files_.end() || it->first != blob_file_number);
  599. blob_files_.insert(it,
  600. std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
  601. blob_file_number, std::move(blob_file)));
  602. }
// Opens `bfile` for appending (ReopenWritableFile) and attaches a new
// BlobLogWriter to it, positioned at the file's current size. The writer's
// last element type is inferred from that size:
//   == BlobLogHeader::kSize -> only the header has been written,
//   >  BlobLogHeader::kSize -> records have been written,
//   0                       -> nothing written yet.
// A non-zero size smaller than the header is reported as Corruption.
// The "Locked" suffix suggests the caller holds bfile's lock — confirm at
// call sites.
Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
  std::string fpath(bfile->PathName());
  std::unique_ptr<FSWritableFile> wfile;
  const auto& fs = env_->GetFileSystem();
  Status s = fs->ReopenWritableFile(fpath, file_options_, &wfile, nullptr);
  if (!s.ok()) {
    // Also log whether the file exists, to help diagnose the failure.
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to open blob file for write: %s status: '%s'"
                    " exists: '%s'",
                    fpath.c_str(), s.ToString().c_str(),
                    fs->FileExists(fpath, file_options_.io_options, nullptr)
                        .ToString()
                        .c_str());
    return s;
  }
  std::unique_ptr<WritableFileWriter> fwriter;
  fwriter.reset(new WritableFileWriter(
      std::move(wfile), fpath, file_options_, clock_, nullptr /* io_tracer */,
      statistics_, Histograms::BLOB_DB_BLOB_FILE_WRITE_MICROS));
  // Appends continue from the current end of the file.
  uint64_t boffset = bfile->GetFileSize();
  if (debug_level_ >= 2 && boffset) {
    ROCKS_LOG_DEBUG(db_options_.info_log,
                    "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
                    boffset);
  }
  // Infer what was written last from the current file size.
  BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
  if (bfile->file_size_ == BlobLogHeader::kSize) {
    et = BlobLogWriter::kEtFileHdr;
  } else if (bfile->file_size_ > BlobLogHeader::kSize) {
    et = BlobLogWriter::kEtRecord;
  } else if (bfile->file_size_) {
    // Non-empty but shorter than a header: not a valid blob file.
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Open blob file: %s with wrong size: %" PRIu64,
                   fpath.c_str(), boffset);
    return Status::Corruption("Invalid blob file size");
  }
  constexpr bool do_flush = true;
  bfile->log_writer_ = std::make_shared<BlobLogWriter>(
      std::move(fwriter), clock_, statistics_, bfile->file_number_,
      db_options_.use_fsync, do_flush, boffset);
  bfile->log_writer_->last_elem_type_ = et;
  return s;
}
// Looks for an open TTL blob file whose expiration range [first, second)
// contains `expiration`. Returns nullptr when there is none. Both call sites
// in SelectBlobFileTTL hold mutex_ (shared or exclusive) while calling this.
std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
    uint64_t expiration) const {
  if (open_ttl_files_.empty()) {
    return nullptr;
  }
  // Build a probe with expiration range (expiration, 0) and the largest
  // possible file number. NOTE(review): how the probe compares against real
  // files depends on the open_ttl_files_ comparator (not visible here);
  // presumably it orders by expiration range, then by file number.
  std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
  tmp->SetHasTTL(true);
  tmp->expiration_range_ = std::make_pair(expiration, 0);
  tmp->file_number_ = std::numeric_limits<uint64_t>::max();
  auto citr = open_ttl_files_.equal_range(tmp);
  if (citr.first == open_ttl_files_.end()) {
    // Every file orders before the probe: only the last file can still
    // cover `expiration`.
    assert(citr.second == open_ttl_files_.end());
    std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
    return (check->expiration_range_.second <= expiration) ? nullptr : check;
  }
  if (citr.first != citr.second) {
    return *(citr.first);
  }
  // No equivalent element: inspect the file just before the insertion point
  // (or the first file) and accept it only if `expiration` is in its range.
  auto finditr = citr.second;
  if (finditr != open_ttl_files_.begin()) {
    --finditr;
  }
  bool b2 = (*finditr)->expiration_range_.second <= expiration;
  bool b1 = (*finditr)->expiration_range_.first > expiration;
  return (b1 || b2) ? nullptr : (*finditr);
}
  672. Status BlobDBImpl::CheckOrCreateWriterLocked(
  673. const std::shared_ptr<BlobFile>& blob_file,
  674. std::shared_ptr<BlobLogWriter>* writer) {
  675. assert(writer != nullptr);
  676. *writer = blob_file->GetWriter();
  677. if (*writer != nullptr) {
  678. return Status::OK();
  679. }
  680. Status s = CreateWriterLocked(blob_file);
  681. if (s.ok()) {
  682. *writer = blob_file->GetWriter();
  683. }
  684. return s;
  685. }
// Creates a new blob file object (NewBlobFile), opens a writer for it, and
// writes the blob log header. On success the file size is set to the header
// size and total_blob_size_ grows by the same amount. *blob_file is assigned
// before any step that can fail, so it is non-null even on error. The file is
// not registered in blob_files_ here; callers (SelectBlobFile,
// SelectBlobFileTTL) do that after success.
Status BlobDBImpl::CreateBlobFileAndWriter(
    const WriteOptions& write_options, bool has_ttl,
    const ExpirationRange& expiration_range, const std::string& reason,
    std::shared_ptr<BlobFile>* blob_file,
    std::shared_ptr<BlobLogWriter>* writer) {
  TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
  assert(has_ttl == (expiration_range.first || expiration_range.second));
  assert(blob_file);
  assert(writer);
  *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
  assert(*blob_file);
  // file not visible, hence no lock
  // (the new file is not yet registered anywhere other threads can find it)
  Status s = CheckOrCreateWriterLocked(*blob_file, writer);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to get writer for blob file: %s, error: %s",
                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
    return s;
  }
  assert(*writer);
  s = (*writer)->WriteHeader(write_options, (*blob_file)->header_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to write header to new blob file: %s"
                    " status: '%s'",
                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
    return s;
  }
  // Account for the header bytes just written.
  (*blob_file)->SetFileSize(BlobLogHeader::kSize);
  total_blob_size_ += BlobLogHeader::kSize;
  return s;
}
  718. Status BlobDBImpl::SelectBlobFile(const WriteOptions& write_options,
  719. std::shared_ptr<BlobFile>* blob_file) {
  720. assert(blob_file);
  721. {
  722. ReadLock rl(&mutex_);
  723. if (open_non_ttl_file_) {
  724. assert(!open_non_ttl_file_->Immutable());
  725. *blob_file = open_non_ttl_file_;
  726. return Status::OK();
  727. }
  728. }
  729. // Check again
  730. WriteLock wl(&mutex_);
  731. if (open_non_ttl_file_) {
  732. assert(!open_non_ttl_file_->Immutable());
  733. *blob_file = open_non_ttl_file_;
  734. return Status::OK();
  735. }
  736. std::shared_ptr<BlobLogWriter> writer;
  737. const Status s = CreateBlobFileAndWriter(
  738. write_options,
  739. /* has_ttl */ false, ExpirationRange(),
  740. /* reason */ "SelectBlobFile", blob_file, &writer);
  741. if (!s.ok()) {
  742. return s;
  743. }
  744. RegisterBlobFile(*blob_file);
  745. open_non_ttl_file_ = *blob_file;
  746. return s;
  747. }
  748. Status BlobDBImpl::SelectBlobFileTTL(const WriteOptions& write_options,
  749. uint64_t expiration,
  750. std::shared_ptr<BlobFile>* blob_file) {
  751. assert(blob_file);
  752. assert(expiration != kNoExpiration);
  753. {
  754. ReadLock rl(&mutex_);
  755. *blob_file = FindBlobFileLocked(expiration);
  756. if (*blob_file != nullptr) {
  757. assert(!(*blob_file)->Immutable());
  758. return Status::OK();
  759. }
  760. }
  761. // Check again
  762. WriteLock wl(&mutex_);
  763. *blob_file = FindBlobFileLocked(expiration);
  764. if (*blob_file != nullptr) {
  765. assert(!(*blob_file)->Immutable());
  766. return Status::OK();
  767. }
  768. const uint64_t exp_low =
  769. (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
  770. const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
  771. const ExpirationRange expiration_range(exp_low, exp_high);
  772. std::ostringstream oss;
  773. oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
  774. std::shared_ptr<BlobLogWriter> writer;
  775. const Status s = CreateBlobFileAndWriter(
  776. write_options, /* has_ttl */ true, expiration_range,
  777. /* reason */ oss.str(), blob_file, &writer);
  778. if (!s.ok()) {
  779. return s;
  780. }
  781. RegisterBlobFile(*blob_file);
  782. open_ttl_files_.insert(*blob_file);
  783. return s;
  784. }
  785. class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
  786. private:
  787. const WriteOptions& options_;
  788. BlobDBImpl* blob_db_impl_;
  789. uint32_t default_cf_id_;
  790. WriteBatch batch_;
  791. public:
  792. BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
  793. uint32_t default_cf_id)
  794. : options_(options),
  795. blob_db_impl_(blob_db_impl),
  796. default_cf_id_(default_cf_id) {}
  797. WriteBatch* batch() { return &batch_; }
  798. Status PutCF(uint32_t column_family_id, const Slice& key,
  799. const Slice& value) override {
  800. if (column_family_id != default_cf_id_) {
  801. return Status::NotSupported(
  802. "Blob DB doesn't support non-default column family.");
  803. }
  804. Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
  805. &batch_);
  806. return s;
  807. }
  808. Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
  809. if (column_family_id != default_cf_id_) {
  810. return Status::NotSupported(
  811. "Blob DB doesn't support non-default column family.");
  812. }
  813. Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
  814. return s;
  815. }
  816. virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
  817. const Slice& end_key) {
  818. if (column_family_id != default_cf_id_) {
  819. return Status::NotSupported(
  820. "Blob DB doesn't support non-default column family.");
  821. }
  822. Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
  823. begin_key, end_key);
  824. return s;
  825. }
  826. Status SingleDeleteCF(uint32_t /*column_family_id*/,
  827. const Slice& /*key*/) override {
  828. return Status::NotSupported("Not supported operation in blob db.");
  829. }
  830. Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
  831. const Slice& /*value*/) override {
  832. return Status::NotSupported("Not supported operation in blob db.");
  833. }
  834. void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
  835. };
  836. Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  837. StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
  838. RecordTick(statistics_, BLOB_DB_NUM_WRITE);
  839. uint32_t default_cf_id =
  840. static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
  841. ->GetID();
  842. Status s;
  843. BlobInserter blob_inserter(options, this, default_cf_id);
  844. {
  845. // Release write_mutex_ before DB write to avoid race condition with
  846. // flush begin listener, which also require write_mutex_ to sync
  847. // blob files.
  848. MutexLock l(&write_mutex_);
  849. s = updates->Iterate(&blob_inserter);
  850. }
  851. if (!s.ok()) {
  852. return s;
  853. }
  854. return db_->Write(options, blob_inserter.batch());
  855. }
  856. Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
  857. const Slice& value) {
  858. return PutUntil(options, key, value, kNoExpiration);
  859. }
  860. Status BlobDBImpl::PutWithTTL(const WriteOptions& options, const Slice& key,
  861. const Slice& value, uint64_t ttl) {
  862. uint64_t now = EpochNow();
  863. uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
  864. return PutUntil(options, key, value, expiration);
  865. }
  866. Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
  867. const Slice& value, uint64_t expiration) {
  868. StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
  869. RecordTick(statistics_, BLOB_DB_NUM_PUT);
  870. Status s;
  871. WriteBatch batch;
  872. {
  873. // Release write_mutex_ before DB write to avoid race condition with
  874. // flush begin listener, which also require write_mutex_ to sync
  875. // blob files.
  876. MutexLock l(&write_mutex_);
  877. s = PutBlobValue(options, key, value, expiration, &batch);
  878. }
  879. if (s.ok()) {
  880. s = db_->Write(options, &batch);
  881. }
  882. return s;
  883. }
// Converts one key/value into the entry BlobDB stores in the base DB and
// appends it to `batch`. The batch is not written here. write_mutex_ must be
// held.
//   - value shorter than min_blob_size, no expiration: plain Put.
//   - value shorter than min_blob_size, with expiration: the value is inlined
//     in a blob index entry that also carries the expiration.
//   - otherwise: the (optionally compressed) value is appended to a blob file
//     (a TTL file when an expiration is given), and a blob index pointing at
//     it is added to `batch`.
// The key/value count and size statistics at the end are recorded whether
// or not the operation succeeded.
Status BlobDBImpl::PutBlobValue(const WriteOptions& write_options,
                                const Slice& key, const Slice& value,
                                uint64_t expiration, WriteBatch* batch) {
  write_mutex_.AssertHeld();
  Status s;
  std::string index_entry;
  uint32_t column_family_id =
      static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
          ->GetID();
  if (value.size() < bdb_options_.min_blob_size) {
    if (expiration == kNoExpiration) {
      // Put as normal value
      s = batch->Put(key, value);
      RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
    } else {
      // Inlined with TTL
      BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
      s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
                                           index_entry);
      RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
    }
  } else {
    std::string compression_output;
    // value_compressed may point into compression_output, which must
    // outlive it.
    Slice value_compressed = GetCompressedSlice(value, &compression_output);
    std::string headerbuf;
    BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
                                       expiration);
    // Check the DB size limit before selecting a blob file.
    // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
    // done before calling SelectBlobFile().
    s = CheckSizeAndEvictBlobFiles(
        write_options, headerbuf.size() + key.size() + value_compressed.size());
    if (!s.ok()) {
      return s;
    }
    std::shared_ptr<BlobFile> blob_file;
    if (expiration != kNoExpiration) {
      s = SelectBlobFileTTL(write_options, expiration, &blob_file);
    } else {
      s = SelectBlobFile(write_options, &blob_file);
    }
    if (s.ok()) {
      assert(blob_file != nullptr);
      assert(blob_file->GetCompressionType() == bdb_options_.compression);
      s = AppendBlob(write_options, blob_file, headerbuf, key, value_compressed,
                     expiration, &index_entry);
    }
    if (s.ok()) {
      if (expiration != kNoExpiration) {
        // Widen the file's expiration range to include this blob.
        WriteLock file_lock(&blob_file->mutex_);
        blob_file->ExtendExpirationRange(expiration);
      }
      s = CloseBlobFileIfNeeded(write_options, blob_file);
    }
    if (s.ok()) {
      s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
                                           index_entry);
    }
    if (s.ok()) {
      if (expiration == kNoExpiration) {
        RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
      } else {
        RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
      }
    } else {
      // NOTE(review): blob_file is dereferenced here. It appears non-null on
      // every failure path that reaches this point, because the Select*
      // functions assign *blob_file before anything can fail. Re-check this
      // if those functions change.
      ROCKS_LOG_ERROR(
          db_options_.info_log,
          "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
          " status: '%s' blob_file: '%s'",
          blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
          s.ToString().c_str(), blob_file->DumpState().c_str());
    }
  }
  RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
  RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
  RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
  RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
  return s;
}
// Compresses `raw` with bdb_options_.compression into *compression_output
// and returns a slice over that buffer. When compression is disabled, `raw`
// itself is returned and *compression_output is untouched. The returned
// slice may refer to *compression_output, so it must not outlive it.
// NOTE(review): the result of OLD_CompressData is not checked. If
// compression can fail, the caller receives whatever is left in
// *compression_output, while the blob index still records
// bdb_options_.compression. Confirm the failure behavior of OLD_CompressData.
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
                                     std::string* compression_output) const {
  if (bdb_options_.compression == kNoCompression) {
    return raw;
  }
  StopWatch compression_sw(clock_, statistics_, BLOB_DB_COMPRESSION_MICROS);
  CompressionType type = bdb_options_.compression;
  CompressionOptions opts;
  CompressionContext context(type, opts);
  CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type);
  OLD_CompressData(raw, info,
                   GetCompressFormatForVersion(kBlockBasedTableVersionFormat),
                   compression_output);
  return *compression_output;
}
// Returns a shared decompressor for blob values. It comes from the built-in
// compression manager for the block-based table format version used when
// compressing blobs (see GetCompressedSlice). Both objects are
// function-local statics, so they are created once, on first use. The
// manager is kept alive next to the decompressor (presumably because the
// decompressor may depend on it — confirm).
Decompressor& BlobDecompressor() {
  static auto mgr = GetBuiltinCompressionManager(
      GetCompressFormatForVersion(kBlockBasedTableVersionFormat));
  static auto decompressor = mgr->GetDecompressor();
  return *decompressor;
}
  984. Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
  985. CompressionType compression_type,
  986. PinnableSlice* value_output) const {
  987. assert(compression_type != kNoCompression);
  988. BlockContents contents;
  989. auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
  990. {
  991. StopWatch decompression_sw(clock_, statistics_,
  992. BLOB_DB_DECOMPRESSION_MICROS);
  993. Status s = DecompressBlockData(
  994. compressed_value.data(), compressed_value.size(), compression_type,
  995. BlobDecompressor(), &contents, cfh->cfd()->ioptions());
  996. if (!s.ok()) {
  997. return Status::Corruption("Unable to decompress blob.");
  998. }
  999. }
  1000. value_output->PinSelf(contents.data);
  1001. return Status::OK();
  1002. }
  1003. Status BlobDBImpl::CompactFiles(
  1004. const CompactionOptions& compact_options,
  1005. const std::vector<std::string>& input_file_names, const int output_level,
  1006. const int output_path_id, std::vector<std::string>* const output_file_names,
  1007. CompactionJobInfo* compaction_job_info) {
  1008. // Note: we need CompactionJobInfo to be able to track updates to the
  1009. // blob file <-> SST mappings, so we provide one if the user hasn't,
  1010. // assuming that GC is enabled.
  1011. CompactionJobInfo info{};
  1012. if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
  1013. compaction_job_info = &info;
  1014. }
  1015. const Status s =
  1016. db_->CompactFiles(compact_options, input_file_names, output_level,
  1017. output_path_id, output_file_names, compaction_job_info);
  1018. if (!s.ok()) {
  1019. return s;
  1020. }
  1021. if (bdb_options_.enable_garbage_collection) {
  1022. assert(compaction_job_info);
  1023. ProcessCompactionJobInfo(*compaction_job_info);
  1024. }
  1025. return s;
  1026. }
  1027. void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
  1028. assert(context);
  1029. context->blob_db_impl = this;
  1030. context->next_file_number = next_file_number_.load();
  1031. context->current_blob_files.clear();
  1032. for (auto& p : blob_files_) {
  1033. context->current_blob_files.insert(p.first);
  1034. }
  1035. context->fifo_eviction_seq = fifo_eviction_seq_;
  1036. context->evict_expiration_up_to = evict_expiration_up_to_;
  1037. }
  1038. void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
  1039. assert(context);
  1040. ReadLock l(&mutex_);
  1041. GetCompactionContextCommon(context);
  1042. }
  1043. void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
  1044. BlobCompactionContextGC* context_gc) {
  1045. assert(context);
  1046. assert(context_gc);
  1047. ReadLock l(&mutex_);
  1048. GetCompactionContextCommon(context);
  1049. if (!live_imm_non_ttl_blob_files_.empty()) {
  1050. auto it = live_imm_non_ttl_blob_files_.begin();
  1051. std::advance(it, bdb_options_.garbage_collection_cutoff *
  1052. live_imm_non_ttl_blob_files_.size());
  1053. context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
  1054. ? it->first
  1055. : std::numeric_limits<uint64_t>::max();
  1056. }
  1057. }
  1058. void BlobDBImpl::UpdateLiveSSTSize(const WriteOptions& write_options) {
  1059. uint64_t live_sst_size = 0;
  1060. bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
  1061. if (ok) {
  1062. live_sst_size_.store(live_sst_size);
  1063. ROCKS_LOG_INFO(db_options_.info_log,
  1064. "Updated total SST file size: %" PRIu64 " bytes.",
  1065. live_sst_size);
  1066. } else {
  1067. ROCKS_LOG_ERROR(
  1068. db_options_.info_log,
  1069. "Failed to update total SST file size after flush or compaction.");
  1070. }
  1071. {
  1072. // Trigger FIFO eviction if needed.
  1073. MutexLock l(&write_mutex_);
  1074. Status s = CheckSizeAndEvictBlobFiles(write_options, 0, true /*force*/);
  1075. if (s.IsNoSpace()) {
  1076. ROCKS_LOG_WARN(db_options_.info_log,
  1077. "DB grow out-of-space after SST size updated. Current live"
  1078. " SST size: %" PRIu64
  1079. " , current blob files size: %" PRIu64 ".",
  1080. live_sst_size_.load(), total_blob_size_.load());
  1081. }
  1082. }
  1083. }
// Ensures that adding `blob_size` bytes keeps live SST size + total blob size
// within bdb_options_.max_db_size. A max_db_size of 0 disables the check.
// Requires write_mutex_ to be held.
//
// Returns NoSpace when:
//   - FIFO eviction is disabled, or
//   - force_evict is false and the SSTs plus the new blob alone already
//     exceed the limit.
// Otherwise it evicts (obsoletes) candidate blob files, taking them from the
// back of the list sorted by BlobFileComparator; the log message calls these
// the oldest files. Still-open files are closed first. Eviction stops when
// the total fits or no candidates remain, and NoSpace is returned if it
// still does not fit.
Status BlobDBImpl::CheckSizeAndEvictBlobFiles(const WriteOptions& write_options,
                                              uint64_t blob_size,
                                              bool force_evict) {
  write_mutex_.AssertHeld();
  uint64_t live_sst_size = live_sst_size_.load();
  if (bdb_options_.max_db_size == 0 ||
      live_sst_size + total_blob_size_.load() + blob_size <=
          bdb_options_.max_db_size) {
    return Status::OK();
  }
  if (bdb_options_.is_fifo == false ||
      (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
    // FIFO eviction is disabled, or no space to insert new blob even we evict
    // all blob files.
    return Status::NoSpace(
        "Write failed, as writing it would exceed max_db_size limit.");
  }
  std::vector<std::shared_ptr<BlobFile>> candidate_files;
  CopyBlobFiles(&candidate_files);
  std::sort(candidate_files.begin(), candidate_files.end(),
            BlobFileComparator());
  // Every file evicted in this pass is marked obsolete at this sequence.
  fifo_eviction_seq_ = GetLatestSequenceNumber();
  WriteLock l(&mutex_);
  while (!candidate_files.empty() &&
         live_sst_size + total_blob_size_.load() + blob_size >
             bdb_options_.max_db_size) {
    std::shared_ptr<BlobFile> blob_file = candidate_files.back();
    candidate_files.pop_back();
    WriteLock file_lock(&blob_file->mutex_);
    if (blob_file->Obsolete()) {
      // File already obsoleted by someone else.
      assert(blob_file->Immutable());
      continue;
    }
    // FIFO eviction can evict open blob files.
    if (!blob_file->Immutable()) {
      Status s = CloseBlobFile(write_options, blob_file);
      if (!s.ok()) {
        return s;
      }
    }
    assert(blob_file->Immutable());
    auto expiration_range = blob_file->GetExpirationRange();
    ROCKS_LOG_INFO(db_options_.info_log,
                   "Evict oldest blob file since DB out of space. Current "
                   "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
                   ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
                   ".",
                   live_sst_size, total_blob_size_.load(),
                   bdb_options_.max_db_size, blob_file->BlobFileNumber());
    // update_size=true: presumably reduces total_blob_size_, which the loop
    // condition re-reads — confirm in ObsoleteBlobFile.
    ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
    evict_expiration_up_to_ = expiration_range.first;
    RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
    RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
               blob_file->BlobCount());
    RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
               blob_file->GetFileSize());
    TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
  }
  if (live_sst_size + total_blob_size_.load() + blob_size >
      bdb_options_.max_db_size) {
    return Status::NoSpace(
        "Write failed, as writing it would exceed max_db_size limit.");
  }
  return Status::OK();
}
// Appends one blob record (headerbuf, key, value) to `bfile` under the file's
// write lock, creating the file's writer if needed. It then:
//   - adds the record size to the file's and the DB's blob size accounting;
//   - encodes a blob index (with TTL when `expiration` != kNoExpiration)
//     into *index_entry.
// The index records bdb_options_.compression, so `value` should already be
// compressed accordingly (PutBlobValue passes the compressed slice).
Status BlobDBImpl::AppendBlob(const WriteOptions& write_options,
                              const std::shared_ptr<BlobFile>& bfile,
                              const std::string& headerbuf, const Slice& key,
                              const Slice& value, uint64_t expiration,
                              std::string* index_entry) {
  Status s;
  uint64_t blob_offset = 0;
  uint64_t key_offset = 0;
  {
    // Hold the file lock only for the physical write.
    WriteLock lockbfile_w(&bfile->mutex_);
    std::shared_ptr<BlobLogWriter> writer;
    s = CheckOrCreateWriterLocked(bfile, &writer);
    if (!s.ok()) {
      return s;
    }
    // write the blob to the blob log.
    s = writer->EmitPhysicalRecord(write_options, headerbuf, key, value,
                                   &key_offset, &blob_offset);
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Invalid status in AppendBlob: %s status: '%s'",
                    bfile->PathName().c_str(), s.ToString().c_str());
    return s;
  }
  uint64_t size_put = headerbuf.size() + key.size() + value.size();
  bfile->BlobRecordAdded(size_put);
  total_blob_size_ += size_put;
  if (expiration == kNoExpiration) {
    BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
                          value.size(), bdb_options_.compression);
  } else {
    BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
                             blob_offset, value.size(),
                             bdb_options_.compression);
  }
  return s;
}
// Looks up each of `keys` in the default column family through GetImpl,
// writing results to `values` and per-key statuses to `statuses`. All
// lookups share one snapshot, taken here if the caller supplied none.
// If the request is invalid, every status is set to the same error and no
// lookups are performed. Invalid means any of:
//   - an io_activity other than kUnknown/kMultiGet;
//   - timestamps were requested;
//   - any non-default column family.
// `sorted_input` is ignored.
void BlobDBImpl::MultiGet(const ReadOptions& _read_options, size_t num_keys,
                          ColumnFamilyHandle** column_families,
                          const Slice* keys, PinnableSlice* values,
                          std::string* timestamps, Status* statuses,
                          const bool /*sorted_input*/) {
  StopWatch multiget_sw(clock_, statistics_, BLOB_DB_MULTIGET_MICROS);
  RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
  // Validate the request; on failure report the same status for every key.
  {
    Status s;
    if (_read_options.io_activity != Env::IOActivity::kUnknown &&
        _read_options.io_activity != Env::IOActivity::kMultiGet) {
      s = Status::InvalidArgument(
          "Can only call MultiGet with `ReadOptions::io_activity` is "
          "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
    } else if (timestamps) {
      s = Status::NotSupported(
          "MultiGet() returning timestamps not implemented.");
    }
    if (s.ok()) {
      for (size_t i = 0; i < num_keys; ++i) {
        if (column_families[i]->GetID() != DefaultColumnFamily()->GetID()) {
          s = Status::NotSupported(
              "Blob DB doesn't support non-default column family.");
          break;
        }
      }
    }
    if (!s.ok()) {
      for (size_t i = 0; i < num_keys; ++i) {
        statuses[i] = s;
      }
      return;
    }
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kMultiGet;
  }
  // Use a snapshot so a blob file cannot be deleted between fetching an
  // index entry and reading the blob from that file.
  bool snapshot_created = SetSnapshotIfNeeded(&read_options);
  for (size_t i = 0; i < num_keys; i++) {
    PinnableSlice& value = values[i];
    statuses[i] = GetImpl(read_options, DefaultColumnFamily(), keys[i], &value);
  }
  // Release only a snapshot this call created itself.
  if (snapshot_created) {
    db_->ReleaseSnapshot(read_options.snapshot);
  }
}
  1237. bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
  1238. assert(read_options != nullptr);
  1239. if (read_options->snapshot != nullptr) {
  1240. return false;
  1241. }
  1242. read_options->snapshot = db_->GetSnapshot();
  1243. return true;
  1244. }
// Resolves a blob index entry for `key` into the user value in *value.
//   - TTL entries whose expiration is at or before EpochNow() yield
//     NotFound("Key expired").
//   - If `expiration` is non-null, it receives the entry's expiration
//     (kNoExpiration for non-TTL entries).
//   - Inlined values are copied out of the index.
//   - Otherwise the blob is read from its blob file, then decompressed when
//     that file uses compression.
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
                                PinnableSlice* value, uint64_t* expiration) {
  assert(value);
  BlobIndex blob_index;
  Status s = blob_index.DecodeFrom(index_entry);
  if (!s.ok()) {
    return s;
  }
  if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
    return Status::NotFound("Key expired");
  }
  if (expiration != nullptr) {
    if (blob_index.HasTTL()) {
      *expiration = blob_index.expiration();
    } else {
      *expiration = kNoExpiration;
    }
  }
  if (blob_index.IsInlined()) {
    // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
    // memory buffer to avoid extra copy.
    value->PinSelf(blob_index.value());
    return Status::OK();
  }
  CompressionType compression_type = kNoCompression;
  s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
                         blob_index.size(), value, &compression_type);
  if (!s.ok()) {
    return s;
  }
  if (compression_type != kNoCompression) {
    // Decompress in place: *value is both input and output.
    s = DecompressSlice(*value, compression_type, value);
    if (!s.ok()) {
      if (debug_level_ >= 2) {
        ROCKS_LOG_ERROR(
            db_options_.info_log,
            "Uncompression error during blob read from file: %" PRIu64
            " blob_offset: %" PRIu64 " blob_size: %" PRIu64
            " key: %s status: '%s'",
            blob_index.file_number(), blob_index.offset(), blob_index.size(),
            key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
      }
      return s;
    }
  }
  return Status::OK();
}
  1292. Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
  1293. uint64_t offset, uint64_t size,
  1294. PinnableSlice* value,
  1295. CompressionType* compression_type) {
  1296. assert(value);
  1297. assert(compression_type);
  1298. assert(*compression_type == kNoCompression);
  1299. if (!size) {
  1300. value->PinSelf("");
  1301. return Status::OK();
  1302. }
  1303. // offset has to have certain min, as we will read CRC
  1304. // later from the Blob Header, which needs to be also a
  1305. // valid offset.
  1306. if (offset <
  1307. (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
  1308. if (debug_level_ >= 2) {
  1309. ROCKS_LOG_ERROR(db_options_.info_log,
  1310. "Invalid blob index file_number: %" PRIu64
  1311. " blob_offset: %" PRIu64 " blob_size: %" PRIu64
  1312. " key: %s",
  1313. file_number, offset, size,
  1314. key.ToString(/* output_hex */ true).c_str());
  1315. }
  1316. return Status::NotFound("Invalid blob offset");
  1317. }
  1318. std::shared_ptr<BlobFile> blob_file;
  1319. {
  1320. ReadLock rl(&mutex_);
  1321. auto it = blob_files_.find(file_number);
  1322. // file was deleted
  1323. if (it == blob_files_.end()) {
  1324. return Status::NotFound("Blob Not Found as blob file missing");
  1325. }
  1326. blob_file = it->second;
  1327. }
  1328. *compression_type = blob_file->GetCompressionType();
  1329. // takes locks when called
  1330. std::shared_ptr<RandomAccessFileReader> reader;
  1331. Status s = GetBlobFileReader(blob_file, &reader);
  1332. if (!s.ok()) {
  1333. return s;
  1334. }
  1335. assert(offset >= key.size() + sizeof(uint32_t));
  1336. const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
  1337. const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
  1338. // Allocate the buffer. This is safe in C++11
  1339. std::string buf;
  1340. AlignedBuf aligned_buf;
  1341. // A partial blob record contain checksum, key and value.
  1342. Slice blob_record;
  1343. {
  1344. StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
  1345. // TODO: rate limit old blob DB file reads.
  1346. if (reader->use_direct_io()) {
  1347. s = reader->Read(IOOptions(), record_offset,
  1348. static_cast<size_t>(record_size), &blob_record, nullptr,
  1349. &aligned_buf);
  1350. } else {
  1351. buf.reserve(static_cast<size_t>(record_size));
  1352. s = reader->Read(IOOptions(), record_offset,
  1353. static_cast<size_t>(record_size), &blob_record,
  1354. buf.data(), nullptr);
  1355. }
  1356. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
  1357. }
  1358. if (!s.ok()) {
  1359. ROCKS_LOG_DEBUG(
  1360. db_options_.info_log,
  1361. "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
  1362. ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
  1363. file_number, offset, size, key.size(), s.ToString().c_str());
  1364. return s;
  1365. }
  1366. if (blob_record.size() != record_size) {
  1367. ROCKS_LOG_DEBUG(
  1368. db_options_.info_log,
  1369. "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
  1370. ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
  1371. ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
  1372. file_number, offset, size, key.size(), blob_record.size(), record_size);
  1373. return Status::Corruption("Failed to retrieve blob from blob index.");
  1374. }
  1375. Slice crc_slice(blob_record.data(), sizeof(uint32_t));
  1376. Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
  1377. static_cast<size_t>(size));
  1378. uint32_t crc_exp = 0;
  1379. if (!GetFixed32(&crc_slice, &crc_exp)) {
  1380. ROCKS_LOG_DEBUG(
  1381. db_options_.info_log,
  1382. "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
  1383. ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
  1384. file_number, offset, size, key.size(), s.ToString().c_str());
  1385. return Status::Corruption("Unable to decode checksum.");
  1386. }
  1387. uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
  1388. blob_record.size() - sizeof(uint32_t));
  1389. crc = crc32c::Mask(crc); // Adjust for storage
  1390. if (crc != crc_exp) {
  1391. if (debug_level_ >= 2) {
  1392. ROCKS_LOG_ERROR(
  1393. db_options_.info_log,
  1394. "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
  1395. " blob_size: %" PRIu64 " key: %s status: '%s'",
  1396. file_number, offset, size,
  1397. key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
  1398. }
  1399. return Status::Corruption("Corruption. Blob CRC mismatch");
  1400. }
  1401. value->PinSelf(blob_value);
  1402. return Status::OK();
  1403. }
  1404. Status BlobDBImpl::Get(const ReadOptions& _read_options,
  1405. ColumnFamilyHandle* column_family, const Slice& key,
  1406. PinnableSlice* value, std::string* timestamp) {
  1407. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  1408. _read_options.io_activity != Env::IOActivity::kGet) {
  1409. return Status::InvalidArgument(
  1410. "Can only call Get with `ReadOptions::io_activity` is "
  1411. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  1412. }
  1413. if (timestamp) {
  1414. return Status::NotSupported(
  1415. "Get() that returns timestamp is not implemented.");
  1416. }
  1417. ReadOptions read_options(_read_options);
  1418. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  1419. read_options.io_activity = Env::IOActivity::kGet;
  1420. }
  1421. return GetImpl(read_options, column_family, key, value);
  1422. }
  1423. Status BlobDBImpl::Get(const ReadOptions& _read_options,
  1424. ColumnFamilyHandle* column_family, const Slice& key,
  1425. PinnableSlice* value, uint64_t* expiration) {
  1426. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  1427. _read_options.io_activity != Env::IOActivity::kGet) {
  1428. return Status::InvalidArgument(
  1429. "Can only call Get with `ReadOptions::io_activity` is "
  1430. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  1431. }
  1432. ReadOptions read_options(_read_options);
  1433. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  1434. read_options.io_activity = Env::IOActivity::kGet;
  1435. }
  1436. StopWatch get_sw(clock_, statistics_, BLOB_DB_GET_MICROS);
  1437. RecordTick(statistics_, BLOB_DB_NUM_GET);
  1438. return GetImpl(read_options, column_family, key, value, expiration);
  1439. }
  1440. Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
  1441. ColumnFamilyHandle* column_family, const Slice& key,
  1442. PinnableSlice* value, uint64_t* expiration) {
  1443. if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
  1444. return Status::NotSupported(
  1445. "Blob DB doesn't support non-default column family.");
  1446. }
  1447. // Get a snapshot to avoid blob file get deleted between we
  1448. // fetch and index entry and reading from the file.
  1449. // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
  1450. ReadOptions ro(read_options);
  1451. bool snapshot_created = SetSnapshotIfNeeded(&ro);
  1452. PinnableSlice index_entry;
  1453. Status s;
  1454. bool is_blob_index = false;
  1455. DBImpl::GetImplOptions get_impl_options;
  1456. get_impl_options.column_family = column_family;
  1457. get_impl_options.value = &index_entry;
  1458. get_impl_options.is_blob_index = &is_blob_index;
  1459. s = db_impl_->GetImpl(ro, key, get_impl_options);
  1460. if (expiration != nullptr) {
  1461. *expiration = kNoExpiration;
  1462. }
  1463. RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
  1464. if (s.ok()) {
  1465. if (is_blob_index) {
  1466. s = GetBlobValue(key, index_entry, value, expiration);
  1467. } else {
  1468. // The index entry is the value itself in this case.
  1469. value->PinSelf(index_entry);
  1470. }
  1471. RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
  1472. }
  1473. if (snapshot_created) {
  1474. db_->ReleaseSnapshot(ro.snapshot);
  1475. }
  1476. return s;
  1477. }
  1478. std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
  1479. if (aborted) {
  1480. return std::make_pair(false, -1);
  1481. }
  1482. ReadLock rl(&mutex_);
  1483. ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
  1484. ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
  1485. blob_files_.size());
  1486. ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
  1487. open_ttl_files_.size());
  1488. for (const auto& blob_file : open_ttl_files_) {
  1489. (void)blob_file;
  1490. assert(!blob_file->Immutable());
  1491. }
  1492. for (const auto& pair : live_imm_non_ttl_blob_files_) {
  1493. const auto& blob_file = pair.second;
  1494. (void)blob_file;
  1495. assert(!blob_file->HasTTL());
  1496. assert(blob_file->Immutable());
  1497. }
  1498. uint64_t now = EpochNow();
  1499. for (const auto& blob_file_pair : blob_files_) {
  1500. auto blob_file = blob_file_pair.second;
  1501. std::ostringstream buf;
  1502. buf << "Blob file " << blob_file->BlobFileNumber() << ", size "
  1503. << blob_file->GetFileSize() << ", blob count " << blob_file->BlobCount()
  1504. << ", immutable " << blob_file->Immutable();
  1505. if (blob_file->HasTTL()) {
  1506. ExpirationRange expiration_range;
  1507. {
  1508. ReadLock file_lock(&blob_file->mutex_);
  1509. expiration_range = blob_file->GetExpirationRange();
  1510. }
  1511. buf << ", expiration range (" << expiration_range.first << ", "
  1512. << expiration_range.second << ")";
  1513. if (!blob_file->Obsolete()) {
  1514. buf << ", expire in " << (expiration_range.second - now) << "seconds";
  1515. }
  1516. }
  1517. if (blob_file->Obsolete()) {
  1518. buf << ", obsolete at " << blob_file->GetObsoleteSequence();
  1519. }
  1520. buf << ".";
  1521. ROCKS_LOG_INFO(db_options_.info_log, "%s", buf.str().c_str());
  1522. }
  1523. // reschedule
  1524. return std::make_pair(true, -1);
  1525. }
  1526. Status BlobDBImpl::CloseBlobFile(const WriteOptions& write_options,
  1527. std::shared_ptr<BlobFile> bfile) {
  1528. TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
  1529. assert(bfile);
  1530. assert(!bfile->Immutable());
  1531. assert(!bfile->Obsolete());
  1532. if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
  1533. write_mutex_.AssertHeld();
  1534. }
  1535. ROCKS_LOG_INFO(db_options_.info_log,
  1536. "Closing blob file %" PRIu64 ". Path: %s",
  1537. bfile->BlobFileNumber(), bfile->PathName().c_str());
  1538. const SequenceNumber sequence = GetLatestSequenceNumber();
  1539. const Status s = bfile->WriteFooterAndCloseLocked(write_options, sequence);
  1540. if (s.ok()) {
  1541. total_blob_size_ += BlobLogFooter::kSize;
  1542. } else {
  1543. bfile->MarkImmutable(sequence);
  1544. ROCKS_LOG_ERROR(db_options_.info_log,
  1545. "Failed to close blob file %" PRIu64 "with error: %s",
  1546. bfile->BlobFileNumber(), s.ToString().c_str());
  1547. }
  1548. if (bfile->HasTTL()) {
  1549. size_t erased __attribute__((__unused__));
  1550. erased = open_ttl_files_.erase(bfile);
  1551. } else {
  1552. if (bfile == open_non_ttl_file_) {
  1553. open_non_ttl_file_ = nullptr;
  1554. }
  1555. const uint64_t blob_file_number = bfile->BlobFileNumber();
  1556. auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
  1557. assert(it == live_imm_non_ttl_blob_files_.end() ||
  1558. it->first != blob_file_number);
  1559. live_imm_non_ttl_blob_files_.insert(
  1560. it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
  1561. blob_file_number, bfile));
  1562. }
  1563. return s;
  1564. }
  1565. Status BlobDBImpl::CloseBlobFileIfNeeded(const WriteOptions& write_options,
  1566. std::shared_ptr<BlobFile>& bfile) {
  1567. write_mutex_.AssertHeld();
  1568. // atomic read
  1569. if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
  1570. return Status::OK();
  1571. }
  1572. WriteLock lock(&mutex_);
  1573. WriteLock file_lock(&bfile->mutex_);
  1574. assert(!bfile->Obsolete() || bfile->Immutable());
  1575. if (bfile->Immutable()) {
  1576. return Status::OK();
  1577. }
  1578. return CloseBlobFile(write_options, bfile);
  1579. }
  1580. void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
  1581. SequenceNumber obsolete_seq,
  1582. bool update_size) {
  1583. assert(blob_file->Immutable());
  1584. assert(!blob_file->Obsolete());
  1585. // Should hold write lock of mutex_ or during DB open.
  1586. blob_file->MarkObsolete(obsolete_seq);
  1587. obsolete_files_.push_back(blob_file);
  1588. assert(total_blob_size_.load() >= blob_file->GetFileSize());
  1589. if (update_size) {
  1590. total_blob_size_ -= blob_file->GetFileSize();
  1591. }
  1592. }
  1593. bool BlobDBImpl::VisibleToActiveSnapshot(
  1594. const std::shared_ptr<BlobFile>& bfile) {
  1595. assert(bfile->Obsolete());
  1596. // We check whether the oldest snapshot is no less than the last sequence
  1597. // by the time the blob file become obsolete. If so, the blob file is not
  1598. // visible to all existing snapshots.
  1599. //
  1600. // If we keep track of the earliest sequence of the keys in the blob file,
  1601. // we could instead check if there's a snapshot falls in range
  1602. // [earliest_sequence, obsolete_sequence). But doing so will make the
  1603. // implementation more complicated.
  1604. SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
  1605. SequenceNumber oldest_snapshot = kMaxSequenceNumber;
  1606. {
  1607. // Need to lock DBImpl mutex before access snapshot list.
  1608. InstrumentedMutexLock l(db_impl_->mutex());
  1609. auto& snapshots = db_impl_->snapshots();
  1610. if (!snapshots.empty()) {
  1611. oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
  1612. }
  1613. }
  1614. bool visible = oldest_snapshot < obsolete_sequence;
  1615. if (visible) {
  1616. ROCKS_LOG_INFO(db_options_.info_log,
  1617. "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
  1618. ") visible to oldest snapshot %" PRIu64 ".",
  1619. bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
  1620. }
  1621. return visible;
  1622. }
  1623. std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
  1624. if (aborted) {
  1625. return std::make_pair(false, -1);
  1626. }
  1627. TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
  1628. TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
  1629. std::vector<std::shared_ptr<BlobFile>> process_files;
  1630. uint64_t now = EpochNow();
  1631. {
  1632. ReadLock rl(&mutex_);
  1633. for (const auto& p : blob_files_) {
  1634. auto& blob_file = p.second;
  1635. ReadLock file_lock(&blob_file->mutex_);
  1636. if (blob_file->HasTTL() && !blob_file->Obsolete() &&
  1637. blob_file->GetExpirationRange().second <= now) {
  1638. process_files.push_back(blob_file);
  1639. }
  1640. }
  1641. }
  1642. TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
  1643. TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
  1644. TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
  1645. SequenceNumber seq = GetLatestSequenceNumber();
  1646. {
  1647. MutexLock l(&write_mutex_);
  1648. WriteLock lock(&mutex_);
  1649. for (auto& blob_file : process_files) {
  1650. WriteLock file_lock(&blob_file->mutex_);
  1651. // Need to double check if the file is obsolete.
  1652. if (blob_file->Obsolete()) {
  1653. assert(blob_file->Immutable());
  1654. continue;
  1655. }
  1656. if (!blob_file->Immutable()) {
  1657. // TODO: plumb Env::IOActivity, Env::IOPriority
  1658. CloseBlobFile(WriteOptions(), blob_file).PermitUncheckedError();
  1659. }
  1660. assert(blob_file->Immutable());
  1661. ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
  1662. }
  1663. }
  1664. return std::make_pair(true, -1);
  1665. }
  1666. Status BlobDBImpl::SyncBlobFiles(const WriteOptions& write_options) {
  1667. MutexLock l(&write_mutex_);
  1668. std::vector<std::shared_ptr<BlobFile>> process_files;
  1669. {
  1670. ReadLock rl(&mutex_);
  1671. for (const auto& fitr : open_ttl_files_) {
  1672. process_files.push_back(fitr);
  1673. }
  1674. if (open_non_ttl_file_ != nullptr) {
  1675. process_files.push_back(open_non_ttl_file_);
  1676. }
  1677. }
  1678. Status s;
  1679. for (auto& blob_file : process_files) {
  1680. s = blob_file->Fsync(write_options);
  1681. if (!s.ok()) {
  1682. ROCKS_LOG_ERROR(db_options_.info_log,
  1683. "Failed to sync blob file %" PRIu64 ", status: %s",
  1684. blob_file->BlobFileNumber(), s.ToString().c_str());
  1685. return s;
  1686. }
  1687. }
  1688. s = dir_ent_->FsyncWithDirOptions(IOOptions(), nullptr, DirFsyncOptions());
  1689. if (!s.ok()) {
  1690. ROCKS_LOG_ERROR(db_options_.info_log,
  1691. "Failed to sync blob directory, status: %s",
  1692. s.ToString().c_str());
  1693. }
  1694. return s;
  1695. }
  1696. std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
  1697. if (aborted) {
  1698. return std::make_pair(false, -1);
  1699. }
  1700. if (open_file_count_.load() < kOpenFilesTrigger) {
  1701. return std::make_pair(true, -1);
  1702. }
  1703. // in the future, we should sort by last_access_
  1704. // instead of closing every file
  1705. ReadLock rl(&mutex_);
  1706. for (auto const& ent : blob_files_) {
  1707. auto bfile = ent.second;
  1708. if (bfile->last_access_.load() == -1) {
  1709. continue;
  1710. }
  1711. WriteLock lockbfile_w(&bfile->mutex_);
  1712. CloseRandomAccessLocked(bfile);
  1713. }
  1714. return std::make_pair(true, -1);
  1715. }
  1716. std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
  1717. if (aborted) {
  1718. return std::make_pair(false, -1);
  1719. }
  1720. MutexLock delete_file_lock(&delete_file_mutex_);
  1721. if (disable_file_deletions_ > 0) {
  1722. return std::make_pair(true, -1);
  1723. }
  1724. std::list<std::shared_ptr<BlobFile>> tobsolete;
  1725. {
  1726. WriteLock wl(&mutex_);
  1727. if (obsolete_files_.empty()) {
  1728. return std::make_pair(true, -1);
  1729. }
  1730. tobsolete.swap(obsolete_files_);
  1731. }
  1732. bool file_deleted = false;
  1733. for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
  1734. auto bfile = *iter;
  1735. {
  1736. ReadLock lockbfile_r(&bfile->mutex_);
  1737. if (VisibleToActiveSnapshot(bfile)) {
  1738. ROCKS_LOG_INFO(db_options_.info_log,
  1739. "Could not delete file due to snapshot failure %s",
  1740. bfile->PathName().c_str());
  1741. ++iter;
  1742. continue;
  1743. }
  1744. }
  1745. ROCKS_LOG_INFO(db_options_.info_log,
  1746. "Will delete file due to snapshot success %s",
  1747. bfile->PathName().c_str());
  1748. {
  1749. WriteLock wl(&mutex_);
  1750. blob_files_.erase(bfile->BlobFileNumber());
  1751. }
  1752. Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
  1753. bfile->PathName(), blob_dir_, true,
  1754. /*force_fg=*/false);
  1755. if (!s.ok()) {
  1756. ROCKS_LOG_ERROR(db_options_.info_log,
  1757. "File failed to be deleted as obsolete %s",
  1758. bfile->PathName().c_str());
  1759. ++iter;
  1760. continue;
  1761. }
  1762. file_deleted = true;
  1763. ROCKS_LOG_INFO(db_options_.info_log,
  1764. "File deleted as obsolete from blob dir %s",
  1765. bfile->PathName().c_str());
  1766. iter = tobsolete.erase(iter);
  1767. }
  1768. // directory change. Fsync
  1769. if (file_deleted) {
  1770. Status s = dir_ent_->FsyncWithDirOptions(
  1771. IOOptions(), nullptr,
  1772. DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted));
  1773. if (!s.ok()) {
  1774. ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
  1775. blob_dir_.c_str(), s.ToString().c_str());
  1776. }
  1777. }
  1778. // put files back into obsolete if for some reason, delete failed
  1779. if (!tobsolete.empty()) {
  1780. WriteLock wl(&mutex_);
  1781. for (const auto& bfile : tobsolete) {
  1782. blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
  1783. obsolete_files_.push_front(bfile);
  1784. }
  1785. }
  1786. return std::make_pair(!aborted, -1);
  1787. }
  1788. void BlobDBImpl::CopyBlobFiles(
  1789. std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
  1790. ReadLock rl(&mutex_);
  1791. for (auto const& p : blob_files_) {
  1792. bfiles_copy->push_back(p.second);
  1793. }
  1794. }
  1795. Iterator* BlobDBImpl::NewIterator(const ReadOptions& _read_options) {
  1796. if (_read_options.io_activity != Env::IOActivity::kUnknown &&
  1797. _read_options.io_activity != Env::IOActivity::kDBIterator) {
  1798. return NewErrorIterator(Status::InvalidArgument(
  1799. "Can only call NewIterator with `ReadOptions::io_activity` is "
  1800. "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
  1801. }
  1802. ReadOptions read_options(_read_options);
  1803. if (read_options.io_activity == Env::IOActivity::kUnknown) {
  1804. read_options.io_activity = Env::IOActivity::kDBIterator;
  1805. }
  1806. auto* cfh =
  1807. static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily());
  1808. auto* cfd = cfh->cfd();
  1809. // Get a snapshot to avoid blob file get deleted between we
  1810. // fetch and index entry and reading from the file.
  1811. ManagedSnapshot* own_snapshot = nullptr;
  1812. const Snapshot* snapshot = read_options.snapshot;
  1813. if (snapshot == nullptr) {
  1814. own_snapshot = new ManagedSnapshot(db_);
  1815. snapshot = own_snapshot->snapshot();
  1816. }
  1817. SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl_);
  1818. auto* iter = db_impl_->NewIteratorImpl(
  1819. read_options, cfh, sv, snapshot->GetSequenceNumber(),
  1820. nullptr /*read_callback*/, true /*expose_blob_index*/);
  1821. return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_);
  1822. }
  1823. Status DestroyBlobDB(const std::string& dbname, const Options& options,
  1824. const BlobDBOptions& bdb_options) {
  1825. const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
  1826. Env* env = soptions.env;
  1827. Status status;
  1828. std::string blobdir;
  1829. blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
  1830. : bdb_options.blob_dir;
  1831. std::vector<std::string> filenames;
  1832. if (env->GetChildren(blobdir, &filenames).ok()) {
  1833. for (const auto& f : filenames) {
  1834. uint64_t number;
  1835. FileType type;
  1836. if (ParseFileName(f, &number, &type) && type == kBlobFile) {
  1837. Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
  1838. /*force_fg=*/false);
  1839. if (status.ok() && !del.ok()) {
  1840. status = del;
  1841. }
  1842. }
  1843. }
  1844. // TODO: What to do if we cannot delete the directory?
  1845. env->DeleteDir(blobdir).PermitUncheckedError();
  1846. }
  1847. Status destroy = DestroyDB(dbname, options);
  1848. if (status.ok() && !destroy.ok()) {
  1849. status = destroy;
  1850. }
  1851. return status;
  1852. }
  1853. #ifndef NDEBUG
  1854. Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
  1855. PinnableSlice* value) {
  1856. return GetBlobValue(key, index_entry, value);
  1857. }
  1858. void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
  1859. SequenceNumber immutable_sequence) {
  1860. auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
  1861. db_options_.info_log.get());
  1862. blob_file->MarkImmutable(immutable_sequence);
  1863. blob_files_[blob_file_number] = blob_file;
  1864. live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
  1865. }
  1866. std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
  1867. ReadLock l(&mutex_);
  1868. std::vector<std::shared_ptr<BlobFile>> blob_files;
  1869. for (auto& p : blob_files_) {
  1870. blob_files.emplace_back(p.second);
  1871. }
  1872. return blob_files;
  1873. }
  1874. std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
  1875. const {
  1876. ReadLock l(&mutex_);
  1877. std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
  1878. for (const auto& pair : live_imm_non_ttl_blob_files_) {
  1879. live_imm_non_ttl_files.emplace_back(pair.second);
  1880. }
  1881. return live_imm_non_ttl_files;
  1882. }
  1883. std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
  1884. const {
  1885. ReadLock l(&mutex_);
  1886. std::vector<std::shared_ptr<BlobFile>> obsolete_files;
  1887. for (auto& bfile : obsolete_files_) {
  1888. obsolete_files.emplace_back(bfile);
  1889. }
  1890. return obsolete_files;
  1891. }
  1892. void BlobDBImpl::TEST_DeleteObsoleteFiles() {
  1893. DeleteObsoleteFiles(false /*abort*/);
  1894. }
  1895. Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
  1896. MutexLock l(&write_mutex_);
  1897. WriteLock lock(&mutex_);
  1898. WriteLock file_lock(&bfile->mutex_);
  1899. return CloseBlobFile(WriteOptions(), bfile);
  1900. }
  1901. void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
  1902. SequenceNumber obsolete_seq,
  1903. bool update_size) {
  1904. return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
  1905. }
  1906. void BlobDBImpl::TEST_EvictExpiredFiles() {
  1907. EvictExpiredFiles(false /*abort*/);
  1908. }
  1909. uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
  1910. void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
  1911. const std::vector<LiveFileMetaData>& live_files) {
  1912. InitializeBlobFileToSstMapping(live_files);
  1913. }
  1914. void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
  1915. ProcessFlushJobInfo(info);
  1916. }
  1917. void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
  1918. ProcessCompactionJobInfo(info);
  1919. }
  1920. #endif // !NDEBUG
  1921. } // namespace ROCKSDB_NAMESPACE::blob_db