blob_db_impl.cc 67 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "utilities/blob_db/blob_db_impl.h"
  7. #include <algorithm>
  8. #include <cinttypes>
  9. #include <iomanip>
  10. #include <memory>
  11. #include <sstream>
  12. #include "db/blob_index.h"
  13. #include "db/db_impl/db_impl.h"
  14. #include "db/write_batch_internal.h"
  15. #include "env/composite_env_wrapper.h"
  16. #include "file/file_util.h"
  17. #include "file/filename.h"
  18. #include "file/random_access_file_reader.h"
  19. #include "file/sst_file_manager_impl.h"
  20. #include "file/writable_file_writer.h"
  21. #include "logging/logging.h"
  22. #include "monitoring/instrumented_mutex.h"
  23. #include "monitoring/statistics.h"
  24. #include "rocksdb/convenience.h"
  25. #include "rocksdb/env.h"
  26. #include "rocksdb/iterator.h"
  27. #include "rocksdb/utilities/stackable_db.h"
  28. #include "rocksdb/utilities/transaction.h"
  29. #include "table/block_based/block.h"
  30. #include "table/block_based/block_based_table_builder.h"
  31. #include "table/block_based/block_builder.h"
  32. #include "table/meta_blocks.h"
  33. #include "test_util/sync_point.h"
  34. #include "util/cast_util.h"
  35. #include "util/crc32c.h"
  36. #include "util/mutexlock.h"
  37. #include "util/random.h"
  38. #include "util/stop_watch.h"
  39. #include "util/timer_queue.h"
  40. #include "utilities/blob_db/blob_compaction_filter.h"
  41. #include "utilities/blob_db/blob_db_iterator.h"
  42. #include "utilities/blob_db/blob_db_listener.h"
  43. namespace {
  44. int kBlockBasedTableVersionFormat = 2;
  45. } // end namespace
  46. namespace ROCKSDB_NAMESPACE {
  47. namespace blob_db {
  48. bool BlobFileComparator::operator()(
  49. const std::shared_ptr<BlobFile>& lhs,
  50. const std::shared_ptr<BlobFile>& rhs) const {
  51. return lhs->BlobFileNumber() > rhs->BlobFileNumber();
  52. }
  53. bool BlobFileComparatorTTL::operator()(
  54. const std::shared_ptr<BlobFile>& lhs,
  55. const std::shared_ptr<BlobFile>& rhs) const {
  56. assert(lhs->HasTTL() && rhs->HasTTL());
  57. if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
  58. return true;
  59. }
  60. if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
  61. return false;
  62. }
  63. return lhs->BlobFileNumber() < rhs->BlobFileNumber();
  64. }
  65. BlobDBImpl::BlobDBImpl(const std::string& dbname,
  66. const BlobDBOptions& blob_db_options,
  67. const DBOptions& db_options,
  68. const ColumnFamilyOptions& cf_options)
  69. : BlobDB(),
  70. dbname_(dbname),
  71. db_impl_(nullptr),
  72. env_(db_options.env),
  73. bdb_options_(blob_db_options),
  74. db_options_(db_options),
  75. cf_options_(cf_options),
  76. env_options_(db_options),
  77. statistics_(db_options_.statistics.get()),
  78. next_file_number_(1),
  79. flush_sequence_(0),
  80. closed_(true),
  81. open_file_count_(0),
  82. total_blob_size_(0),
  83. live_sst_size_(0),
  84. fifo_eviction_seq_(0),
  85. evict_expiration_up_to_(0),
  86. debug_level_(0) {
  87. blob_dir_ = (bdb_options_.path_relative)
  88. ? dbname + "/" + bdb_options_.blob_dir
  89. : bdb_options_.blob_dir;
  90. env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
  91. }
  92. BlobDBImpl::~BlobDBImpl() {
  93. tqueue_.shutdown();
  94. // CancelAllBackgroundWork(db_, true);
  95. Status s __attribute__((__unused__)) = Close();
  96. assert(s.ok());
  97. }
  98. Status BlobDBImpl::Close() {
  99. if (closed_) {
  100. return Status::OK();
  101. }
  102. closed_ = true;
  103. // Close base DB before BlobDBImpl destructs to stop event listener and
  104. // compaction filter call.
  105. Status s = db_->Close();
  106. // delete db_ anyway even if close failed.
  107. delete db_;
  108. // Reset pointers to avoid StackableDB delete the pointer again.
  109. db_ = nullptr;
  110. db_impl_ = nullptr;
  111. if (!s.ok()) {
  112. return s;
  113. }
  114. s = SyncBlobFiles();
  115. return s;
  116. }
  117. BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
  118. Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
  119. assert(handles != nullptr);
  120. assert(db_ == nullptr);
  121. if (blob_dir_.empty()) {
  122. return Status::NotSupported("No blob directory in options");
  123. }
  124. if (cf_options_.compaction_filter != nullptr ||
  125. cf_options_.compaction_filter_factory != nullptr) {
  126. return Status::NotSupported("Blob DB doesn't support compaction filter.");
  127. }
  128. if (bdb_options_.garbage_collection_cutoff < 0.0 ||
  129. bdb_options_.garbage_collection_cutoff > 1.0) {
  130. return Status::InvalidArgument(
  131. "Garbage collection cutoff must be in the interval [0.0, 1.0]");
  132. }
  133. // Temporarily disable compactions in the base DB during open; save the user
  134. // defined value beforehand so we can restore it once BlobDB is initialized.
  135. // Note: this is only needed if garbage collection is enabled.
  136. const bool disable_auto_compactions = cf_options_.disable_auto_compactions;
  137. if (bdb_options_.enable_garbage_collection) {
  138. cf_options_.disable_auto_compactions = true;
  139. }
  140. Status s;
  141. // Create info log.
  142. if (db_options_.info_log == nullptr) {
  143. s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
  144. if (!s.ok()) {
  145. return s;
  146. }
  147. }
  148. ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
  149. // Open blob directory.
  150. s = env_->CreateDirIfMissing(blob_dir_);
  151. if (!s.ok()) {
  152. ROCKS_LOG_ERROR(db_options_.info_log,
  153. "Failed to create blob_dir %s, status: %s",
  154. blob_dir_.c_str(), s.ToString().c_str());
  155. }
  156. s = env_->NewDirectory(blob_dir_, &dir_ent_);
  157. if (!s.ok()) {
  158. ROCKS_LOG_ERROR(db_options_.info_log,
  159. "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
  160. s.ToString().c_str());
  161. return s;
  162. }
  163. // Open blob files.
  164. s = OpenAllBlobFiles();
  165. if (!s.ok()) {
  166. return s;
  167. }
  168. // Update options
  169. if (bdb_options_.enable_garbage_collection) {
  170. db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
  171. cf_options_.compaction_filter_factory =
  172. std::make_shared<BlobIndexCompactionFilterFactoryGC>(this, env_,
  173. statistics_);
  174. } else {
  175. db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
  176. cf_options_.compaction_filter_factory =
  177. std::make_shared<BlobIndexCompactionFilterFactory>(this, env_,
  178. statistics_);
  179. }
  180. // Open base db.
  181. ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
  182. s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
  183. if (!s.ok()) {
  184. return s;
  185. }
  186. db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
  187. // Initialize SST file <-> oldest blob file mapping if garbage collection
  188. // is enabled.
  189. if (bdb_options_.enable_garbage_collection) {
  190. std::vector<LiveFileMetaData> live_files;
  191. db_->GetLiveFilesMetaData(&live_files);
  192. InitializeBlobFileToSstMapping(live_files);
  193. MarkUnreferencedBlobFilesObsoleteDuringOpen();
  194. if (!disable_auto_compactions) {
  195. s = db_->EnableAutoCompaction(*handles);
  196. if (!s.ok()) {
  197. ROCKS_LOG_ERROR(
  198. db_options_.info_log,
  199. "Failed to enable automatic compactions during open, status: %s",
  200. s.ToString().c_str());
  201. return s;
  202. }
  203. }
  204. }
  205. // Add trash files in blob dir to file delete scheduler.
  206. SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
  207. db_impl_->immutable_db_options().sst_file_manager.get());
  208. DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
  209. UpdateLiveSSTSize();
  210. // Start background jobs.
  211. if (!bdb_options_.disable_background_tasks) {
  212. StartBackgroundTasks();
  213. }
  214. ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
  215. bdb_options_.Dump(db_options_.info_log.get());
  216. closed_ = false;
  217. return s;
  218. }
  219. void BlobDBImpl::StartBackgroundTasks() {
  220. // store a call to a member function and object
  221. tqueue_.add(
  222. kReclaimOpenFilesPeriodMillisecs,
  223. std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
  224. tqueue_.add(
  225. kDeleteObsoleteFilesPeriodMillisecs,
  226. std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
  227. tqueue_.add(kSanityCheckPeriodMillisecs,
  228. std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
  229. tqueue_.add(
  230. kEvictExpiredFilesPeriodMillisecs,
  231. std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
  232. }
  233. Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
  234. assert(file_numbers != nullptr);
  235. std::vector<std::string> all_files;
  236. Status s = env_->GetChildren(blob_dir_, &all_files);
  237. if (!s.ok()) {
  238. ROCKS_LOG_ERROR(db_options_.info_log,
  239. "Failed to get list of blob files, status: %s",
  240. s.ToString().c_str());
  241. return s;
  242. }
  243. for (const auto& file_name : all_files) {
  244. uint64_t file_number;
  245. FileType type;
  246. bool success = ParseFileName(file_name, &file_number, &type);
  247. if (success && type == kBlobFile) {
  248. file_numbers->insert(file_number);
  249. } else {
  250. ROCKS_LOG_WARN(db_options_.info_log,
  251. "Skipping file in blob directory: %s", file_name.c_str());
  252. }
  253. }
  254. return s;
  255. }
  256. Status BlobDBImpl::OpenAllBlobFiles() {
  257. std::set<uint64_t> file_numbers;
  258. Status s = GetAllBlobFiles(&file_numbers);
  259. if (!s.ok()) {
  260. return s;
  261. }
  262. if (!file_numbers.empty()) {
  263. next_file_number_.store(*file_numbers.rbegin() + 1);
  264. }
  265. std::ostringstream blob_file_oss;
  266. std::ostringstream live_imm_oss;
  267. std::ostringstream obsolete_file_oss;
  268. for (auto& file_number : file_numbers) {
  269. std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
  270. this, blob_dir_, file_number, db_options_.info_log.get());
  271. blob_file->MarkImmutable(/* sequence */ 0);
  272. // Read file header and footer
  273. Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
  274. if (read_metadata_status.IsCorruption()) {
  275. // Remove incomplete file.
  276. if (!obsolete_files_.empty()) {
  277. obsolete_file_oss << ", ";
  278. }
  279. obsolete_file_oss << file_number;
  280. ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
  281. continue;
  282. } else if (!read_metadata_status.ok()) {
  283. ROCKS_LOG_ERROR(db_options_.info_log,
  284. "Unable to read metadata of blob file %" PRIu64
  285. ", status: '%s'",
  286. file_number, read_metadata_status.ToString().c_str());
  287. return read_metadata_status;
  288. }
  289. total_blob_size_ += blob_file->GetFileSize();
  290. if (!blob_files_.empty()) {
  291. blob_file_oss << ", ";
  292. }
  293. blob_file_oss << file_number;
  294. blob_files_[file_number] = blob_file;
  295. if (!blob_file->HasTTL()) {
  296. if (!live_imm_non_ttl_blob_files_.empty()) {
  297. live_imm_oss << ", ";
  298. }
  299. live_imm_oss << file_number;
  300. live_imm_non_ttl_blob_files_[file_number] = blob_file;
  301. }
  302. }
  303. ROCKS_LOG_INFO(db_options_.info_log,
  304. "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
  305. blob_file_oss.str().c_str());
  306. ROCKS_LOG_INFO(
  307. db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
  308. live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
  309. ROCKS_LOG_INFO(db_options_.info_log,
  310. "Found %" ROCKSDB_PRIszt
  311. " incomplete or corrupted blob files: %s",
  312. obsolete_files_.size(), obsolete_file_oss.str().c_str());
  313. return s;
  314. }
  315. template <typename Linker>
  316. void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
  317. uint64_t blob_file_number,
  318. Linker linker) {
  319. assert(bdb_options_.enable_garbage_collection);
  320. assert(blob_file_number != kInvalidBlobFileNumber);
  321. auto it = blob_files_.find(blob_file_number);
  322. if (it == blob_files_.end()) {
  323. ROCKS_LOG_WARN(db_options_.info_log,
  324. "Blob file %" PRIu64
  325. " not found while trying to link "
  326. "SST file %" PRIu64,
  327. blob_file_number, sst_file_number);
  328. return;
  329. }
  330. BlobFile* const blob_file = it->second.get();
  331. assert(blob_file);
  332. linker(blob_file, sst_file_number);
  333. ROCKS_LOG_INFO(db_options_.info_log,
  334. "Blob file %" PRIu64 " linked to SST file %" PRIu64,
  335. blob_file_number, sst_file_number);
  336. }
  337. void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
  338. uint64_t blob_file_number) {
  339. auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
  340. WriteLock file_lock(&blob_file->mutex_);
  341. blob_file->LinkSstFile(sst_file);
  342. };
  343. LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
  344. }
  345. void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
  346. uint64_t blob_file_number) {
  347. auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
  348. blob_file->LinkSstFile(sst_file);
  349. };
  350. LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
  351. }
  352. void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
  353. uint64_t blob_file_number) {
  354. assert(bdb_options_.enable_garbage_collection);
  355. assert(blob_file_number != kInvalidBlobFileNumber);
  356. auto it = blob_files_.find(blob_file_number);
  357. if (it == blob_files_.end()) {
  358. ROCKS_LOG_WARN(db_options_.info_log,
  359. "Blob file %" PRIu64
  360. " not found while trying to unlink "
  361. "SST file %" PRIu64,
  362. blob_file_number, sst_file_number);
  363. return;
  364. }
  365. BlobFile* const blob_file = it->second.get();
  366. assert(blob_file);
  367. {
  368. WriteLock file_lock(&blob_file->mutex_);
  369. blob_file->UnlinkSstFile(sst_file_number);
  370. }
  371. ROCKS_LOG_INFO(db_options_.info_log,
  372. "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
  373. blob_file_number, sst_file_number);
  374. }
  375. void BlobDBImpl::InitializeBlobFileToSstMapping(
  376. const std::vector<LiveFileMetaData>& live_files) {
  377. assert(bdb_options_.enable_garbage_collection);
  378. for (const auto& live_file : live_files) {
  379. const uint64_t sst_file_number = live_file.file_number;
  380. const uint64_t blob_file_number = live_file.oldest_blob_file_number;
  381. if (blob_file_number == kInvalidBlobFileNumber) {
  382. continue;
  383. }
  384. LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
  385. }
  386. }
  387. void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
  388. assert(bdb_options_.enable_garbage_collection);
  389. WriteLock lock(&mutex_);
  390. if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
  391. LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
  392. }
  393. assert(flush_sequence_ < info.largest_seqno);
  394. flush_sequence_ = info.largest_seqno;
  395. MarkUnreferencedBlobFilesObsolete();
  396. }
  397. void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
  398. assert(bdb_options_.enable_garbage_collection);
  399. if (!info.status.ok()) {
  400. return;
  401. }
  402. // Note: the same SST file may appear in both the input and the output
  403. // file list in case of a trivial move. We walk through the two lists
  404. // below in a fashion that's similar to merge sort to detect this.
  405. auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
  406. return lhs.file_number < rhs.file_number;
  407. };
  408. auto inputs = info.input_file_infos;
  409. auto iit = inputs.begin();
  410. const auto iit_end = inputs.end();
  411. std::sort(iit, iit_end, cmp);
  412. auto outputs = info.output_file_infos;
  413. auto oit = outputs.begin();
  414. const auto oit_end = outputs.end();
  415. std::sort(oit, oit_end, cmp);
  416. WriteLock lock(&mutex_);
  417. while (iit != iit_end && oit != oit_end) {
  418. const auto& input = *iit;
  419. const auto& output = *oit;
  420. if (input.file_number == output.file_number) {
  421. ++iit;
  422. ++oit;
  423. } else if (input.file_number < output.file_number) {
  424. if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
  425. UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
  426. }
  427. ++iit;
  428. } else {
  429. assert(output.file_number < input.file_number);
  430. if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
  431. LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
  432. }
  433. ++oit;
  434. }
  435. }
  436. while (iit != iit_end) {
  437. const auto& input = *iit;
  438. if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
  439. UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
  440. }
  441. ++iit;
  442. }
  443. while (oit != oit_end) {
  444. const auto& output = *oit;
  445. if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
  446. LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
  447. }
  448. ++oit;
  449. }
  450. MarkUnreferencedBlobFilesObsolete();
  451. }
  452. bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
  453. const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
  454. assert(blob_file);
  455. assert(!blob_file->HasTTL());
  456. assert(blob_file->Immutable());
  457. assert(bdb_options_.enable_garbage_collection);
  458. // Note: FIFO eviction could have marked this file obsolete already.
  459. if (blob_file->Obsolete()) {
  460. return true;
  461. }
  462. // We cannot mark this file (or any higher-numbered files for that matter)
  463. // obsolete if it is referenced by any memtables or SSTs. We keep track of
  464. // the SSTs explicitly. To account for memtables, we keep track of the highest
  465. // sequence number received in flush notifications, and we do not mark the
  466. // blob file obsolete if there are still unflushed memtables from before
  467. // the time the blob file was closed.
  468. if (blob_file->GetImmutableSequence() > flush_sequence_ ||
  469. !blob_file->GetLinkedSstFiles().empty()) {
  470. return false;
  471. }
  472. ROCKS_LOG_INFO(db_options_.info_log,
  473. "Blob file %" PRIu64 " is no longer needed, marking obsolete",
  474. blob_file->BlobFileNumber());
  475. ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
  476. return true;
  477. }
  478. template <class Functor>
  479. void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
  480. assert(bdb_options_.enable_garbage_collection);
  481. // Iterate through all live immutable non-TTL blob files, and mark them
  482. // obsolete assuming no SST files or memtables rely on the blobs in them.
  483. // Note: we need to stop as soon as we find a blob file that has any
  484. // linked SSTs (or one potentially referenced by memtables).
  485. uint64_t obsoleted_files = 0;
  486. auto it = live_imm_non_ttl_blob_files_.begin();
  487. while (it != live_imm_non_ttl_blob_files_.end()) {
  488. const auto& blob_file = it->second;
  489. assert(blob_file);
  490. assert(blob_file->BlobFileNumber() == it->first);
  491. assert(!blob_file->HasTTL());
  492. assert(blob_file->Immutable());
  493. // Small optimization: Obsolete() does an atomic read, so we can do
  494. // this check without taking a lock on the blob file's mutex.
  495. if (blob_file->Obsolete()) {
  496. it = live_imm_non_ttl_blob_files_.erase(it);
  497. continue;
  498. }
  499. if (!mark_if_needed(blob_file)) {
  500. break;
  501. }
  502. it = live_imm_non_ttl_blob_files_.erase(it);
  503. ++obsoleted_files;
  504. }
  505. if (obsoleted_files > 0) {
  506. ROCKS_LOG_INFO(db_options_.info_log,
  507. "%" PRIu64 " blob file(s) marked obsolete by GC",
  508. obsoleted_files);
  509. RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
  510. }
  511. }
  512. void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
  513. const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
  514. MarkUnreferencedBlobFilesObsoleteImpl(
  515. [=](const std::shared_ptr<BlobFile>& blob_file) {
  516. WriteLock file_lock(&blob_file->mutex_);
  517. return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
  518. });
  519. }
  520. void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
  521. MarkUnreferencedBlobFilesObsoleteImpl(
  522. [=](const std::shared_ptr<BlobFile>& blob_file) {
  523. return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
  524. });
  525. }
  526. void BlobDBImpl::CloseRandomAccessLocked(
  527. const std::shared_ptr<BlobFile>& bfile) {
  528. bfile->CloseRandomAccessLocked();
  529. open_file_count_--;
  530. }
  531. Status BlobDBImpl::GetBlobFileReader(
  532. const std::shared_ptr<BlobFile>& blob_file,
  533. std::shared_ptr<RandomAccessFileReader>* reader) {
  534. assert(reader != nullptr);
  535. bool fresh_open = false;
  536. Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
  537. if (s.ok() && fresh_open) {
  538. assert(*reader != nullptr);
  539. open_file_count_++;
  540. }
  541. return s;
  542. }
  543. std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
  544. bool has_ttl, const ExpirationRange& expiration_range,
  545. const std::string& reason) {
  546. assert(has_ttl == (expiration_range.first || expiration_range.second));
  547. uint64_t file_num = next_file_number_++;
  548. const uint32_t column_family_id =
  549. static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  550. auto blob_file = std::make_shared<BlobFile>(
  551. this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
  552. bdb_options_.compression, has_ttl, expiration_range);
  553. ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
  554. blob_file->PathName().c_str(), reason.c_str());
  555. LogFlush(db_options_.info_log);
  556. return blob_file;
  557. }
  558. void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
  559. const uint64_t blob_file_number = blob_file->BlobFileNumber();
  560. auto it = blob_files_.lower_bound(blob_file_number);
  561. assert(it == blob_files_.end() || it->first != blob_file_number);
  562. blob_files_.insert(it,
  563. std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
  564. blob_file_number, std::move(blob_file)));
  565. }
  566. Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
  567. std::string fpath(bfile->PathName());
  568. std::unique_ptr<WritableFile> wfile;
  569. Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_);
  570. if (!s.ok()) {
  571. ROCKS_LOG_ERROR(db_options_.info_log,
  572. "Failed to open blob file for write: %s status: '%s'"
  573. " exists: '%s'",
  574. fpath.c_str(), s.ToString().c_str(),
  575. env_->FileExists(fpath).ToString().c_str());
  576. return s;
  577. }
  578. std::unique_ptr<WritableFileWriter> fwriter;
  579. fwriter.reset(new WritableFileWriter(
  580. NewLegacyWritableFileWrapper(std::move(wfile)), fpath, env_options_));
  581. uint64_t boffset = bfile->GetFileSize();
  582. if (debug_level_ >= 2 && boffset) {
  583. ROCKS_LOG_DEBUG(db_options_.info_log,
  584. "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
  585. boffset);
  586. }
  587. Writer::ElemType et = Writer::kEtNone;
  588. if (bfile->file_size_ == BlobLogHeader::kSize) {
  589. et = Writer::kEtFileHdr;
  590. } else if (bfile->file_size_ > BlobLogHeader::kSize) {
  591. et = Writer::kEtRecord;
  592. } else if (bfile->file_size_) {
  593. ROCKS_LOG_WARN(db_options_.info_log,
  594. "Open blob file: %s with wrong size: %" PRIu64,
  595. fpath.c_str(), boffset);
  596. return Status::Corruption("Invalid blob file size");
  597. }
  598. bfile->log_writer_ = std::make_shared<Writer>(
  599. std::move(fwriter), env_, statistics_, bfile->file_number_,
  600. bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
  601. bfile->log_writer_->last_elem_type_ = et;
  602. return s;
  603. }
  604. std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
  605. uint64_t expiration) const {
  606. if (open_ttl_files_.empty()) {
  607. return nullptr;
  608. }
  609. std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
  610. tmp->SetHasTTL(true);
  611. tmp->expiration_range_ = std::make_pair(expiration, 0);
  612. tmp->file_number_ = std::numeric_limits<uint64_t>::max();
  613. auto citr = open_ttl_files_.equal_range(tmp);
  614. if (citr.first == open_ttl_files_.end()) {
  615. assert(citr.second == open_ttl_files_.end());
  616. std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
  617. return (check->expiration_range_.second <= expiration) ? nullptr : check;
  618. }
  619. if (citr.first != citr.second) {
  620. return *(citr.first);
  621. }
  622. auto finditr = citr.second;
  623. if (finditr != open_ttl_files_.begin()) {
  624. --finditr;
  625. }
  626. bool b2 = (*finditr)->expiration_range_.second <= expiration;
  627. bool b1 = (*finditr)->expiration_range_.first > expiration;
  628. return (b1 || b2) ? nullptr : (*finditr);
  629. }
  630. Status BlobDBImpl::CheckOrCreateWriterLocked(
  631. const std::shared_ptr<BlobFile>& blob_file,
  632. std::shared_ptr<Writer>* writer) {
  633. assert(writer != nullptr);
  634. *writer = blob_file->GetWriter();
  635. if (*writer != nullptr) {
  636. return Status::OK();
  637. }
  638. Status s = CreateWriterLocked(blob_file);
  639. if (s.ok()) {
  640. *writer = blob_file->GetWriter();
  641. }
  642. return s;
  643. }
  644. Status BlobDBImpl::CreateBlobFileAndWriter(
  645. bool has_ttl, const ExpirationRange& expiration_range,
  646. const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
  647. std::shared_ptr<Writer>* writer) {
  648. assert(has_ttl == (expiration_range.first || expiration_range.second));
  649. assert(blob_file);
  650. assert(writer);
  651. *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
  652. assert(*blob_file);
  653. // file not visible, hence no lock
  654. Status s = CheckOrCreateWriterLocked(*blob_file, writer);
  655. if (!s.ok()) {
  656. ROCKS_LOG_ERROR(db_options_.info_log,
  657. "Failed to get writer for blob file: %s, error: %s",
  658. (*blob_file)->PathName().c_str(), s.ToString().c_str());
  659. return s;
  660. }
  661. assert(*writer);
  662. s = (*writer)->WriteHeader((*blob_file)->header_);
  663. if (!s.ok()) {
  664. ROCKS_LOG_ERROR(db_options_.info_log,
  665. "Failed to write header to new blob file: %s"
  666. " status: '%s'",
  667. (*blob_file)->PathName().c_str(), s.ToString().c_str());
  668. return s;
  669. }
  670. (*blob_file)->SetFileSize(BlobLogHeader::kSize);
  671. total_blob_size_ += BlobLogHeader::kSize;
  672. return s;
  673. }
  674. Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
  675. assert(blob_file);
  676. {
  677. ReadLock rl(&mutex_);
  678. if (open_non_ttl_file_) {
  679. assert(!open_non_ttl_file_->Immutable());
  680. *blob_file = open_non_ttl_file_;
  681. return Status::OK();
  682. }
  683. }
  684. // Check again
  685. WriteLock wl(&mutex_);
  686. if (open_non_ttl_file_) {
  687. assert(!open_non_ttl_file_->Immutable());
  688. *blob_file = open_non_ttl_file_;
  689. return Status::OK();
  690. }
  691. std::shared_ptr<Writer> writer;
  692. const Status s = CreateBlobFileAndWriter(
  693. /* has_ttl */ false, ExpirationRange(),
  694. /* reason */ "SelectBlobFile", blob_file, &writer);
  695. if (!s.ok()) {
  696. return s;
  697. }
  698. RegisterBlobFile(*blob_file);
  699. open_non_ttl_file_ = *blob_file;
  700. return s;
  701. }
  702. Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
  703. std::shared_ptr<BlobFile>* blob_file) {
  704. assert(blob_file);
  705. assert(expiration != kNoExpiration);
  706. {
  707. ReadLock rl(&mutex_);
  708. *blob_file = FindBlobFileLocked(expiration);
  709. if (*blob_file != nullptr) {
  710. assert(!(*blob_file)->Immutable());
  711. return Status::OK();
  712. }
  713. }
  714. // Check again
  715. WriteLock wl(&mutex_);
  716. *blob_file = FindBlobFileLocked(expiration);
  717. if (*blob_file != nullptr) {
  718. assert(!(*blob_file)->Immutable());
  719. return Status::OK();
  720. }
  721. const uint64_t exp_low =
  722. (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
  723. const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
  724. const ExpirationRange expiration_range(exp_low, exp_high);
  725. std::ostringstream oss;
  726. oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
  727. std::shared_ptr<Writer> writer;
  728. const Status s =
  729. CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
  730. /* reason */ oss.str(), blob_file, &writer);
  731. if (!s.ok()) {
  732. return s;
  733. }
  734. RegisterBlobFile(*blob_file);
  735. open_ttl_files_.insert(*blob_file);
  736. return s;
  737. }
  738. class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
  739. private:
  740. const WriteOptions& options_;
  741. BlobDBImpl* blob_db_impl_;
  742. uint32_t default_cf_id_;
  743. WriteBatch batch_;
  744. public:
  745. BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
  746. uint32_t default_cf_id)
  747. : options_(options),
  748. blob_db_impl_(blob_db_impl),
  749. default_cf_id_(default_cf_id) {}
  750. WriteBatch* batch() { return &batch_; }
  751. Status PutCF(uint32_t column_family_id, const Slice& key,
  752. const Slice& value) override {
  753. if (column_family_id != default_cf_id_) {
  754. return Status::NotSupported(
  755. "Blob DB doesn't support non-default column family.");
  756. }
  757. Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
  758. &batch_);
  759. return s;
  760. }
  761. Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
  762. if (column_family_id != default_cf_id_) {
  763. return Status::NotSupported(
  764. "Blob DB doesn't support non-default column family.");
  765. }
  766. Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
  767. return s;
  768. }
  769. virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
  770. const Slice& end_key) {
  771. if (column_family_id != default_cf_id_) {
  772. return Status::NotSupported(
  773. "Blob DB doesn't support non-default column family.");
  774. }
  775. Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
  776. begin_key, end_key);
  777. return s;
  778. }
  779. Status SingleDeleteCF(uint32_t /*column_family_id*/,
  780. const Slice& /*key*/) override {
  781. return Status::NotSupported("Not supported operation in blob db.");
  782. }
  783. Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
  784. const Slice& /*value*/) override {
  785. return Status::NotSupported("Not supported operation in blob db.");
  786. }
  787. void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
  788. };
  789. Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  790. StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
  791. RecordTick(statistics_, BLOB_DB_NUM_WRITE);
  792. uint32_t default_cf_id =
  793. reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  794. Status s;
  795. BlobInserter blob_inserter(options, this, default_cf_id);
  796. {
  797. // Release write_mutex_ before DB write to avoid race condition with
  798. // flush begin listener, which also require write_mutex_ to sync
  799. // blob files.
  800. MutexLock l(&write_mutex_);
  801. s = updates->Iterate(&blob_inserter);
  802. }
  803. if (!s.ok()) {
  804. return s;
  805. }
  806. return db_->Write(options, blob_inserter.batch());
  807. }
  808. Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
  809. const Slice& value) {
  810. return PutUntil(options, key, value, kNoExpiration);
  811. }
  812. Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
  813. const Slice& key, const Slice& value,
  814. uint64_t ttl) {
  815. uint64_t now = EpochNow();
  816. uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
  817. return PutUntil(options, key, value, expiration);
  818. }
  819. Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
  820. const Slice& value, uint64_t expiration) {
  821. StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
  822. RecordTick(statistics_, BLOB_DB_NUM_PUT);
  823. Status s;
  824. WriteBatch batch;
  825. {
  826. // Release write_mutex_ before DB write to avoid race condition with
  827. // flush begin listener, which also require write_mutex_ to sync
  828. // blob files.
  829. MutexLock l(&write_mutex_);
  830. s = PutBlobValue(options, key, value, expiration, &batch);
  831. }
  832. if (s.ok()) {
  833. s = db_->Write(options, &batch);
  834. }
  835. return s;
  836. }
  837. Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
  838. const Slice& key, const Slice& value,
  839. uint64_t expiration, WriteBatch* batch) {
  840. write_mutex_.AssertHeld();
  841. Status s;
  842. std::string index_entry;
  843. uint32_t column_family_id =
  844. reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  845. if (value.size() < bdb_options_.min_blob_size) {
  846. if (expiration == kNoExpiration) {
  847. // Put as normal value
  848. s = batch->Put(key, value);
  849. RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
  850. } else {
  851. // Inlined with TTL
  852. BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
  853. s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
  854. index_entry);
  855. RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
  856. }
  857. } else {
  858. std::string compression_output;
  859. Slice value_compressed = GetCompressedSlice(value, &compression_output);
  860. std::string headerbuf;
  861. Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
  862. // Check DB size limit before selecting blob file to
  863. // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
  864. // done before calling SelectBlobFile().
  865. s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
  866. value_compressed.size());
  867. if (!s.ok()) {
  868. return s;
  869. }
  870. std::shared_ptr<BlobFile> blob_file;
  871. if (expiration != kNoExpiration) {
  872. s = SelectBlobFileTTL(expiration, &blob_file);
  873. } else {
  874. s = SelectBlobFile(&blob_file);
  875. }
  876. if (s.ok()) {
  877. assert(blob_file != nullptr);
  878. assert(blob_file->GetCompressionType() == bdb_options_.compression);
  879. s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
  880. &index_entry);
  881. }
  882. if (s.ok()) {
  883. if (expiration != kNoExpiration) {
  884. WriteLock file_lock(&blob_file->mutex_);
  885. blob_file->ExtendExpirationRange(expiration);
  886. }
  887. s = CloseBlobFileIfNeeded(blob_file);
  888. }
  889. if (s.ok()) {
  890. s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
  891. index_entry);
  892. }
  893. if (s.ok()) {
  894. if (expiration == kNoExpiration) {
  895. RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
  896. } else {
  897. RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
  898. }
  899. } else {
  900. ROCKS_LOG_ERROR(
  901. db_options_.info_log,
  902. "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
  903. " status: '%s' blob_file: '%s'",
  904. blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
  905. s.ToString().c_str(), blob_file->DumpState().c_str());
  906. }
  907. }
  908. RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
  909. RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
  910. RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
  911. RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
  912. return s;
  913. }
  914. Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
  915. std::string* compression_output) const {
  916. if (bdb_options_.compression == kNoCompression) {
  917. return raw;
  918. }
  919. StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
  920. CompressionType type = bdb_options_.compression;
  921. CompressionOptions opts;
  922. CompressionContext context(type);
  923. CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
  924. 0 /* sample_for_compression */);
  925. CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
  926. compression_output, nullptr, nullptr);
  927. return *compression_output;
  928. }
  929. Status BlobDBImpl::CompactFiles(
  930. const CompactionOptions& compact_options,
  931. const std::vector<std::string>& input_file_names, const int output_level,
  932. const int output_path_id, std::vector<std::string>* const output_file_names,
  933. CompactionJobInfo* compaction_job_info) {
  934. // Note: we need CompactionJobInfo to be able to track updates to the
  935. // blob file <-> SST mappings, so we provide one if the user hasn't,
  936. // assuming that GC is enabled.
  937. CompactionJobInfo info{};
  938. if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
  939. compaction_job_info = &info;
  940. }
  941. const Status s =
  942. db_->CompactFiles(compact_options, input_file_names, output_level,
  943. output_path_id, output_file_names, compaction_job_info);
  944. if (!s.ok()) {
  945. return s;
  946. }
  947. if (bdb_options_.enable_garbage_collection) {
  948. assert(compaction_job_info);
  949. ProcessCompactionJobInfo(*compaction_job_info);
  950. }
  951. return s;
  952. }
  953. void BlobDBImpl::GetCompactionContextCommon(
  954. BlobCompactionContext* context) const {
  955. assert(context);
  956. context->next_file_number = next_file_number_.load();
  957. context->current_blob_files.clear();
  958. for (auto& p : blob_files_) {
  959. context->current_blob_files.insert(p.first);
  960. }
  961. context->fifo_eviction_seq = fifo_eviction_seq_;
  962. context->evict_expiration_up_to = evict_expiration_up_to_;
  963. }
  964. void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
  965. assert(context);
  966. ReadLock l(&mutex_);
  967. GetCompactionContextCommon(context);
  968. }
  969. void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
  970. BlobCompactionContextGC* context_gc) {
  971. assert(context);
  972. assert(context_gc);
  973. ReadLock l(&mutex_);
  974. GetCompactionContextCommon(context);
  975. context_gc->blob_db_impl = this;
  976. if (!live_imm_non_ttl_blob_files_.empty()) {
  977. auto it = live_imm_non_ttl_blob_files_.begin();
  978. std::advance(it, bdb_options_.garbage_collection_cutoff *
  979. live_imm_non_ttl_blob_files_.size());
  980. context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
  981. ? it->first
  982. : std::numeric_limits<uint64_t>::max();
  983. }
  984. }
  985. void BlobDBImpl::UpdateLiveSSTSize() {
  986. uint64_t live_sst_size = 0;
  987. bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
  988. if (ok) {
  989. live_sst_size_.store(live_sst_size);
  990. ROCKS_LOG_INFO(db_options_.info_log,
  991. "Updated total SST file size: %" PRIu64 " bytes.",
  992. live_sst_size);
  993. } else {
  994. ROCKS_LOG_ERROR(
  995. db_options_.info_log,
  996. "Failed to update total SST file size after flush or compaction.");
  997. }
  998. {
  999. // Trigger FIFO eviction if needed.
  1000. MutexLock l(&write_mutex_);
  1001. Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
  1002. if (s.IsNoSpace()) {
  1003. ROCKS_LOG_WARN(db_options_.info_log,
  1004. "DB grow out-of-space after SST size updated. Current live"
  1005. " SST size: %" PRIu64
  1006. " , current blob files size: %" PRIu64 ".",
  1007. live_sst_size_.load(), total_blob_size_.load());
  1008. }
  1009. }
  1010. }
  1011. Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
  1012. bool force_evict) {
  1013. write_mutex_.AssertHeld();
  1014. uint64_t live_sst_size = live_sst_size_.load();
  1015. if (bdb_options_.max_db_size == 0 ||
  1016. live_sst_size + total_blob_size_.load() + blob_size <=
  1017. bdb_options_.max_db_size) {
  1018. return Status::OK();
  1019. }
  1020. if (bdb_options_.is_fifo == false ||
  1021. (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
  1022. // FIFO eviction is disabled, or no space to insert new blob even we evict
  1023. // all blob files.
  1024. return Status::NoSpace(
  1025. "Write failed, as writing it would exceed max_db_size limit.");
  1026. }
  1027. std::vector<std::shared_ptr<BlobFile>> candidate_files;
  1028. CopyBlobFiles(&candidate_files);
  1029. std::sort(candidate_files.begin(), candidate_files.end(),
  1030. BlobFileComparator());
  1031. fifo_eviction_seq_ = GetLatestSequenceNumber();
  1032. WriteLock l(&mutex_);
  1033. while (!candidate_files.empty() &&
  1034. live_sst_size + total_blob_size_.load() + blob_size >
  1035. bdb_options_.max_db_size) {
  1036. std::shared_ptr<BlobFile> blob_file = candidate_files.back();
  1037. candidate_files.pop_back();
  1038. WriteLock file_lock(&blob_file->mutex_);
  1039. if (blob_file->Obsolete()) {
  1040. // File already obsoleted by someone else.
  1041. assert(blob_file->Immutable());
  1042. continue;
  1043. }
  1044. // FIFO eviction can evict open blob files.
  1045. if (!blob_file->Immutable()) {
  1046. Status s = CloseBlobFile(blob_file);
  1047. if (!s.ok()) {
  1048. return s;
  1049. }
  1050. }
  1051. assert(blob_file->Immutable());
  1052. auto expiration_range = blob_file->GetExpirationRange();
  1053. ROCKS_LOG_INFO(db_options_.info_log,
  1054. "Evict oldest blob file since DB out of space. Current "
  1055. "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
  1056. ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
  1057. ".",
  1058. live_sst_size, total_blob_size_.load(),
  1059. bdb_options_.max_db_size, blob_file->BlobFileNumber());
  1060. ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
  1061. evict_expiration_up_to_ = expiration_range.first;
  1062. RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
  1063. RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
  1064. blob_file->BlobCount());
  1065. RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
  1066. blob_file->GetFileSize());
  1067. TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
  1068. }
  1069. if (live_sst_size + total_blob_size_.load() + blob_size >
  1070. bdb_options_.max_db_size) {
  1071. return Status::NoSpace(
  1072. "Write failed, as writing it would exceed max_db_size limit.");
  1073. }
  1074. return Status::OK();
  1075. }
  1076. Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
  1077. const std::string& headerbuf, const Slice& key,
  1078. const Slice& value, uint64_t expiration,
  1079. std::string* index_entry) {
  1080. Status s;
  1081. uint64_t blob_offset = 0;
  1082. uint64_t key_offset = 0;
  1083. {
  1084. WriteLock lockbfile_w(&bfile->mutex_);
  1085. std::shared_ptr<Writer> writer;
  1086. s = CheckOrCreateWriterLocked(bfile, &writer);
  1087. if (!s.ok()) {
  1088. return s;
  1089. }
  1090. // write the blob to the blob log.
  1091. s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
  1092. &blob_offset);
  1093. }
  1094. if (!s.ok()) {
  1095. ROCKS_LOG_ERROR(db_options_.info_log,
  1096. "Invalid status in AppendBlob: %s status: '%s'",
  1097. bfile->PathName().c_str(), s.ToString().c_str());
  1098. return s;
  1099. }
  1100. uint64_t size_put = headerbuf.size() + key.size() + value.size();
  1101. bfile->BlobRecordAdded(size_put);
  1102. total_blob_size_ += size_put;
  1103. if (expiration == kNoExpiration) {
  1104. BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
  1105. value.size(), bdb_options_.compression);
  1106. } else {
  1107. BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
  1108. blob_offset, value.size(),
  1109. bdb_options_.compression);
  1110. }
  1111. return s;
  1112. }
  1113. std::vector<Status> BlobDBImpl::MultiGet(
  1114. const ReadOptions& read_options,
  1115. const std::vector<Slice>& keys, std::vector<std::string>* values) {
  1116. StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
  1117. RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
  1118. // Get a snapshot to avoid blob file get deleted between we
  1119. // fetch and index entry and reading from the file.
  1120. ReadOptions ro(read_options);
  1121. bool snapshot_created = SetSnapshotIfNeeded(&ro);
  1122. std::vector<Status> statuses;
  1123. statuses.reserve(keys.size());
  1124. values->clear();
  1125. values->reserve(keys.size());
  1126. PinnableSlice value;
  1127. for (size_t i = 0; i < keys.size(); i++) {
  1128. statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
  1129. values->push_back(value.ToString());
  1130. value.Reset();
  1131. }
  1132. if (snapshot_created) {
  1133. db_->ReleaseSnapshot(ro.snapshot);
  1134. }
  1135. return statuses;
  1136. }
  1137. bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
  1138. assert(read_options != nullptr);
  1139. if (read_options->snapshot != nullptr) {
  1140. return false;
  1141. }
  1142. read_options->snapshot = db_->GetSnapshot();
  1143. return true;
  1144. }
  1145. Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
  1146. PinnableSlice* value, uint64_t* expiration) {
  1147. assert(value);
  1148. BlobIndex blob_index;
  1149. Status s = blob_index.DecodeFrom(index_entry);
  1150. if (!s.ok()) {
  1151. return s;
  1152. }
  1153. if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
  1154. return Status::NotFound("Key expired");
  1155. }
  1156. if (expiration != nullptr) {
  1157. if (blob_index.HasTTL()) {
  1158. *expiration = blob_index.expiration();
  1159. } else {
  1160. *expiration = kNoExpiration;
  1161. }
  1162. }
  1163. if (blob_index.IsInlined()) {
  1164. // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
  1165. // memory buffer to avoid extra copy.
  1166. value->PinSelf(blob_index.value());
  1167. return Status::OK();
  1168. }
  1169. CompressionType compression_type = kNoCompression;
  1170. s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
  1171. blob_index.size(), value, &compression_type);
  1172. if (!s.ok()) {
  1173. return s;
  1174. }
  1175. if (compression_type != kNoCompression) {
  1176. BlockContents contents;
  1177. auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
  1178. {
  1179. StopWatch decompression_sw(env_, statistics_,
  1180. BLOB_DB_DECOMPRESSION_MICROS);
  1181. UncompressionContext context(compression_type);
  1182. UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
  1183. compression_type);
  1184. s = UncompressBlockContentsForCompressionType(
  1185. info, value->data(), value->size(), &contents,
  1186. kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
  1187. }
  1188. if (!s.ok()) {
  1189. if (debug_level_ >= 2) {
  1190. ROCKS_LOG_ERROR(
  1191. db_options_.info_log,
  1192. "Uncompression error during blob read from file: %" PRIu64
  1193. " blob_offset: %" PRIu64 " blob_size: %" PRIu64
  1194. " key: %s status: '%s'",
  1195. blob_index.file_number(), blob_index.offset(), blob_index.size(),
  1196. key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
  1197. }
  1198. return Status::Corruption("Unable to uncompress blob.");
  1199. }
  1200. value->PinSelf(contents.data);
  1201. }
  1202. return Status::OK();
  1203. }
  1204. Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
  1205. uint64_t offset, uint64_t size,
  1206. PinnableSlice* value,
  1207. CompressionType* compression_type) {
  1208. assert(value);
  1209. assert(compression_type);
  1210. assert(*compression_type == kNoCompression);
  1211. if (!size) {
  1212. value->PinSelf("");
  1213. return Status::OK();
  1214. }
  1215. // offset has to have certain min, as we will read CRC
  1216. // later from the Blob Header, which needs to be also a
  1217. // valid offset.
  1218. if (offset <
  1219. (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
  1220. if (debug_level_ >= 2) {
  1221. ROCKS_LOG_ERROR(db_options_.info_log,
  1222. "Invalid blob index file_number: %" PRIu64
  1223. " blob_offset: %" PRIu64 " blob_size: %" PRIu64
  1224. " key: %s",
  1225. file_number, offset, size,
  1226. key.ToString(/* output_hex */ true).c_str());
  1227. }
  1228. return Status::NotFound("Invalid blob offset");
  1229. }
  1230. std::shared_ptr<BlobFile> blob_file;
  1231. {
  1232. ReadLock rl(&mutex_);
  1233. auto it = blob_files_.find(file_number);
  1234. // file was deleted
  1235. if (it == blob_files_.end()) {
  1236. return Status::NotFound("Blob Not Found as blob file missing");
  1237. }
  1238. blob_file = it->second;
  1239. }
  1240. *compression_type = blob_file->GetCompressionType();
  1241. // takes locks when called
  1242. std::shared_ptr<RandomAccessFileReader> reader;
  1243. Status s = GetBlobFileReader(blob_file, &reader);
  1244. if (!s.ok()) {
  1245. return s;
  1246. }
  1247. assert(offset >= key.size() + sizeof(uint32_t));
  1248. const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
  1249. const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
  1250. // Allocate the buffer. This is safe in C++11
  1251. std::string buffer_str(static_cast<size_t>(record_size), static_cast<char>(0));
  1252. char* buffer = &buffer_str[0];
  1253. // A partial blob record contain checksum, key and value.
  1254. Slice blob_record;
  1255. {
  1256. StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
  1257. s = reader->Read(record_offset, static_cast<size_t>(record_size), &blob_record, buffer);
  1258. RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
  1259. }
  1260. if (!s.ok()) {
  1261. ROCKS_LOG_DEBUG(
  1262. db_options_.info_log,
  1263. "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
  1264. ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
  1265. file_number, offset, size, key.size(), s.ToString().c_str());
  1266. return s;
  1267. }
  1268. if (blob_record.size() != record_size) {
  1269. ROCKS_LOG_DEBUG(
  1270. db_options_.info_log,
  1271. "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
  1272. ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
  1273. ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
  1274. file_number, offset, size, key.size(), blob_record.size(), record_size);
  1275. return Status::Corruption("Failed to retrieve blob from blob index.");
  1276. }
  1277. Slice crc_slice(blob_record.data(), sizeof(uint32_t));
  1278. Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
  1279. static_cast<size_t>(size));
  1280. uint32_t crc_exp = 0;
  1281. if (!GetFixed32(&crc_slice, &crc_exp)) {
  1282. ROCKS_LOG_DEBUG(
  1283. db_options_.info_log,
  1284. "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
  1285. ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
  1286. file_number, offset, size, key.size(), s.ToString().c_str());
  1287. return Status::Corruption("Unable to decode checksum.");
  1288. }
  1289. uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
  1290. blob_record.size() - sizeof(uint32_t));
  1291. crc = crc32c::Mask(crc); // Adjust for storage
  1292. if (crc != crc_exp) {
  1293. if (debug_level_ >= 2) {
  1294. ROCKS_LOG_ERROR(
  1295. db_options_.info_log,
  1296. "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
  1297. " blob_size: %" PRIu64 " key: %s status: '%s'",
  1298. file_number, offset, size,
  1299. key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
  1300. }
  1301. return Status::Corruption("Corruption. Blob CRC mismatch");
  1302. }
  1303. value->PinSelf(blob_value);
  1304. return Status::OK();
  1305. }
  1306. Status BlobDBImpl::Get(const ReadOptions& read_options,
  1307. ColumnFamilyHandle* column_family, const Slice& key,
  1308. PinnableSlice* value) {
  1309. return Get(read_options, column_family, key, value, nullptr /*expiration*/);
  1310. }
  1311. Status BlobDBImpl::Get(const ReadOptions& read_options,
  1312. ColumnFamilyHandle* column_family, const Slice& key,
  1313. PinnableSlice* value, uint64_t* expiration) {
  1314. StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
  1315. RecordTick(statistics_, BLOB_DB_NUM_GET);
  1316. return GetImpl(read_options, column_family, key, value, expiration);
  1317. }
  1318. Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
  1319. ColumnFamilyHandle* column_family, const Slice& key,
  1320. PinnableSlice* value, uint64_t* expiration) {
  1321. if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
  1322. return Status::NotSupported(
  1323. "Blob DB doesn't support non-default column family.");
  1324. }
  1325. // Get a snapshot to avoid blob file get deleted between we
  1326. // fetch and index entry and reading from the file.
  1327. // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
  1328. ReadOptions ro(read_options);
  1329. bool snapshot_created = SetSnapshotIfNeeded(&ro);
  1330. PinnableSlice index_entry;
  1331. Status s;
  1332. bool is_blob_index = false;
  1333. DBImpl::GetImplOptions get_impl_options;
  1334. get_impl_options.column_family = column_family;
  1335. get_impl_options.value = &index_entry;
  1336. get_impl_options.is_blob_index = &is_blob_index;
  1337. s = db_impl_->GetImpl(ro, key, get_impl_options);
  1338. if (expiration != nullptr) {
  1339. *expiration = kNoExpiration;
  1340. }
  1341. RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
  1342. if (s.ok()) {
  1343. if (is_blob_index) {
  1344. s = GetBlobValue(key, index_entry, value, expiration);
  1345. } else {
  1346. // The index entry is the value itself in this case.
  1347. value->PinSelf(index_entry);
  1348. }
  1349. RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
  1350. }
  1351. if (snapshot_created) {
  1352. db_->ReleaseSnapshot(ro.snapshot);
  1353. }
  1354. return s;
  1355. }
  1356. std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
  1357. if (aborted) {
  1358. return std::make_pair(false, -1);
  1359. }
  1360. ReadLock rl(&mutex_);
  1361. ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
  1362. ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
  1363. blob_files_.size());
  1364. ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
  1365. open_ttl_files_.size());
  1366. for (const auto& blob_file : open_ttl_files_) {
  1367. (void)blob_file;
  1368. assert(!blob_file->Immutable());
  1369. }
  1370. for (const auto& pair : live_imm_non_ttl_blob_files_) {
  1371. const auto& blob_file = pair.second;
  1372. (void)blob_file;
  1373. assert(!blob_file->HasTTL());
  1374. assert(blob_file->Immutable());
  1375. }
  1376. uint64_t now = EpochNow();
  1377. for (auto blob_file_pair : blob_files_) {
  1378. auto blob_file = blob_file_pair.second;
  1379. char buf[1000];
  1380. int pos = snprintf(buf, sizeof(buf),
  1381. "Blob file %" PRIu64 ", size %" PRIu64
  1382. ", blob count %" PRIu64 ", immutable %d",
  1383. blob_file->BlobFileNumber(), blob_file->GetFileSize(),
  1384. blob_file->BlobCount(), blob_file->Immutable());
  1385. if (blob_file->HasTTL()) {
  1386. ExpirationRange expiration_range;
  1387. {
  1388. ReadLock file_lock(&blob_file->mutex_);
  1389. expiration_range = blob_file->GetExpirationRange();
  1390. }
  1391. pos += snprintf(buf + pos, sizeof(buf) - pos,
  1392. ", expiration range (%" PRIu64 ", %" PRIu64 ")",
  1393. expiration_range.first, expiration_range.second);
  1394. if (!blob_file->Obsolete()) {
  1395. pos += snprintf(buf + pos, sizeof(buf) - pos,
  1396. ", expire in %" PRIu64 " seconds",
  1397. expiration_range.second - now);
  1398. }
  1399. }
  1400. if (blob_file->Obsolete()) {
  1401. pos += snprintf(buf + pos, sizeof(buf) - pos, ", obsolete at %" PRIu64,
  1402. blob_file->GetObsoleteSequence());
  1403. }
  1404. snprintf(buf + pos, sizeof(buf) - pos, ".");
  1405. ROCKS_LOG_INFO(db_options_.info_log, "%s", buf);
  1406. }
  1407. // reschedule
  1408. return std::make_pair(true, -1);
  1409. }
  1410. Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
  1411. assert(bfile);
  1412. assert(!bfile->Immutable());
  1413. assert(!bfile->Obsolete());
  1414. if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
  1415. write_mutex_.AssertHeld();
  1416. }
  1417. ROCKS_LOG_INFO(db_options_.info_log,
  1418. "Closing blob file %" PRIu64 ". Path: %s",
  1419. bfile->BlobFileNumber(), bfile->PathName().c_str());
  1420. const SequenceNumber sequence = GetLatestSequenceNumber();
  1421. const Status s = bfile->WriteFooterAndCloseLocked(sequence);
  1422. if (s.ok()) {
  1423. total_blob_size_ += BlobLogFooter::kSize;
  1424. } else {
  1425. bfile->MarkImmutable(sequence);
  1426. ROCKS_LOG_ERROR(db_options_.info_log,
  1427. "Failed to close blob file %" PRIu64 "with error: %s",
  1428. bfile->BlobFileNumber(), s.ToString().c_str());
  1429. }
  1430. if (bfile->HasTTL()) {
  1431. size_t erased __attribute__((__unused__));
  1432. erased = open_ttl_files_.erase(bfile);
  1433. } else {
  1434. if (bfile == open_non_ttl_file_) {
  1435. open_non_ttl_file_ = nullptr;
  1436. }
  1437. const uint64_t blob_file_number = bfile->BlobFileNumber();
  1438. auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
  1439. assert(it == live_imm_non_ttl_blob_files_.end() ||
  1440. it->first != blob_file_number);
  1441. live_imm_non_ttl_blob_files_.insert(
  1442. it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
  1443. blob_file_number, bfile));
  1444. }
  1445. return s;
  1446. }
  1447. Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
  1448. write_mutex_.AssertHeld();
  1449. // atomic read
  1450. if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
  1451. return Status::OK();
  1452. }
  1453. WriteLock lock(&mutex_);
  1454. WriteLock file_lock(&bfile->mutex_);
  1455. assert(!bfile->Obsolete() || bfile->Immutable());
  1456. if (bfile->Immutable()) {
  1457. return Status::OK();
  1458. }
  1459. return CloseBlobFile(bfile);
  1460. }
  1461. void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
  1462. SequenceNumber obsolete_seq,
  1463. bool update_size) {
  1464. assert(blob_file->Immutable());
  1465. assert(!blob_file->Obsolete());
  1466. // Should hold write lock of mutex_ or during DB open.
  1467. blob_file->MarkObsolete(obsolete_seq);
  1468. obsolete_files_.push_back(blob_file);
  1469. assert(total_blob_size_.load() >= blob_file->GetFileSize());
  1470. if (update_size) {
  1471. total_blob_size_ -= blob_file->GetFileSize();
  1472. }
  1473. }
  1474. bool BlobDBImpl::VisibleToActiveSnapshot(
  1475. const std::shared_ptr<BlobFile>& bfile) {
  1476. assert(bfile->Obsolete());
  1477. // We check whether the oldest snapshot is no less than the last sequence
  1478. // by the time the blob file become obsolete. If so, the blob file is not
  1479. // visible to all existing snapshots.
  1480. //
  1481. // If we keep track of the earliest sequence of the keys in the blob file,
  1482. // we could instead check if there's a snapshot falls in range
  1483. // [earliest_sequence, obsolete_sequence). But doing so will make the
  1484. // implementation more complicated.
  1485. SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
  1486. SequenceNumber oldest_snapshot = kMaxSequenceNumber;
  1487. {
  1488. // Need to lock DBImpl mutex before access snapshot list.
  1489. InstrumentedMutexLock l(db_impl_->mutex());
  1490. auto& snapshots = db_impl_->snapshots();
  1491. if (!snapshots.empty()) {
  1492. oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
  1493. }
  1494. }
  1495. bool visible = oldest_snapshot < obsolete_sequence;
  1496. if (visible) {
  1497. ROCKS_LOG_INFO(db_options_.info_log,
  1498. "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
  1499. ") visible to oldest snapshot %" PRIu64 ".",
  1500. bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
  1501. }
  1502. return visible;
  1503. }
  1504. std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
  1505. if (aborted) {
  1506. return std::make_pair(false, -1);
  1507. }
  1508. TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
  1509. TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
  1510. std::vector<std::shared_ptr<BlobFile>> process_files;
  1511. uint64_t now = EpochNow();
  1512. {
  1513. ReadLock rl(&mutex_);
  1514. for (auto p : blob_files_) {
  1515. auto& blob_file = p.second;
  1516. ReadLock file_lock(&blob_file->mutex_);
  1517. if (blob_file->HasTTL() && !blob_file->Obsolete() &&
  1518. blob_file->GetExpirationRange().second <= now) {
  1519. process_files.push_back(blob_file);
  1520. }
  1521. }
  1522. }
  1523. TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
  1524. TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
  1525. TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
  1526. SequenceNumber seq = GetLatestSequenceNumber();
  1527. {
  1528. MutexLock l(&write_mutex_);
  1529. WriteLock lock(&mutex_);
  1530. for (auto& blob_file : process_files) {
  1531. WriteLock file_lock(&blob_file->mutex_);
  1532. // Need to double check if the file is obsolete.
  1533. if (blob_file->Obsolete()) {
  1534. assert(blob_file->Immutable());
  1535. continue;
  1536. }
  1537. if (!blob_file->Immutable()) {
  1538. CloseBlobFile(blob_file);
  1539. }
  1540. assert(blob_file->Immutable());
  1541. ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
  1542. }
  1543. }
  1544. return std::make_pair(true, -1);
  1545. }
  1546. Status BlobDBImpl::SyncBlobFiles() {
  1547. MutexLock l(&write_mutex_);
  1548. std::vector<std::shared_ptr<BlobFile>> process_files;
  1549. {
  1550. ReadLock rl(&mutex_);
  1551. for (auto fitr : open_ttl_files_) {
  1552. process_files.push_back(fitr);
  1553. }
  1554. if (open_non_ttl_file_ != nullptr) {
  1555. process_files.push_back(open_non_ttl_file_);
  1556. }
  1557. }
  1558. Status s;
  1559. for (auto& blob_file : process_files) {
  1560. s = blob_file->Fsync();
  1561. if (!s.ok()) {
  1562. ROCKS_LOG_ERROR(db_options_.info_log,
  1563. "Failed to sync blob file %" PRIu64 ", status: %s",
  1564. blob_file->BlobFileNumber(), s.ToString().c_str());
  1565. return s;
  1566. }
  1567. }
  1568. s = dir_ent_->Fsync();
  1569. if (!s.ok()) {
  1570. ROCKS_LOG_ERROR(db_options_.info_log,
  1571. "Failed to sync blob directory, status: %s",
  1572. s.ToString().c_str());
  1573. }
  1574. return s;
  1575. }
  1576. std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
  1577. if (aborted) return std::make_pair(false, -1);
  1578. if (open_file_count_.load() < kOpenFilesTrigger) {
  1579. return std::make_pair(true, -1);
  1580. }
  1581. // in the future, we should sort by last_access_
  1582. // instead of closing every file
  1583. ReadLock rl(&mutex_);
  1584. for (auto const& ent : blob_files_) {
  1585. auto bfile = ent.second;
  1586. if (bfile->last_access_.load() == -1) continue;
  1587. WriteLock lockbfile_w(&bfile->mutex_);
  1588. CloseRandomAccessLocked(bfile);
  1589. }
  1590. return std::make_pair(true, -1);
  1591. }
  1592. std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
  1593. if (aborted) {
  1594. return std::make_pair(false, -1);
  1595. }
  1596. MutexLock delete_file_lock(&delete_file_mutex_);
  1597. if (disable_file_deletions_ > 0) {
  1598. return std::make_pair(true, -1);
  1599. }
  1600. std::list<std::shared_ptr<BlobFile>> tobsolete;
  1601. {
  1602. WriteLock wl(&mutex_);
  1603. if (obsolete_files_.empty()) {
  1604. return std::make_pair(true, -1);
  1605. }
  1606. tobsolete.swap(obsolete_files_);
  1607. }
  1608. bool file_deleted = false;
  1609. for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
  1610. auto bfile = *iter;
  1611. {
  1612. ReadLock lockbfile_r(&bfile->mutex_);
  1613. if (VisibleToActiveSnapshot(bfile)) {
  1614. ROCKS_LOG_INFO(db_options_.info_log,
  1615. "Could not delete file due to snapshot failure %s",
  1616. bfile->PathName().c_str());
  1617. ++iter;
  1618. continue;
  1619. }
  1620. }
  1621. ROCKS_LOG_INFO(db_options_.info_log,
  1622. "Will delete file due to snapshot success %s",
  1623. bfile->PathName().c_str());
  1624. {
  1625. WriteLock wl(&mutex_);
  1626. blob_files_.erase(bfile->BlobFileNumber());
  1627. }
  1628. Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
  1629. bfile->PathName(), blob_dir_, true,
  1630. /*force_fg=*/false);
  1631. if (!s.ok()) {
  1632. ROCKS_LOG_ERROR(db_options_.info_log,
  1633. "File failed to be deleted as obsolete %s",
  1634. bfile->PathName().c_str());
  1635. ++iter;
  1636. continue;
  1637. }
  1638. file_deleted = true;
  1639. ROCKS_LOG_INFO(db_options_.info_log,
  1640. "File deleted as obsolete from blob dir %s",
  1641. bfile->PathName().c_str());
  1642. iter = tobsolete.erase(iter);
  1643. }
  1644. // directory change. Fsync
  1645. if (file_deleted) {
  1646. Status s = dir_ent_->Fsync();
  1647. if (!s.ok()) {
  1648. ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
  1649. blob_dir_.c_str(), s.ToString().c_str());
  1650. }
  1651. }
  1652. // put files back into obsolete if for some reason, delete failed
  1653. if (!tobsolete.empty()) {
  1654. WriteLock wl(&mutex_);
  1655. for (auto bfile : tobsolete) {
  1656. blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
  1657. obsolete_files_.push_front(bfile);
  1658. }
  1659. }
  1660. return std::make_pair(!aborted, -1);
  1661. }
  1662. void BlobDBImpl::CopyBlobFiles(
  1663. std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
  1664. ReadLock rl(&mutex_);
  1665. for (auto const& p : blob_files_) {
  1666. bfiles_copy->push_back(p.second);
  1667. }
  1668. }
  1669. Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
  1670. auto* cfd =
  1671. reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
  1672. // Get a snapshot to avoid blob file get deleted between we
  1673. // fetch and index entry and reading from the file.
  1674. ManagedSnapshot* own_snapshot = nullptr;
  1675. const Snapshot* snapshot = read_options.snapshot;
  1676. if (snapshot == nullptr) {
  1677. own_snapshot = new ManagedSnapshot(db_);
  1678. snapshot = own_snapshot->snapshot();
  1679. }
  1680. auto* iter = db_impl_->NewIteratorImpl(
  1681. read_options, cfd, snapshot->GetSequenceNumber(),
  1682. nullptr /*read_callback*/, true /*allow_blob*/);
  1683. return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_);
  1684. }
  1685. Status DestroyBlobDB(const std::string& dbname, const Options& options,
  1686. const BlobDBOptions& bdb_options) {
  1687. const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
  1688. Env* env = soptions.env;
  1689. Status status;
  1690. std::string blobdir;
  1691. blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
  1692. : bdb_options.blob_dir;
  1693. std::vector<std::string> filenames;
  1694. env->GetChildren(blobdir, &filenames);
  1695. for (const auto& f : filenames) {
  1696. uint64_t number;
  1697. FileType type;
  1698. if (ParseFileName(f, &number, &type) && type == kBlobFile) {
  1699. Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
  1700. /*force_fg=*/false);
  1701. if (status.ok() && !del.ok()) {
  1702. status = del;
  1703. }
  1704. }
  1705. }
  1706. env->DeleteDir(blobdir);
  1707. Status destroy = DestroyDB(dbname, options);
  1708. if (status.ok() && !destroy.ok()) {
  1709. status = destroy;
  1710. }
  1711. return status;
  1712. }
  1713. #ifndef NDEBUG
  1714. Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
  1715. PinnableSlice* value) {
  1716. return GetBlobValue(key, index_entry, value);
  1717. }
  1718. void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
  1719. SequenceNumber immutable_sequence) {
  1720. auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
  1721. db_options_.info_log.get());
  1722. blob_file->MarkImmutable(immutable_sequence);
  1723. blob_files_[blob_file_number] = blob_file;
  1724. live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
  1725. }
  1726. std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
  1727. ReadLock l(&mutex_);
  1728. std::vector<std::shared_ptr<BlobFile>> blob_files;
  1729. for (auto& p : blob_files_) {
  1730. blob_files.emplace_back(p.second);
  1731. }
  1732. return blob_files;
  1733. }
  1734. std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
  1735. const {
  1736. ReadLock l(&mutex_);
  1737. std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
  1738. for (const auto& pair : live_imm_non_ttl_blob_files_) {
  1739. live_imm_non_ttl_files.emplace_back(pair.second);
  1740. }
  1741. return live_imm_non_ttl_files;
  1742. }
  1743. std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
  1744. const {
  1745. ReadLock l(&mutex_);
  1746. std::vector<std::shared_ptr<BlobFile>> obsolete_files;
  1747. for (auto& bfile : obsolete_files_) {
  1748. obsolete_files.emplace_back(bfile);
  1749. }
  1750. return obsolete_files;
  1751. }
  1752. void BlobDBImpl::TEST_DeleteObsoleteFiles() {
  1753. DeleteObsoleteFiles(false /*abort*/);
  1754. }
  1755. Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
  1756. MutexLock l(&write_mutex_);
  1757. WriteLock lock(&mutex_);
  1758. WriteLock file_lock(&bfile->mutex_);
  1759. return CloseBlobFile(bfile);
  1760. }
  1761. void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
  1762. SequenceNumber obsolete_seq,
  1763. bool update_size) {
  1764. return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
  1765. }
  1766. void BlobDBImpl::TEST_EvictExpiredFiles() {
  1767. EvictExpiredFiles(false /*abort*/);
  1768. }
  1769. uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
  1770. void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
  1771. const std::vector<LiveFileMetaData>& live_files) {
  1772. InitializeBlobFileToSstMapping(live_files);
  1773. }
  1774. void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
  1775. ProcessFlushJobInfo(info);
  1776. }
  1777. void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
  1778. ProcessCompactionJobInfo(info);
  1779. }
  1780. #endif // !NDEBUG
  1781. } // namespace blob_db
  1782. } // namespace ROCKSDB_NAMESPACE
  1783. #endif // ROCKSDB_LITE