column_family.cc

  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/column_family.h"
  10. #include <algorithm>
  11. #include <cinttypes>
  12. #include <limits>
  13. #include <sstream>
  14. #include <string>
  15. #include <vector>
  16. #include "db/blob/blob_file_cache.h"
  17. #include "db/blob/blob_source.h"
  18. #include "db/compaction/compaction_picker.h"
  19. #include "db/compaction/compaction_picker_fifo.h"
  20. #include "db/compaction/compaction_picker_level.h"
  21. #include "db/compaction/compaction_picker_universal.h"
  22. #include "db/db_impl/db_impl.h"
  23. #include "db/internal_stats.h"
  24. #include "db/job_context.h"
  25. #include "db/range_del_aggregator.h"
  26. #include "db/table_properties_collector.h"
  27. #include "db/version_set.h"
  28. #include "db/write_controller.h"
  29. #include "file/sst_file_manager_impl.h"
  30. #include "logging/logging.h"
  31. #include "monitoring/thread_status_util.h"
  32. #include "options/options_helper.h"
  33. #include "port/port.h"
  34. #include "rocksdb/convenience.h"
  35. #include "rocksdb/table.h"
  36. #include "table/merging_iterator.h"
  37. #include "util/autovector.h"
  38. #include "util/cast_util.h"
  39. #include "util/compression.h"
  40. namespace ROCKSDB_NAMESPACE {
  41. ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
  42. ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
  43. : cfd_(column_family_data), db_(db), mutex_(mutex) {
  44. if (cfd_ != nullptr) {
  45. cfd_->Ref();
  46. }
  47. }
  48. ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  49. if (cfd_ != nullptr) {
  50. for (auto& listener : cfd_->ioptions().listeners) {
  51. listener->OnColumnFamilyHandleDeletionStarted(this);
  52. }
  53. // Job id == 0 means that this is not our background process, but rather
  54. // a user thread.
  55. // Need to hold some shared pointers owned by the initial_cf_options
  56. // before the final cleanup finishes.
  57. ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
  58. JobContext job_context(0);
  59. mutex_->Lock();
  60. bool dropped = cfd_->IsDropped();
  61. if (cfd_->UnrefAndTryDelete()) {
  62. if (dropped) {
  63. db_->FindObsoleteFiles(&job_context, false, true);
  64. }
  65. }
  66. mutex_->Unlock();
  67. if (job_context.HaveSomethingToDelete()) {
  68. bool defer_purge =
  69. db_->immutable_db_options().avoid_unnecessary_blocking_io;
  70. db_->PurgeObsoleteFiles(job_context, defer_purge);
  71. }
  72. job_context.Clean();
  73. }
  74. }
  75. uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
  76. const std::string& ColumnFamilyHandleImpl::GetName() const {
  77. return cfd()->GetName();
  78. }
  79. Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
  80. // accessing mutable cf-options requires db mutex.
  81. InstrumentedMutexLock l(mutex_);
  82. *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  83. return Status::OK();
  84. }
  85. const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  86. return cfd()->user_comparator();
  87. }
  88. void GetInternalTblPropCollFactory(
  89. const ImmutableCFOptions& ioptions,
  90. InternalTblPropCollFactories* internal_tbl_prop_coll_factories) {
  91. assert(internal_tbl_prop_coll_factories);
  92. auto& collector_factories = ioptions.table_properties_collector_factories;
  93. for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
  94. ++i) {
  95. assert(collector_factories[i]);
  96. internal_tbl_prop_coll_factories->emplace_back(
  97. new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  98. }
  99. }
  100. Status CheckCompressionSupportedWithManager(
  101. CompressionType type, UnownedPtr<CompressionManager> mgr) {
  102. if (mgr) {
  103. if (!mgr->SupportsCompressionType(type)) {
  104. return Status::NotSupported("Compression type " +
  105. CompressionTypeToString(type) +
  106. " is not recognized/supported by this "
  107. "version of CompressionManager " +
  108. mgr->GetId());
  109. }
  110. } else {
  111. if (!CompressionTypeSupported(type)) {
  112. if (type <= kLastBuiltinCompression) {
  113. return Status::InvalidArgument("Compression type " +
  114. CompressionTypeToString(type) +
  115. " is not linked with the binary.");
  116. } else {
  117. return Status::NotSupported(
  118. "Compression type " + CompressionTypeToString(type) +
  119. " is not recognized/supported by built-in CompressionManager.");
  120. }
  121. }
  122. }
  123. return Status::OK();
  124. }
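// Example (added for illustration; not part of the original source). A
// hypothetical call with no CompressionManager falls back to the built-in
// check:
//
//   Status s = CheckCompressionSupportedWithManager(kZSTD, nullptr);
//   // s.IsInvalidArgument() == true when ZSTD support is not compiled in.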
  125. Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  126. if (!cf_options.compression_per_level.empty()) {
  127. for (size_t level = 0; level < cf_options.compression_per_level.size();
  128. ++level) {
  129. Status s = CheckCompressionSupportedWithManager(
  130. cf_options.compression_per_level[level],
  131. cf_options.compression_manager.get());
  132. if (!s.ok()) {
  133. return s;
  134. }
  135. }
  136. } else {
  137. Status s = CheckCompressionSupportedWithManager(
  138. cf_options.compression, cf_options.compression_manager.get());
  139. if (!s.ok()) {
  140. return s;
  141. }
  142. }
  143. if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
  144. if (cf_options.compression_opts.use_zstd_dict_trainer) {
  145. if (!ZSTD_TrainDictionarySupported()) {
  146. return Status::InvalidArgument(
  147. "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
  148. "is not linked with the binary.");
  149. }
  150. } else if (!ZSTD_FinalizeDictionarySupported()) {
  151. return Status::InvalidArgument(
  152. "zstd finalizeDictionary cannot be used because ZSTD 1.4.5+ "
  153. "is not linked with the binary.");
  154. }
  155. if (cf_options.compression_opts.max_dict_bytes == 0) {
  156. return Status::InvalidArgument(
  157. "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
  158. "should be nonzero if we're using zstd's dictionary generator.");
  159. }
  160. }
  161. if (!CompressionTypeSupported(cf_options.blob_compression_type)) {
  162. std::ostringstream oss;
  163. oss << "The specified blob compression type "
  164. << CompressionTypeToString(cf_options.blob_compression_type)
  165. << " is not available.";
  166. return Status::InvalidArgument(oss.str());
  167. }
  168. return Status::OK();
  169. }
  170. Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  171. if (cf_options.inplace_update_support) {
  172. return Status::InvalidArgument(
  173. "In-place memtable updates (inplace_update_support) is not compatible "
  174. "with concurrent writes (allow_concurrent_memtable_write)");
  175. }
  176. if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
  177. return Status::InvalidArgument(
  178. "Memtable doesn't allow concurrent writes "
  179. "(allow_concurrent_memtable_write)");
  180. }
  181. return Status::OK();
  182. }
  183. Status CheckCFPathsSupported(const DBOptions& db_options,
  184. const ColumnFamilyOptions& cf_options) {
  185. // More than one cf_paths are supported only in universal
  186. // and level compaction styles. This function also checks the case
  187. // in which cf_paths is not specified, which results in db_paths
  188. // being used.
  189. if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
  190. (cf_options.compaction_style != kCompactionStyleLevel)) {
  191. if (cf_options.cf_paths.size() > 1) {
  192. return Status::NotSupported(
  193. "More than one CF paths are only supported in "
  194. "universal and level compaction styles. ");
  195. } else if (cf_options.cf_paths.empty() && db_options.db_paths.size() > 1) {
  196. return Status::NotSupported(
  197. "More than one DB paths are only supported in "
  198. "universal and level compaction styles. ");
  199. }
  200. }
  201. return Status::OK();
  202. }
  203. namespace {
  204. const uint64_t kDefaultTtl = 0xfffffffffffffffe;
  205. const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
  206. } // anonymous namespace
  207. ColumnFamilyOptions SanitizeCfOptions(const ImmutableDBOptions& db_options,
  208. bool read_only,
  209. const ColumnFamilyOptions& src) {
  210. ColumnFamilyOptions result = src;
  211. size_t clamp_max = std::conditional<
  212. sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
  213. std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  214. ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10,
  215. clamp_max);
  216. // If the user sets arena_block_size, we trust that value. Otherwise,
  217. // calculate a proper value from write_buffer_size.
  218. if (result.arena_block_size <= 0) {
  219. result.arena_block_size =
  220. std::min(size_t{1024 * 1024}, result.write_buffer_size / 8);
  221. // Align up to 4k
  222. const size_t align = 4 * 1024;
  223. result.arena_block_size =
  224. ((result.arena_block_size + align - 1) / align) * align;
  225. }
  226. result.min_write_buffer_number_to_merge =
  227. std::min(result.min_write_buffer_number_to_merge,
  228. result.max_write_buffer_number - 1);
  229. if (result.min_write_buffer_number_to_merge < 1) {
  230. result.min_write_buffer_number_to_merge = 1;
  231. }
  232. if (db_options.atomic_flush && result.min_write_buffer_number_to_merge > 1) {
  233. ROCKS_LOG_WARN(
  234. db_options.logger,
  235. "Currently, if atomic_flush is true, then triggering flush for any "
  236. "column family internally (non-manual flush) will trigger flushing "
  237. "all column families even if the number of memtables is smaller "
  238. "min_write_buffer_number_to_merge. Therefore, configuring "
  239. "min_write_buffer_number_to_merge > 1 is not compatible and should "
  240. "be satinized to 1. Not doing so will lead to data loss and "
  241. "inconsistent state across multiple column families when WAL is "
  242. "disabled, which is a common setting for atomic flush");
  243. result.min_write_buffer_number_to_merge = 1;
  244. }
  245. if (result.disallow_memtable_writes) {
  246. // A simple memtable that enforces MarkReadOnly (unlike skip list)
  247. result.memtable_factory = std::make_shared<VectorRepFactory>();
  248. }
  249. if (result.num_levels < 1) {
  250. result.num_levels = 1;
  251. }
  252. if (result.compaction_style == kCompactionStyleLevel &&
  253. result.num_levels < 2) {
  254. result.num_levels = 2;
  255. }
  256. if (result.compaction_style == kCompactionStyleUniversal &&
  257. (db_options.allow_ingest_behind || result.cf_allow_ingest_behind) &&
  258. result.num_levels < 3) {
  259. result.num_levels = 3;
  260. }
  261. if (result.max_write_buffer_number < 2) {
  262. result.max_write_buffer_number = 2;
  263. }
  264. if (result.max_write_buffer_size_to_maintain < 0) {
  265. result.max_write_buffer_size_to_maintain =
  266. result.max_write_buffer_number *
  267. static_cast<int64_t>(result.write_buffer_size);
  268. }
  269. // bloom filter size shouldn't exceed 1/4 of memtable size.
  270. if (result.memtable_prefix_bloom_size_ratio > 0.25) {
  271. result.memtable_prefix_bloom_size_ratio = 0.25;
  272. } else if (result.memtable_prefix_bloom_size_ratio < 0) {
  273. result.memtable_prefix_bloom_size_ratio = 0;
  274. }
  275. if (!result.prefix_extractor) {
  276. assert(result.memtable_factory);
  277. Slice name = result.memtable_factory->Name();
  278. if (name.compare("HashSkipListRepFactory") == 0 ||
  279. name.compare("HashLinkListRepFactory") == 0) {
  280. result.memtable_factory = std::make_shared<SkipListFactory>();
  281. }
  282. }
  283. if (result.compaction_style == kCompactionStyleFIFO) {
  284. // since we delete level0 files in FIFO compaction when there are too many
  285. // of them, these options don't really mean anything
  286. result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
  287. result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  288. }
  289. if (result.max_bytes_for_level_multiplier <= 0) {
  290. result.max_bytes_for_level_multiplier = 1;
  291. }
  292. if (result.level0_file_num_compaction_trigger == 0) {
  293. ROCKS_LOG_WARN(db_options.logger,
  294. "level0_file_num_compaction_trigger cannot be 0");
  295. result.level0_file_num_compaction_trigger = 1;
  296. }
  297. if (result.level0_stop_writes_trigger <
  298. result.level0_slowdown_writes_trigger ||
  299. result.level0_slowdown_writes_trigger <
  300. result.level0_file_num_compaction_trigger) {
  301. ROCKS_LOG_WARN(db_options.logger,
  302. "This condition must be satisfied: "
  303. "level0_stop_writes_trigger(%d) >= "
  304. "level0_slowdown_writes_trigger(%d) >= "
  305. "level0_file_num_compaction_trigger(%d)",
  306. result.level0_stop_writes_trigger,
  307. result.level0_slowdown_writes_trigger,
  308. result.level0_file_num_compaction_trigger);
  309. if (result.level0_slowdown_writes_trigger <
  310. result.level0_file_num_compaction_trigger) {
  311. result.level0_slowdown_writes_trigger =
  312. result.level0_file_num_compaction_trigger;
  313. }
  314. if (result.level0_stop_writes_trigger <
  315. result.level0_slowdown_writes_trigger) {
  316. result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
  317. }
  318. ROCKS_LOG_WARN(db_options.logger,
  319. "Adjust the value to "
  320. "level0_stop_writes_trigger(%d) "
  321. "level0_slowdown_writes_trigger(%d) "
  322. "level0_file_num_compaction_trigger(%d)",
  323. result.level0_stop_writes_trigger,
  324. result.level0_slowdown_writes_trigger,
  325. result.level0_file_num_compaction_trigger);
  326. }
  327. if (result.soft_pending_compaction_bytes_limit == 0) {
  328. result.soft_pending_compaction_bytes_limit =
  329. result.hard_pending_compaction_bytes_limit;
  330. } else if (result.hard_pending_compaction_bytes_limit > 0 &&
  331. result.soft_pending_compaction_bytes_limit >
  332. result.hard_pending_compaction_bytes_limit) {
  333. result.soft_pending_compaction_bytes_limit =
  334. result.hard_pending_compaction_bytes_limit;
  335. }
  336. // When the DB is stopped, it's possible that some .trash files were not
  337. // deleted yet. When we open the DB, we will find these .trash files and
  338. // schedule them to be deleted (or delete them immediately if SstFileManager
  339. // was not used).
  340. auto sfm =
  341. static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  342. for (size_t i = 0; i < result.cf_paths.size(); i++) {
  343. DeleteScheduler::CleanupDirectory(db_options.env, sfm,
  344. result.cf_paths[i].path)
  345. .PermitUncheckedError();
  346. }
  347. if (result.cf_paths.empty()) {
  348. result.cf_paths = db_options.db_paths;
  349. }
  350. if (result.level_compaction_dynamic_level_bytes) {
  351. if (result.compaction_style != kCompactionStyleLevel) {
  352. ROCKS_LOG_INFO(db_options.info_log.get(),
  353. "level_compaction_dynamic_level_bytes only makes sense "
  354. "for level-based compaction");
  355. result.level_compaction_dynamic_level_bytes = false;
  356. } else if (result.cf_paths.size() > 1U) {
  357. // We don't yet know how to make this feature and multiple
  358. // DB paths work together.
  359. ROCKS_LOG_WARN(db_options.info_log.get(),
  360. "multiple cf_paths/db_paths and "
  361. "level_compaction_dynamic_level_bytes "
  362. "can't be used together");
  363. result.level_compaction_dynamic_level_bytes = false;
  364. }
  365. }
  366. if (result.max_compaction_bytes == 0) {
  367. result.max_compaction_bytes = result.target_file_size_base * 25;
  368. }
  369. bool is_block_based_table = (result.table_factory->IsInstanceOf(
  370. TableFactory::kBlockBasedTableName()));
  371. const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  372. if (result.ttl == kDefaultTtl) {
  373. if (is_block_based_table) {
  374. // FIFO also requires max_open_files=-1, which is checked in
  375. // ValidateOptions().
  376. result.ttl = kAdjustedTtl;
  377. } else {
  378. result.ttl = 0;
  379. }
  380. }
  381. const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;
  382. if (result.compaction_style == kCompactionStyleLevel) {
  383. if ((result.compaction_filter != nullptr ||
  384. result.compaction_filter_factory != nullptr) &&
  385. result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
  386. is_block_based_table) {
  387. result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
  388. }
  389. } else if (result.compaction_style == kCompactionStyleUniversal) {
  390. if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
  391. is_block_based_table) {
  392. result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
  393. }
  394. } else if (result.compaction_style == kCompactionStyleFIFO) {
  395. if (result.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
  396. ROCKS_LOG_WARN(
  397. db_options.info_log.get(),
  398. "periodic_compaction_seconds does not support FIFO compaction. You"
  399. "may want to set option TTL instead.");
  400. }
  401. if (result.last_level_temperature != Temperature::kUnknown) {
  402. ROCKS_LOG_WARN(
  403. db_options.info_log.get(),
  404. "last_level_temperature is ignored with FIFO compaction. Consider "
  405. "CompactionOptionsFIFO::file_temperature_age_thresholds.");
  406. result.last_level_temperature = Temperature::kUnknown;
  407. }
  408. }
  409. // For universal compaction, `ttl` and `periodic_compaction_seconds` mean the
  410. // same thing, take the stricter value.
  411. if (result.compaction_style == kCompactionStyleUniversal) {
  412. if (result.periodic_compaction_seconds == 0) {
  413. result.periodic_compaction_seconds = result.ttl;
  414. } else if (result.ttl != 0) {
  415. result.periodic_compaction_seconds =
  416. std::min(result.ttl, result.periodic_compaction_seconds);
  417. }
  418. }
  419. if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
  420. result.periodic_compaction_seconds = 0;
  421. }
  422. if (read_only && (result.preserve_internal_time_seconds > 0 ||
  423. result.preclude_last_level_data_seconds > 0)) {
  424. // With no writes coming in, we don't need periodic SeqnoToTime entries.
  425. // Existing SST files may or may not have that info associated with them.
  426. ROCKS_LOG_WARN(
  427. db_options.info_log.get(),
  428. "preserve_internal_time_seconds and preclude_last_level_data_seconds "
  429. "are ignored in read-only DB");
  430. result.preserve_internal_time_seconds = 0;
  431. result.preclude_last_level_data_seconds = 0;
  432. }
  433. if (read_only) {
  434. if (result.memtable_op_scan_flush_trigger) {
  435. ROCKS_LOG_WARN(db_options.info_log.get(),
  436. "option memtable_op_scan_flush_trigger is sanitized to "
  437. "0(disabled) for read only DB.");
  438. result.memtable_op_scan_flush_trigger = 0;
  439. }
  440. if (result.memtable_avg_op_scan_flush_trigger) {
  441. ROCKS_LOG_WARN(
  442. db_options.info_log.get(),
  443. "option memtable_avg_op_scan_flush_trigger is sanitized to "
  444. "0(disabled) for read only DB.");
  445. result.memtable_avg_op_scan_flush_trigger = 0;
  446. }
  447. }
  448. return result;
  449. }
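// Illustrative sketch (added; not part of the original source): the
// arena_block_size defaulting above in isolation, i.e. min(1 MiB,
// write_buffer_size / 8), rounded up to a 4 KiB boundary. The helper name is
// hypothetical.
namespace {
[[maybe_unused]] size_t ExampleDefaultArenaBlockSize(size_t write_buffer_size) {
  size_t block = std::min(size_t{1024 * 1024}, write_buffer_size / 8);
  const size_t align = 4 * 1024;  // align up to 4 KiB, as SanitizeCfOptions() does
  return ((block + align - 1) / align) * align;
}
// e.g. a 64 MiB write buffer yields 1 MiB arena blocks; a 1 MiB write buffer
// yields 128 KiB arena blocks (already 4 KiB-aligned).
}  // anonymous namespace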
  450. int SuperVersion::dummy = 0;
  451. void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
  452. void* const SuperVersion::kSVObsolete = nullptr;
  453. SuperVersion::~SuperVersion() {
  454. for (auto td : to_delete) {
  455. delete td;
  456. }
  457. }
  458. SuperVersion* SuperVersion::Ref() {
  459. refs.fetch_add(1, std::memory_order_relaxed);
  460. return this;
  461. }
  462. bool SuperVersion::Unref() {
  463. // fetch_sub returns the previous value of ref
  464. uint32_t previous_refs = refs.fetch_sub(1);
  465. assert(previous_refs > 0);
  466. return previous_refs == 1;
  467. }
  468. void SuperVersion::Cleanup() {
  469. assert(refs.load(std::memory_order_relaxed) == 0);
  470. // Since this SuperVersion object is being deleted,
  471. // decrement reference to the immutable MemtableList
  472. // this SV object was pointing to.
  473. imm->Unref(&to_delete);
  474. ReadOnlyMemTable* m = mem->Unref();
  475. if (m != nullptr) {
  476. auto* memory_usage = current->cfd()->imm()->current_memory_usage();
  477. assert(*memory_usage >= m->ApproximateMemoryUsage());
  478. *memory_usage -= m->ApproximateMemoryUsage();
  479. to_delete.push_back(m);
  480. }
  481. current->Unref();
  482. cfd->UnrefAndTryDelete();
  483. }
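// Illustrative sketch (added; not part of the original source): the release
// protocol for a SuperVersion reference. Unref() only drops the count; once
// the last reference is gone, the caller (holding the DB mutex) runs Cleanup()
// and deletes the object. The helper name is hypothetical.
namespace {
[[maybe_unused]] void ExampleReleaseSuperVersion(SuperVersion* sv) {
  if (sv->Unref()) {  // true iff this was the last reference
    sv->Cleanup();    // drops refs on mem/imm/current/cfd, collects memtables
    delete sv;
  }
}
}  // anonymous namespace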
  484. void SuperVersion::Init(
  485. ColumnFamilyData* new_cfd, MemTable* new_mem, MemTableListVersion* new_imm,
  486. Version* new_current,
  487. std::shared_ptr<const SeqnoToTimeMapping> new_seqno_to_time_mapping) {
  488. cfd = new_cfd;
  489. mem = new_mem;
  490. imm = new_imm;
  491. current = new_current;
  492. full_history_ts_low = cfd->GetFullHistoryTsLow();
  493. seqno_to_time_mapping = std::move(new_seqno_to_time_mapping);
  494. cfd->Ref();
  495. mem->Ref();
  496. imm->Ref();
  497. current->Ref();
  498. refs.store(1, std::memory_order_relaxed);
  499. // There should be at least one mapping entry iff time tracking is enabled.
  500. #ifndef NDEBUG
  501. MinAndMaxPreserveSeconds preserve_info{mutable_cf_options};
  502. if (preserve_info.IsEnabled()) {
  503. assert(seqno_to_time_mapping);
  504. assert(!seqno_to_time_mapping->Empty());
  505. } else {
  506. assert(seqno_to_time_mapping == nullptr);
  507. }
  508. #endif // NDEBUG
  509. }
  510. namespace {
  511. void SuperVersionUnrefHandle(void* ptr) {
  512. // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  513. // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  514. // When the latter happens, only super_version_ holds a reference
  515. // to ColumnFamilyData, so no further queries are possible.
  516. SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  517. bool was_last_ref __attribute__((__unused__));
  518. was_last_ref = sv->Unref();
  519. // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  520. // This is important because we can't do SuperVersion cleanup here.
  521. // That would require locking DB mutex, which would deadlock because
  522. // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  523. assert(!was_last_ref);
  524. }
  525. } // anonymous namespace
  526. std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
  527. std::vector<std::string> paths;
  528. paths.reserve(ioptions_.cf_paths.size());
  529. for (const DbPath& db_path : ioptions_.cf_paths) {
  530. paths.emplace_back(db_path.path);
  531. }
  532. return paths;
  533. }
  534. const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId =
  535. std::numeric_limits<uint32_t>::max();
  536. ColumnFamilyData::ColumnFamilyData(
  537. uint32_t id, const std::string& name, Version* _dummy_versions,
  538. Cache* _table_cache, WriteBufferManager* write_buffer_manager,
  539. const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
  540. const FileOptions* file_options, ColumnFamilySet* column_family_set,
  541. BlockCacheTracer* const block_cache_tracer,
  542. const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
  543. const std::string& db_session_id, bool read_only)
  544. : id_(id),
  545. name_(name),
  546. dummy_versions_(_dummy_versions),
  547. current_(nullptr),
  548. refs_(0),
  549. initialized_(false),
  550. dropped_(false),
  551. flush_skip_reschedule_(false),
  552. internal_comparator_(cf_options.comparator),
  553. initial_cf_options_(SanitizeCfOptions(db_options, read_only, cf_options)),
  554. ioptions_(db_options, initial_cf_options_),
  555. mutable_cf_options_(initial_cf_options_),
  556. is_delete_range_supported_(
  557. cf_options.table_factory->IsDeleteRangeSupported()),
  558. write_buffer_manager_(write_buffer_manager),
  559. mem_(nullptr),
  560. imm_(ioptions_.min_write_buffer_number_to_merge,
  561. ioptions_.max_write_buffer_size_to_maintain),
  562. super_version_(nullptr),
  563. super_version_number_(0),
  564. local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
  565. next_(nullptr),
  566. prev_(nullptr),
  567. log_number_(0),
  568. column_family_set_(column_family_set),
  569. queued_for_flush_(false),
  570. queued_for_compaction_(false),
  571. prev_compaction_needed_bytes_(0),
  572. allow_2pc_(db_options.allow_2pc),
  573. last_memtable_id_(0),
  574. db_paths_registered_(false),
  575. mempurge_used_(false),
  576. next_epoch_number_(1) {
  577. if (id_ != kDummyColumnFamilyDataId) {
  578. // TODO(cc): RegisterDbPaths can be expensive; consider moving it
  579. // outside of this constructor, which might be called with the db mutex held.
  580. // TODO(cc): consider using ioptions_.fs; currently some tests rely on
  581. // EnvWrapper, which is the main reason why we use env here.
  582. Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
  583. if (s.ok()) {
  584. db_paths_registered_ = true;
  585. } else {
  586. ROCKS_LOG_ERROR(
  587. ioptions_.logger,
  588. "Failed to register data paths of column family (id: %d, name: %s)",
  589. id_, name_.c_str());
  590. }
  591. }
  592. Ref();
  593. // Convert user defined table properties collector factories to internal ones.
  594. GetInternalTblPropCollFactory(ioptions_, &internal_tbl_prop_coll_factories_);
  595. // if _dummy_versions is nullptr, then this is a dummy column family.
  596. if (_dummy_versions != nullptr) {
  597. internal_stats_.reset(
  598. new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
  599. table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
  600. block_cache_tracer, io_tracer,
  601. db_session_id));
  602. blob_file_cache_.reset(
  603. new BlobFileCache(_table_cache, &ioptions(), soptions(), id_,
  604. internal_stats_->GetBlobFileReadHist(), io_tracer));
  605. blob_source_.reset(new BlobSource(ioptions_, mutable_cf_options_, db_id,
  606. db_session_id, blob_file_cache_.get()));
  607. if (ioptions_.compaction_style == kCompactionStyleLevel) {
  608. compaction_picker_.reset(
  609. new LevelCompactionPicker(ioptions_, &internal_comparator_));
  610. } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
  611. compaction_picker_.reset(
  612. new UniversalCompactionPicker(ioptions_, &internal_comparator_));
  613. } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
  614. compaction_picker_.reset(
  615. new FIFOCompactionPicker(ioptions_, &internal_comparator_));
  616. } else if (ioptions_.compaction_style == kCompactionStyleNone) {
  617. compaction_picker_.reset(
  618. new NullCompactionPicker(ioptions_, &internal_comparator_));
  619. ROCKS_LOG_WARN(ioptions_.logger,
  620. "Column family %s does not use any background compaction. "
  621. "Compactions can only be done via CompactFiles\n",
  622. GetName().c_str());
  623. } else {
  624. ROCKS_LOG_ERROR(ioptions_.logger,
  625. "Unable to recognize the specified compaction style %d. "
  626. "Column family %s will use kCompactionStyleLevel.\n",
  627. ioptions_.compaction_style, GetName().c_str());
  628. compaction_picker_.reset(
  629. new LevelCompactionPicker(ioptions_, &internal_comparator_));
  630. }
  631. if (column_family_set_->NumberOfColumnFamilies() < 10) {
  632. ROCKS_LOG_INFO(ioptions_.logger,
  633. "--------------- Options for column family [%s]:\n",
  634. name.c_str());
  635. initial_cf_options_.Dump(ioptions_.logger);
  636. } else {
  637. ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n");
  638. }
  639. }
  640. RecalculateWriteStallConditions(mutable_cf_options_);
  641. if (cf_options.table_factory->IsInstanceOf(
  642. TableFactory::kBlockBasedTableName()) &&
  643. cf_options.table_factory->GetOptions<BlockBasedTableOptions>()) {
  644. const BlockBasedTableOptions* bbto =
  645. cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
  646. const auto& options_overrides = bbto->cache_usage_options.options_overrides;
  647. const auto file_metadata_charged =
  648. options_overrides.at(CacheEntryRole::kFileMetadata).charged;
  649. if (bbto->block_cache &&
  650. file_metadata_charged == CacheEntryRoleOptions::Decision::kEnabled) {
  651. // TODO(hx235): Add a `ConcurrentCacheReservationManager` at DB scope
  652. // responsible for reservation of `ObsoleteFileInfo` so that we can keep
  653. // this `file_metadata_cache_res_mgr_` nonconcurrent
  654. file_metadata_cache_res_mgr_.reset(new ConcurrentCacheReservationManager(
  655. std::make_shared<
  656. CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>(
  657. bbto->block_cache)));
  658. }
  659. }
  660. }
  661. // DB mutex held
  662. ColumnFamilyData::~ColumnFamilyData() {
  663. assert(refs_.load(std::memory_order_relaxed) == 0);
  664. // remove from linked list
  665. auto prev = prev_;
  666. auto next = next_;
  667. prev->next_ = next;
  668. next->prev_ = prev;
  669. if (!dropped_ && column_family_set_ != nullptr) {
  670. // If it's dropped, it's already removed from column family set
  671. // If column_family_set_ == nullptr, this is dummy CFD and not in
  672. // ColumnFamilySet
  673. column_family_set_->RemoveColumnFamily(this);
  674. }
  675. if (current_ != nullptr) {
  676. current_->Unref();
  677. }
  678. // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  679. // compaction_queue_ and we destroyed it
  680. assert(!queued_for_flush_);
  681. assert(!queued_for_compaction_);
  682. assert(super_version_ == nullptr);
  683. if (dummy_versions_ != nullptr) {
  684. // List must be empty
  685. assert(dummy_versions_->Next() == dummy_versions_);
  686. bool deleted __attribute__((__unused__));
  687. deleted = dummy_versions_->Unref();
  688. assert(deleted);
  689. }
  690. if (mem_ != nullptr) {
  691. delete mem_->Unref();
  692. }
  693. autovector<ReadOnlyMemTable*> to_delete;
  694. imm_.current()->Unref(&to_delete);
  695. for (auto* m : to_delete) {
  696. delete m;
  697. }
  698. if (db_paths_registered_) {
  699. // TODO(cc): consider using ioptions_.fs; currently some tests rely on
  700. // EnvWrapper, which is the main reason why we use env here.
  701. Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
  702. if (!s.ok()) {
  703. ROCKS_LOG_ERROR(
  704. ioptions_.logger,
  705. "Failed to unregister data paths of column family (id: %d, name: %s)",
  706. id_, name_.c_str());
  707. }
  708. }
  709. }
  710. bool ColumnFamilyData::UnrefAndTryDelete() {
  711. int old_refs = refs_.fetch_sub(1);
  712. assert(old_refs > 0);
  713. if (old_refs == 1) {
  714. assert(super_version_ == nullptr);
  715. delete this;
  716. return true;
  717. }
  718. if (old_refs == 2 && super_version_ != nullptr) {
  719. // Only the super_version_ holds me
  720. SuperVersion* sv = super_version_;
  721. super_version_ = nullptr;
  722. // Release SuperVersion references kept in ThreadLocalPtr.
  723. local_sv_.reset();
  724. if (sv->Unref()) {
  725. // Note: sv will delete this ColumnFamilyData during Cleanup()
  726. assert(sv->cfd == this);
  727. sv->Cleanup();
  728. delete sv;
  729. return true;
  730. }
  731. }
  732. return false;
  733. }
  734. void ColumnFamilyData::SetDropped() {
  735. // can't drop default CF
  736. assert(id_ != 0);
  737. dropped_ = true;
  738. write_controller_token_.reset();
  739. // remove from column_family_set
  740. column_family_set_->RemoveColumnFamily(this);
  741. }
  742. ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  743. return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
  744. }
  745. uint64_t ColumnFamilyData::OldestLogToKeep() {
  746. auto current_log = GetLogNumber();
  747. if (allow_2pc_) {
  748. auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
  749. auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
  750. if (imm_prep_log > 0 && imm_prep_log < current_log) {
  751. current_log = imm_prep_log;
  752. }
  753. if (mem_prep_log > 0 && mem_prep_log < current_log) {
  754. current_log = mem_prep_log;
  755. }
  756. }
  757. return current_log;
  758. }
  759. const double kIncSlowdownRatio = 0.8;
  760. const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
  761. const double kNearStopSlowdownRatio = 0.6;
  762. const double kDelayRecoverSlowdownRatio = 1.4;
  763. namespace {
  764. // If penalize_stop is true, we further reduce slowdown rate.
  765. std::unique_ptr<WriteControllerToken> SetupDelay(
  766. WriteController* write_controller, uint64_t compaction_needed_bytes,
  767. uint64_t prev_compaction_need_bytes, bool penalize_stop,
  768. bool auto_compactions_disabled) {
  769. const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
  770. uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  771. uint64_t write_rate = write_controller->delayed_write_rate();
  772. if (auto_compactions_disabled) {
  773. // When auto compaction is disabled, always use the value user gave.
  774. write_rate = max_write_rate;
  775. } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
  776. // If user gives rate less than kMinWriteRate, don't adjust it.
  777. //
  778. // If already delayed, need to adjust based on previous compaction debt.
  779. // When two or more column families require delay, we always
  780. // increase or reduce the write rate based on information for one single
  781. // column family. It is likely to be OK, but we can improve it if this
  782. // becomes a problem.
  783. // Ignore the compaction_needed_bytes = 0 case because compaction_needed_bytes
  784. // is only available in level-based compaction.
  785. //
  786. // If the compaction debt stays the same as before, we also slow down
  787. // further. It usually means a memtable is full. This mainly covers the case
  788. // where both flush and compaction are much slower than the rate at which we
  789. // insert into memtables, so we need to actively slow down before we get
  790. // a feedback signal from compactions and flushes, to avoid a full stop
  791. // caused by hitting the max write buffer number.
  792. //
  793. // If the DB has just fallen into the stop condition, we need to further
  794. // reduce the write rate to avoid hitting the stop condition again.
  795. if (penalize_stop) {
  796. // Penalize the near stop or stop condition by more aggressive slowdown.
  797. // This is to provide the long term slowdown increase signal.
  798. // The penalty is more than the reward of recovering to the normal
  799. // condition.
  800. write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
  801. kNearStopSlowdownRatio);
  802. if (write_rate < kMinWriteRate) {
  803. write_rate = kMinWriteRate;
  804. }
  805. } else if (prev_compaction_need_bytes > 0 &&
  806. prev_compaction_need_bytes <= compaction_needed_bytes) {
  807. write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
  808. kIncSlowdownRatio);
  809. if (write_rate < kMinWriteRate) {
  810. write_rate = kMinWriteRate;
  811. }
  812. } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
  813. // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
  814. // down compaction debt. But we'll never speed up to faster than the write
  815. // rate given by the user.
  816. write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
  817. kDecSlowdownRatio);
  818. if (write_rate > max_write_rate) {
  819. write_rate = max_write_rate;
  820. }
  821. }
  822. }
  823. return write_controller->GetDelayToken(write_rate);
  824. }
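// Illustrative sketch (added; not part of the original source): the rate
// arithmetic that SetupDelay() applies, isolated from WriteController state.
// The helper name and parameters are hypothetical; the real function also
// handles disabled auto-compaction and the "no delay needed" case.
[[maybe_unused]] uint64_t ExampleNextDelayedWriteRate(uint64_t write_rate,
                                                      uint64_t max_write_rate,
                                                      bool penalize_stop,
                                                      bool debt_not_shrinking) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // 16KB/s floor, as above
  double ratio = penalize_stop        ? kNearStopSlowdownRatio
                 : debt_not_shrinking ? kIncSlowdownRatio
                                      : kDecSlowdownRatio;
  auto next = static_cast<uint64_t>(static_cast<double>(write_rate) * ratio);
  return std::min(std::max(next, kMinWriteRate), max_write_rate);
}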
  825. int GetL0FileCountForCompactionSpeedup(int level0_file_num_compaction_trigger,
  826. int level0_slowdown_writes_trigger) {
  827. // SanitizeOptions() ensures it.
  828. assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
  829. if (level0_file_num_compaction_trigger < 0) {
  830. return std::numeric_limits<int>::max();
  831. }
  832. const int64_t twice_level0_trigger =
  833. static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
  834. const int64_t one_fourth_trigger_slowdown =
  835. static_cast<int64_t>(level0_file_num_compaction_trigger) +
  836. ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
  837. 4);
  838. assert(twice_level0_trigger >= 0);
  839. assert(one_fourth_trigger_slowdown >= 0);
  840. // 1/4 of the way between L0 compaction trigger threshold and slowdown
  841. // condition.
  842. // Or twice the compaction trigger, if that is smaller.
  843. int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  844. if (res >= std::numeric_limits<int32_t>::max()) {
  845. return std::numeric_limits<int32_t>::max();
  846. } else {
  847. // res fits in int
  848. return static_cast<int>(res);
  849. }
  850. }
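// Example (added; not part of the original source): with a compaction trigger
// of 4 files and a slowdown trigger of 20 files, compaction pressure is
// requested at min(2 * 4, 4 + (20 - 4) / 4) = 8 L0 files.
[[maybe_unused]] void ExampleL0SpeedupThreshold() {
  [[maybe_unused]] int threshold = GetL0FileCountForCompactionSpeedup(
      /*level0_file_num_compaction_trigger=*/4,
      /*level0_slowdown_writes_trigger=*/20);
  assert(threshold == 8);
}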
  851. uint64_t GetPendingCompactionBytesForCompactionSpeedup(
  852. const MutableCFOptions& mutable_cf_options,
  853. const VersionStorageInfo* vstorage) {
  854. // Compaction debt that is relatively large compared to the stable
  855. // (bottommost) data size indicates that compaction has fallen behind.
  856. const uint64_t kBottommostSizeDivisor = 8;
  857. // Meaningful progress toward the slowdown trigger is another good indication.
  858. const uint64_t kSlowdownTriggerDivisor = 4;
  859. uint64_t bottommost_files_size = 0;
  860. for (const auto& level_and_file : vstorage->BottommostFiles()) {
  861. bottommost_files_size += level_and_file.second->fd.GetFileSize();
  862. }
  863. // Slowdown trigger might be zero but that means compaction speedup should
  864. // always happen (undocumented/historical), so no special treatment is needed.
  865. uint64_t slowdown_threshold =
  866. mutable_cf_options.soft_pending_compaction_bytes_limit /
  867. kSlowdownTriggerDivisor;
  868. // A size of zero, however, should not be used to decide to speed up compaction.
  869. if (bottommost_files_size == 0) {
  870. return slowdown_threshold;
  871. }
  872. // Prevent a small CF from triggering parallel compactions for other CFs.
  873. // Require compaction debt to be more than a full L0 to Lbase compaction.
  874. const uint64_t kMinDebtSize = 2 * mutable_cf_options.max_bytes_for_level_base;
  875. uint64_t size_threshold =
  876. std::max(bottommost_files_size / kBottommostSizeDivisor, kMinDebtSize);
  877. return std::min(size_threshold, slowdown_threshold);
  878. }
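// Worked example (added; not part of the original source): with 64 GiB of
// bottommost data, max_bytes_for_level_base = 256 MiB, and a soft pending
// compaction limit of 64 GiB, the speedup threshold is
// min(max(64 GiB / 8, 2 * 256 MiB), 64 GiB / 4) = min(8 GiB, 16 GiB) = 8 GiB
// of estimated compaction debt.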
  879. uint64_t GetMarkedFileCountForCompactionSpeedup() {
  880. // When just one file is marked, it is not clear that parallel compaction will
  881. // help the compaction that the user nicely requested to happen sooner. When
  882. // multiple files are marked, however, it is pretty clearly helpful, except
  883. // for the rare case in which a single compaction grabs all the marked files.
  884. return 2;
  885. }
  886. } // anonymous namespace
  887. std::pair<WriteStallCondition, WriteStallCause>
  888. ColumnFamilyData::GetWriteStallConditionAndCause(
  889. int num_unflushed_memtables, int num_l0_files,
  890. uint64_t num_compaction_needed_bytes,
  891. const MutableCFOptions& mutable_cf_options,
  892. const ImmutableCFOptions& immutable_cf_options) {
  893. if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
  894. return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  895. } else if (!mutable_cf_options.disable_auto_compactions &&
  896. num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
  897. return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  898. } else if (!mutable_cf_options.disable_auto_compactions &&
  899. mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
  900. num_compaction_needed_bytes >=
  901. mutable_cf_options.hard_pending_compaction_bytes_limit) {
  902. return {WriteStallCondition::kStopped,
  903. WriteStallCause::kPendingCompactionBytes};
  904. } else if (mutable_cf_options.max_write_buffer_number > 3 &&
  905. num_unflushed_memtables >=
  906. mutable_cf_options.max_write_buffer_number - 1 &&
  907. num_unflushed_memtables - 1 >=
  908. immutable_cf_options.min_write_buffer_number_to_merge) {
  909. return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  910. } else if (!mutable_cf_options.disable_auto_compactions &&
  911. mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
  912. num_l0_files >=
  913. mutable_cf_options.level0_slowdown_writes_trigger) {
  914. return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  915. } else if (!mutable_cf_options.disable_auto_compactions &&
  916. mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
  917. num_compaction_needed_bytes >=
  918. mutable_cf_options.soft_pending_compaction_bytes_limit) {
  919. return {WriteStallCondition::kDelayed,
  920. WriteStallCause::kPendingCompactionBytes};
  921. }
  922. return {WriteStallCondition::kNormal, WriteStallCause::kNone};
  923. }
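// Illustrative sketch (added; not part of the original source): the checks
// above are evaluated in order, so a "stopped" condition always takes
// precedence over a "delayed" one. A hypothetical caller only consumes the
// returned pair:
//
//   auto [cond, cause] = ColumnFamilyData::GetWriteStallConditionAndCause(
//       num_unflushed_memtables, num_l0_files, num_compaction_needed_bytes,
//       mutable_cf_options, immutable_cf_options);
//   if (cond == WriteStallCondition::kStopped) { /* block writes */ }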
  924. WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
  925. const MutableCFOptions& mutable_cf_options) {
  926. auto write_stall_condition = WriteStallCondition::kNormal;
  927. if (current_ != nullptr) {
  928. auto* vstorage = current_->storage_info();
  929. auto write_controller = column_family_set_->write_controller_;
  930. uint64_t compaction_needed_bytes =
  931. vstorage->estimated_compaction_needed_bytes();
  932. auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
  933. imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
  934. vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
  935. ioptions());
  936. write_stall_condition = write_stall_condition_and_cause.first;
  937. auto write_stall_cause = write_stall_condition_and_cause.second;
  938. bool was_stopped = write_controller->IsStopped();
  939. bool needed_delay = write_controller->NeedsDelay();
  940. if (write_stall_condition == WriteStallCondition::kStopped &&
  941. write_stall_cause == WriteStallCause::kMemtableLimit) {
  942. write_controller_token_ = write_controller->GetStopToken();
  943. internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
  944. ROCKS_LOG_WARN(
  945. ioptions_.logger,
  946. "[%s] Stopping writes because we have %d immutable memtables "
  947. "(waiting for flush), max_write_buffer_number is set to %d",
  948. name_.c_str(), imm()->NumNotFlushed(),
  949. mutable_cf_options.max_write_buffer_number);
  950. } else if (write_stall_condition == WriteStallCondition::kStopped &&
  951. write_stall_cause == WriteStallCause::kL0FileCountLimit) {
  952. write_controller_token_ = write_controller->GetStopToken();
  953. internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
  954. if (compaction_picker_->IsLevel0CompactionInProgress()) {
  955. internal_stats_->AddCFStats(
  956. InternalStats::L0_FILE_COUNT_LIMIT_STOPS_WITH_ONGOING_COMPACTION,
  957. 1);
  958. }
  959. ROCKS_LOG_WARN(ioptions_.logger,
  960. "[%s] Stopping writes because we have %d level-0 files",
  961. name_.c_str(), vstorage->l0_delay_trigger_count());
  962. } else if (write_stall_condition == WriteStallCondition::kStopped &&
  963. write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
  964. write_controller_token_ = write_controller->GetStopToken();
  965. internal_stats_->AddCFStats(
  966. InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
  967. ROCKS_LOG_WARN(
  968. ioptions_.logger,
  969. "[%s] Stopping writes because of estimated pending compaction "
  970. "bytes %" PRIu64,
  971. name_.c_str(), compaction_needed_bytes);
  972. } else if (write_stall_condition == WriteStallCondition::kDelayed &&
  973. write_stall_cause == WriteStallCause::kMemtableLimit) {
  974. write_controller_token_ =
  975. SetupDelay(write_controller, compaction_needed_bytes,
  976. prev_compaction_needed_bytes_, was_stopped,
  977. mutable_cf_options.disable_auto_compactions);
  978. internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_DELAYS, 1);
  979. ROCKS_LOG_WARN(
  980. ioptions_.logger,
  981. "[%s] Stalling writes because we have %d immutable memtables "
  982. "(waiting for flush), max_write_buffer_number is set to %d "
  983. "rate %" PRIu64,
  984. name_.c_str(), imm()->NumNotFlushed(),
  985. mutable_cf_options.max_write_buffer_number,
  986. write_controller->delayed_write_rate());
  987. } else if (write_stall_condition == WriteStallCondition::kDelayed &&
  988. write_stall_cause == WriteStallCause::kL0FileCountLimit) {
  989. // L0 is within two files of the stop trigger.
  990. bool near_stop = vstorage->l0_delay_trigger_count() >=
  991. mutable_cf_options.level0_stop_writes_trigger - 2;
  992. write_controller_token_ =
  993. SetupDelay(write_controller, compaction_needed_bytes,
  994. prev_compaction_needed_bytes_, was_stopped || near_stop,
  995. mutable_cf_options.disable_auto_compactions);
  996. internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_DELAYS, 1);
  997. if (compaction_picker_->IsLevel0CompactionInProgress()) {
  998. internal_stats_->AddCFStats(
  999. InternalStats::L0_FILE_COUNT_LIMIT_DELAYS_WITH_ONGOING_COMPACTION,
  1000. 1);
  1001. }
  1002. ROCKS_LOG_WARN(ioptions_.logger,
  1003. "[%s] Stalling writes because we have %d level-0 files "
  1004. "rate %" PRIu64,
  1005. name_.c_str(), vstorage->l0_delay_trigger_count(),
  1006. write_controller->delayed_write_rate());
  1007. } else if (write_stall_condition == WriteStallCondition::kDelayed &&
  1008. write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
  1009. // If the distance to the hard limit is less than 1/4 of the gap between
  1010. // the soft and hard bytes limits, we consider it near-stop and speed up
  1011. // the slowdown.
  1012. bool near_stop =
  1013. mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
  1014. (compaction_needed_bytes -
  1015. mutable_cf_options.soft_pending_compaction_bytes_limit) >
  1016. 3 *
  1017. (mutable_cf_options.hard_pending_compaction_bytes_limit -
  1018. mutable_cf_options.soft_pending_compaction_bytes_limit) /
  1019. 4;
  1020. write_controller_token_ =
  1021. SetupDelay(write_controller, compaction_needed_bytes,
  1022. prev_compaction_needed_bytes_, was_stopped || near_stop,
  1023. mutable_cf_options.disable_auto_compactions);
  1024. internal_stats_->AddCFStats(
  1025. InternalStats::PENDING_COMPACTION_BYTES_LIMIT_DELAYS, 1);
  1026. ROCKS_LOG_WARN(
  1027. ioptions_.logger,
  1028. "[%s] Stalling writes because of estimated pending compaction "
  1029. "bytes %" PRIu64 " rate %" PRIu64,
  1030. name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
  1031. write_controller->delayed_write_rate());
  1032. } else {
  1033. assert(write_stall_condition == WriteStallCondition::kNormal);
  1034. if (vstorage->l0_delay_trigger_count() >=
  1035. GetL0FileCountForCompactionSpeedup(
  1036. mutable_cf_options.level0_file_num_compaction_trigger,
  1037. mutable_cf_options.level0_slowdown_writes_trigger)) {
  1038. write_controller_token_ =
  1039. write_controller->GetCompactionPressureToken();
  1040. ROCKS_LOG_INFO(
  1041. ioptions_.logger,
  1042. "[%s] Increasing compaction threads because we have %d level-0 "
  1043. "files ",
  1044. name_.c_str(), vstorage->l0_delay_trigger_count());
  1045. } else if (mutable_cf_options.soft_pending_compaction_bytes_limit == 0) {
  1046. // If soft pending compaction byte limit is not set, always speed up
  1047. // compaction.
  1048. write_controller_token_ =
  1049. write_controller->GetCompactionPressureToken();
  1050. } else if (vstorage->estimated_compaction_needed_bytes() >=
  1051. GetPendingCompactionBytesForCompactionSpeedup(
  1052. mutable_cf_options, vstorage)) {
  1053. write_controller_token_ =
  1054. write_controller->GetCompactionPressureToken();
  1055. ROCKS_LOG_INFO(
  1056. ioptions_.logger,
  1057. "[%s] Increasing compaction threads because of estimated pending "
  1058. "compaction "
  1059. "bytes %" PRIu64,
  1060. name_.c_str(), vstorage->estimated_compaction_needed_bytes());
  1061. } else if (uint64_t(vstorage->FilesMarkedForCompaction().size()) >=
  1062. GetMarkedFileCountForCompactionSpeedup()) {
  1063. write_controller_token_ =
  1064. write_controller->GetCompactionPressureToken();
  1065. ROCKS_LOG_INFO(
  1066. ioptions_.logger,
  1067. "[%s] Increasing compaction threads because we have %" PRIu64
  1068. " files marked for compaction",
  1069. name_.c_str(),
  1070. uint64_t(vstorage->FilesMarkedForCompaction().size()));
  1071. } else {
  1072. write_controller_token_.reset();
  1073. }
  1074. // If the DB recovers from the delay condition, we reward it by raising
  1075. // the write rate by kDelayRecoverSlowdownRatio. This is to balance the
  1076. // long-term slowdown increase signal.
  1077. if (needed_delay) {
  1078. uint64_t write_rate = write_controller->delayed_write_rate();
  1079. write_controller->set_delayed_write_rate(static_cast<uint64_t>(
  1080. static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
  1081. // Set the low pri limit to be 1/4 the delayed write rate.
  1082. // Note we don't reset this value even after the delay condition is released.
  1083. // Low-pri rate will continue to apply if there is a compaction
  1084. // pressure.
  1085. write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
  1086. 4);
  1087. }
  1088. }
  1089. prev_compaction_needed_bytes_ = compaction_needed_bytes;
  1090. }
  1091. return write_stall_condition;
  1092. }
const FileOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->file_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalBlobFileSize() const {
  return VersionSet::GetTotalBlobFileSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

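// Swaps in a brand-new active memtable: drops this CFD's reference to the
// old one (deleting it if that was the last reference) and installs a fresh
// memtable built from the latest mutable CF options.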
void ColumnFamilyData::CreateNewMemtable(SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  // NOTE: db mutex must be locked for SetMemtable, so safe for
  // GetLatestMutableCFOptions
  SetMemtable(ConstructNewMemtable(GetLatestMutableCFOptions(), earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return !mutable_cf_options_.disable_auto_compactions &&
         compaction_picker_->NeedsCompaction(current_->storage_info());
}

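// Asks the compaction picker to choose an automatic compaction against the
// current version's storage info; if one is picked, the input version is
// pinned via FinalizeInputInfo().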
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options,
    const MutableDBOptions& mutable_db_options,
    const std::vector<SequenceNumber>& existing_snapshots,
    const SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
    bool require_max_output_level) {
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, mutable_db_options, existing_snapshots,
      snapshot_checker, current_->storage_info(), log_buffer,
      require_max_output_level);
  if (result != nullptr) {
    result->FinalizeInputInfo(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

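// Determines whether any of the given user-key ranges overlaps data in the
// active or immutable memtables. Point keys are probed through a merged
// memtable iterator; range deletions are probed through a
// ReadRangeDelAggregator built over the same memtables.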
Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<UserKeyRange>& ranges, SuperVersion* super_version,
    bool allow_data_in_errors, bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(super_version->mem->NewIterator(
      read_opts, /*seqno_to_time_mapping=*/nullptr, &arena,
      /*prefix_extractor=*/nullptr, /*for_flush=*/false));
  super_version->imm->AddIterators(read_opts, /*seqno_to_time_mapping=*/nullptr,
                                   /*prefix_extractor=*/nullptr,
                                   &merge_iter_builder,
                                   false /* add_range_tombstone_iter */);
  ScopedArenaPtr<InternalIterator> memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator(
      read_opts, read_seq, false /* immutable_memtable */);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  Status status;
  status = super_version->imm->AddRangeTombstoneIterators(
      read_opts, nullptr /* arena */, &range_del_agg);
  // AddRangeTombstoneIterators always returns Status::OK.
  assert(status.ok());

  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok() && memtable_iter->Valid()) {
      status = ParseInternalKey(memtable_iter->key(), &seek_result,
                                allow_data_in_errors);
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->CompareWithoutTimestamp(seek_result.user_key,
                                        ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

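// Picks the compaction backing a manual CompactRange() call. The
// kCompactAllLevels / kCompactToBaseLevel constants above are sentinel
// output levels interpreted by the compaction picker. As with automatic
// compactions, the input version is pinned via FinalizeInputInfo().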
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
  auto* result = compaction_picker_->PickCompactionForCompactRange(
      GetName(), mutable_cf_options, mutable_db_options,
      current_->storage_info(), input_level, output_level,
      compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore, trim_ts);
  if (result != nullptr) {
    result->FinalizeInputInfo(current_);
  }
  TEST_SYNC_POINT("ColumnFamilyData::CompactRange:Return");
  return result;
}

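// Returns the current SuperVersion with an extra reference held on behalf of
// the caller, reusing the thread-locally cached copy when it is still valid.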
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The SuperVersion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete) {
    RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES);
    db->mutex()->Lock();
    sv = super_version_->Ref();
    db->mutex()->Unlock();
  }
  assert(sv != nullptr);
  return sv;
}

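// Tries to stash `sv` back into the thread-local slot. Returns true if the
// slot still held kSVInUse, i.e. no scrape happened and sv is still current.
// Returns false if a scrape marked the slot obsolete; the caller then owns
// the reference and must release it.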
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

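// Installs a new SuperVersion built from the current memtable, immutable
// memtable list and Version. The write-stall condition is recalculated only
// when that underlying state actually changed. The old SuperVersion is
// scraped out of thread-local caches and queued on sv_context for cleanup.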
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    std::optional<std::shared_ptr<SeqnoToTimeMapping>>
        new_seqno_to_time_mapping) {
  db_mutex->AssertHeld();

  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->mutable_cf_options = GetLatestMutableCFOptions();
  new_superversion->Init(this, mem_, imm_.current(), current_,
                         new_seqno_to_time_mapping.has_value()
                             ? std::move(new_seqno_to_time_mapping.value())
                         : super_version_
                             ? super_version_->ShareSeqnoToTimeMapping()
                             : nullptr);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  if (old_superversion == nullptr || old_superversion->current != current() ||
      old_superversion->mem != mem_ ||
      old_superversion->imm != imm_.current()) {
    // Should not recalculate the slowdown condition if nothing has changed,
    // since RecalculateWriteStallConditions() currently treats a
    // recalculation as a signal that further slowdown is needed.
    super_version_->write_stall_condition =
        RecalculateWriteStallConditions(new_superversion->mutable_cf_options);
  } else {
    super_version_->write_stall_condition =
        old_superversion->write_stall_condition;
  }
  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        new_superversion->mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(
          new_superversion->mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), &ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
}

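// Scrapes every thread-local SuperVersion slot, replacing it with
// kSVObsolete. References held by the slots are dropped here; slots that
// were marked kSVInUse carry no reference and are skipped, and their owning
// threads will observe kSVObsolete when they try to return the SuperVersion.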
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

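// Static validation of a DBOptions/ColumnFamilyOptions combination. Returns
// a non-OK status describing the first unsupported or inconsistent setting.
// Called from SetOptions() below before new options take effect.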
Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok() && db_options.unordered_write &&
      cf_options.max_successive_merges != 0) {
    s = Status::InvalidArgument(
        "max_successive_merges > 0 is incompatible with unordered_write");
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
    if (!cf_options.table_factory->IsInstanceOf(
            TableFactory::kBlockBasedTableName())) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
    }
  }

  if (cf_options.periodic_compaction_seconds > 0 &&
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
    if (!cf_options.table_factory->IsInstanceOf(
            TableFactory::kBlockBasedTableName())) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format. ");
    }
  }

  const auto* ucmp = cf_options.comparator;
  assert(ucmp);
  if (ucmp->timestamp_size() > 0 &&
      !cf_options.persist_user_defined_timestamps) {
    if (db_options.atomic_flush) {
      return Status::NotSupported(
          "Not persisting user-defined timestamps feature is not supported "
          "in combination with atomic flush.");
    }
    if (db_options.allow_concurrent_memtable_write) {
      return Status::NotSupported(
          "Not persisting user-defined timestamps feature is not supported"
          " in combination with concurrent memtable write.");
    }
    const char* comparator_name = cf_options.comparator->Name();
    size_t name_size = strlen(comparator_name);
    const char* suffix = ".u64ts";
    size_t suffix_size = strlen(suffix);
    if (name_size <= suffix_size ||
        strcmp(comparator_name + name_size - suffix_size, suffix) != 0) {
      return Status::NotSupported(
          "Not persisting user-defined timestamps "
          "feature only supports user-defined timestamps formatted as "
          "uint64_t.");
    }
  }

  if (cf_options.enable_blob_garbage_collection) {
    if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
        cf_options.blob_garbage_collection_age_cutoff > 1.0) {
      return Status::InvalidArgument(
          "The age cutoff for blob garbage collection should be in the range "
          "[0.0, 1.0].");
    }
    if (cf_options.blob_garbage_collection_force_threshold < 0.0 ||
        cf_options.blob_garbage_collection_force_threshold > 1.0) {
      return Status::InvalidArgument(
          "The garbage ratio threshold for forcing blob garbage collection "
          "should be in the range [0.0, 1.0].");
    }
  }

  if (cf_options.compaction_style == kCompactionStyleFIFO &&
      db_options.max_open_files != -1 && cf_options.ttl > 0) {
    return Status::NotSupported(
        "FIFO compaction only supported with max_open_files = -1.");
  }

  std::vector<uint32_t> supported{0, 1, 2, 4, 8};
  if (std::find(supported.begin(), supported.end(),
                cf_options.memtable_protection_bytes_per_key) ==
      supported.end()) {
    return Status::NotSupported(
        "Memtable per key-value checksum protection only supports 0, 1, 2, 4 "
        "or 8 bytes per key.");
  }
  if (std::find(supported.begin(), supported.end(),
                cf_options.block_protection_bytes_per_key) == supported.end()) {
    return Status::NotSupported(
        "Block per key-value checksum protection only supports 0, 1, 2, 4 "
        "or 8 bytes per key.");
  }

  if (!cf_options.compaction_options_fifo.file_temperature_age_thresholds
           .empty()) {
    if (cf_options.compaction_style != kCompactionStyleFIFO) {
      return Status::NotSupported(
          "Option file_temperature_age_thresholds only supports FIFO "
          "compaction.");
    } else if (cf_options.num_levels > 1) {
      return Status::NotSupported(
          "Option file_temperature_age_thresholds is only supported when "
          "num_levels = 1.");
    } else {
      const auto& ages =
          cf_options.compaction_options_fifo.file_temperature_age_thresholds;
      assert(ages.size() >= 1);
      // Check that the ages are sorted in strictly increasing order.
      for (size_t i = 0; i < ages.size() - 1; ++i) {
        if (ages[i].age >= ages[i + 1].age) {
          return Status::NotSupported(
              "Option file_temperature_age_thresholds requires elements to be "
              "sorted in increasing order with respect to `age` field.");
        }
      }
    }
  }

  if (cf_options.compaction_style == kCompactionStyleUniversal) {
    int max_read_amp = cf_options.compaction_options_universal.max_read_amp;
    if (max_read_amp < -1) {
      return Status::NotSupported(
          "CompactionOptionsUniversal::max_read_amp should be at least -1.");
    } else if (0 < max_read_amp &&
               max_read_amp < cf_options.level0_file_num_compaction_trigger) {
      return Status::NotSupported(
          "CompactionOptionsUniversal::max_read_amp limits the number of "
          "sorted runs but is smaller than the compaction trigger "
          "level0_file_num_compaction_trigger.");
    }
  }
  return s;
}

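// Applies a map of option name/value overrides on top of the current
// options. With mutable_options_only set, GetColumnFamilyOptionsFromMap()
// rejects attempts to change immutable options; the merged result is then
// re-validated before mutable_cf_options_ is replaced.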
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_opts,
    const std::unordered_map<std::string, std::string>& options_map) {
  ColumnFamilyOptions cf_opts =
      BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
  ConfigOptions config_opts;
  config_opts.mutable_options_only = true;
#ifndef NDEBUG
  if (TEST_allowSetOptionsImmutableInMutable) {
    config_opts.mutable_options_only = false;
  }
#endif
  Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map,
                                           &cf_opts);
  if (s.ok()) {
    // FIXME: we should call SanitizeOptions() too or consolidate it with
    // ValidateOptions().
    s = ValidateOptions(db_opts, cf_opts);
  }
  if (s.ok()) {
    mutable_cf_options_ = MutableCFOptions(cf_opts);
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}

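// Opens an FSDirectory handle for every entry in cf_paths. Directories
// already created for another column family (tracked in *created_dirs) are
// shared rather than re-opened.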
Status ColumnFamilyData::AddDirectories(
    std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
  Status s;
  assert(created_dirs != nullptr);
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    auto existing_dir = created_dirs->find(p.path);
    if (existing_dir == created_dirs->end()) {
      std::unique_ptr<FSDirectory> path_directory;
      s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), p.path,
                                        &path_directory);
      if (!s.ok()) {
        return s;
      }
      assert(path_directory != nullptr);
      data_dirs_.emplace_back(path_directory.release());
      (*created_dirs)[p.path] = data_dirs_.back();
    } else {
      data_dirs_.emplace_back(existing_dir->second);
    }
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }
  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}

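// Sets an atomic flag consumed by GetAndClearFlushSkipReschedule() below.
// Only takes effect when the column family uses user-defined timestamps that
// are not persisted; otherwise it is a no-op.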
void ColumnFamilyData::SetFlushSkipReschedule() {
  const Comparator* ucmp = user_comparator();
  const size_t ts_sz = ucmp->timestamp_size();
  if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
    return;
  }
  flush_skip_reschedule_.store(true);
}

bool ColumnFamilyData::GetAndClearFlushSkipReschedule() {
  return flush_skip_reschedule_.exchange(false);
}

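// Returns true if flushing memtables with IDs up to `max_memtable_id` should
// be postponed so that not-yet-expired user-defined timestamps stay in
// memory. Applies only when UDTs are enabled but not persisted and a
// full_history_ts_low has been set: any memtable whose newest UDT is at or
// above full_history_ts_low triggers postponement.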
bool ColumnFamilyData::ShouldPostponeFlushToRetainUDT(
    uint64_t max_memtable_id) {
  const Comparator* ucmp = user_comparator();
  const size_t ts_sz = ucmp->timestamp_size();
  if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
    return false;
  }
  // If users set the `persist_user_defined_timestamps` flag to false, they
  // should also set the `full_history_ts_low` flag to indicate the range of
  // user-defined timestamps to retain in memory. Otherwise, we do not
  // explicitly postpone flush to retain UDTs.
  const std::string& full_history_ts_low = GetFullHistoryTsLow();
  if (full_history_ts_low.empty()) {
    return false;
  }
  for (const Slice& table_newest_udt :
       imm()->GetTablesNewestUDT(max_memtable_id)) {
    if (table_newest_udt.empty()) {
      continue;
    }
    assert(table_newest_udt.size() == full_history_ts_low.size());
    // Checking the newest UDT contained in MemTable with ascending ID up to
    // `max_memtable_id`. Return immediately on finding the first MemTable
    // that needs postponing.
    if (ucmp->CompareTimestamp(table_newest_udt, full_history_ts_low) >= 0) {
      return true;
    }
  }
  return false;
}

void ColumnFamilyData::RecoverEpochNumbers() {
  assert(current_);
  auto* vstorage = current_->storage_info();
  assert(vstorage);
  vstorage->RecoverEpochNumbers(this);
}

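// ColumnFamilySet keeps every live ColumnFamilyData in a circular doubly
// linked list headed by a dummy CFD, alongside the id -> CFD and
// name -> id maps used for lookups.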
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const FileOptions& file_options,
                                 Cache* table_cache,
                                 WriteBufferManager* _write_buffer_manager,
                                 WriteController* _write_controller,
                                 BlockCacheTracer* const block_cache_tracer,
                                 const std::shared_ptr<IOTracer>& io_tracer,
                                 const std::string& db_id,
                                 const std::string& db_session_id)
    : max_column_family_(0),
      file_options_(file_options),
      dummy_cfd_(new ColumnFamilyData(
          ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
          nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
          block_cache_tracer, io_tracer, db_id, db_session_id,
          /*read_only*/ true)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      table_cache_(table_cache),
      write_buffer_manager_(_write_buffer_manager),
      write_controller_(_write_controller),
      block_cache_tracer_(block_cache_tracer),
      io_tracer_(io_tracer),
      db_id_(db_id),
      db_session_id_(db_session_id) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->UnrefAndTryDelete();
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(
    const std::string& name) const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options, bool read_only) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
      db_id_, db_session_id_, read_only);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  auto ucmp = new_cfd->user_comparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  running_ts_sz_.insert({id, ts_sz});
  if (ts_sz > 0) {
    ts_sz_for_record_.insert({id, ts_sz});
  }
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  uint32_t cf_id = cfd->GetID();
  auto cfd_iter = column_family_data_.find(cf_id);
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
  running_ts_sz_.erase(cf_id);
  ts_sz_for_record_.erase(cf_id);
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

const ImmutableOptions& GetImmutableOptions(ColumnFamilyHandle* column_family) {
  assert(column_family);

  ColumnFamilyHandleImpl* const handle =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  assert(handle);

  const ColumnFamilyData* const cfd = handle->cfd();
  assert(cfd);

  return cfd->ioptions();
}

}  // namespace ROCKSDB_NAMESPACE