// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <string>
#include <vector>

#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    // Need to hold some shared pointers owned by the initial_cf_options
    // before the final cleanup finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}
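
// Wraps each user-defined table properties collector factory so that the
// resulting collectors operate on internal keys instead of user keys.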
void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}
Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // More than one cf_paths entry is supported only in universal
  // and level compaction styles. This function also checks the case
  // in which cf_paths is not specified, which results in db_paths
  // being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF path is only supported in "
          "universal and level compaction styles. ");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB path is only supported in "
          "universal and level compaction styles. ");
    }
  }
  return Status::OK();
}

namespace {
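// Sentinel values meaning "not explicitly set by the user";
// SanitizeOptions() below replaces them with concrete defaults.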
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
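  // Clamp write_buffer_size to [64 KB, 0xffffffff] on 32-bit builds and
  // [64 KB, 64 GB] on 64-bit builds; the std::conditional picks the largest
  // upper bound that fits in size_t.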
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;
    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }
  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }
  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // fall back to max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }
  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }
  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }
  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjusting the values to "
                   "level0_stop_writes_trigger(%d) "
                   "level0_slowdown_writes_trigger(%d) "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }
  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }
#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet; when we open the DB we will find these .trash
  // files and schedule them to be deleted (or delete them immediately if
  // SstFileManager was not used).
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif
  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }
  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both this feature and multiple
      //    DB paths work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }
  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table =
      (result.table_factory->Name() == BlockBasedTableFactory().Name());

  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;
  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions would work similarly to periodic compactions in universal
  // compaction in most cases. So, if ttl is set, execute the periodic
  // compaction codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}
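
// Called with the DB mutex held once the reference count has dropped to
// zero: releases the references held on the memtable, immutable memtable
// list, current version, and ColumnFamilyData, collecting memtables that can
// now be deleted into to_delete.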
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  if (cfd->Unref()) {
    delete cfd;
  }
}

void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
                        MemTableListVersion* new_imm, Version* new_current) {
  cfd = new_cfd;
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  cfd->Ref();
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}
namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no get should
  // happen either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking the DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with the ThreadLocalPtr mutex locked.
  assert(!was_last_ref);
}
}  // anonymous namespace
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options,
    const ImmutableDBOptions& db_options, const FileOptions& file_options,
    ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  Ref();

  // Convert user defined table properties collector factories to internal
  // ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
                                      block_cache_tracer));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(
          new NullCompactionPicker(ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;
  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from the column family set.
    // If column_family_set_ == nullptr, this is a dummy CFD and not in the
    // ColumnFamilySet.
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong to destroy this ColumnFamilyData while it is still in
  // flush_queue_ or compaction_queue_.
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}
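
// Drops one reference. Deletes this ColumnFamilyData and returns true if
// that was the last reference, or if only super_version_ still held a
// reference and releasing it brought the count to zero.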
bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;
    // Release the SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since the unref handler can lock
    // the mutex.
    sv->db_mutex->Unlock();
    local_sv_.reset();
    sv->db_mutex->Lock();

    if (sv->Unref()) {
      // May delete this ColumnFamilyData after calling Cleanup()
      sv->Cleanup();
      delete sv;
      return true;
    }
  }
  return false;
}
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    autovector<MemTable*> empty_list;
    auto imm_prep_log =
        imm()->PrecomputeMinLogContainingPrepSection(empty_list);
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}
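
// Multiplicative factors applied to the delayed write rate: each additional
// slowdown signal scales the rate down by kIncSlowdownRatio (or by
// kNearStopSlowdownRatio when close to a stop condition), while paying off
// compaction debt scales it back up by kDecSlowdownRatio. On recovery from a
// delay condition, the rate is scaled up by kDelayRecoverSlowdownRatio.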
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
namespace {
// If penalize_stop is true, we further reduce the slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value the user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If the user gives a rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When there are two or more column families that require delay, we
    // always increase or reduce the write rate based on information for one
    // single column family. It is likely to be OK but we can improve if there
    // is a problem.
    // Ignore the compaction_needed_bytes = 0 case because
    // compaction_needed_bytes is only available in level-based compaction.
    //
    // If the compaction debt stays the same as previously, we also further
    // slow down. It usually means a memtable is full. It's mainly for the
    // case where both flush and compaction are much slower than the speed we
    // insert to memtables, so we need to actively slow down before we get
    // feedback signals from compactions and flushes to avoid the full stop
    // caused by hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near-stop or stop condition with a more aggressive
      // slowdown. This is to provide the long-term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write
      // rate given by the user.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}
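
// Returns the L0 file count at which compaction should be sped up: 1/4 of
// the way from the compaction trigger to the slowdown trigger, or twice the
// compaction trigger, whichever is smaller. For example, with
// level0_file_num_compaction_trigger = 4 and
// level0_slowdown_writes_trigger = 12, this returns
// min(2 * 4, 4 + (12 - 4) / 4) = 6.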
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between the L0 compaction trigger threshold and the
  // slowdown condition, or twice the compaction trigger, if that is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // namespace
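
// Maps the current counters to a write stall state, checking stop conditions
// before delay conditions: memtable count first, then L0 file count, then
// estimated pending compaction bytes.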
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
    const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is within two files of the stop trigger.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we think it is near stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if the bytes needed for compaction
        // exceed 1/4 of the threshold for slowing down.
        // If the soft pending compaction byte limit is not set, always speed
        // up compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated "
              "pending compaction bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward it by scaling the
      // write rate back up by kDelayRecoverSlowdownRatio. This is to balance
      // the long-term slowdown increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low-pri limit to 1/4 of the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. The low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(
            write_rate / 4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}
const FileOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->file_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}
void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  SequenceNumber earliest_mem_seqno =
      std::min(mem_->GetEarliestSequenceNumber(),
               imm_.current()->GetEarliestSequenceNumber(false));
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer,
      earliest_mem_seqno);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}
Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
                                                 &range_del_agg);

  Status status;
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}
const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, compact_range_options, begin, end, compaction_end,
      conflict, max_file_num_to_ignore);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}
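
// Returns a referenced SuperVersion that the caller is responsible for
// unreferencing. The extra Ref() taken here keeps the returned SuperVersion
// alive even when the thread-local copy turns out to be obsolete and its own
// reference is dropped.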
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // the mutex when the SuperVersion does not change since the last use. When
  // a new SuperVersion is installed, the compaction or flush thread cleans up
  // the cached SuperVersion in all existing thread local storage. To avoid
  // acquiring the mutex for this operation, we use atomic Swap() on the
  // thread local pointer to guarantee exclusive access. If the thread local
  // pointer is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The SuperVersion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse; ThreadLocal storage
  //     should only keep kSVInUse before the ReturnThreadLocalSuperVersion
  //     call (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db->mutex()->Lock();
      // NOTE: underlying resources held by the superversion (sst files)
      // might not be released until the next background job.
      sv->Cleanup();
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
        db->AddSuperVersionsToFreeQueue(sv);
        db->SchedulePurge();
      } else {
        sv_to_delete = sv;
      }
    } else {
      db->mutex()->Lock();
    }
    sv = super_version_->Ref();
    db->mutex()->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // A ThreadLocal scrape happened in the process of this GetImpl call
    // (after the thread local Swap() at the beginning and before
    // CompareAndSwap()). This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}
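
// REQUIRES: DB mutex held. Installs the SuperVersion prepared in sv_context,
// using the column family's current mutable options.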
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(this, mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to a SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok() && db_options.unordered_write &&
      cf_options.max_successive_merges != 0) {
    s = Status::InvalidArgument(
        "max_successive_merges > 0 is incompatible with unordered_write");
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
    // Name() returns a C string; compare by value rather than by pointer.
    if (strcmp(cf_options.table_factory->Name(),
               BlockBasedTableFactory().Name()) != 0) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
    }
  }

  if (cf_options.periodic_compaction_seconds > 0 &&
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
    if (strcmp(cf_options.table_factory->Name(),
               BlockBasedTableFactory().Name()) != 0) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format. ");
    }
  }
  return s;
}
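
// Example (illustrative values): unordered_write cannot be combined with
// successive-merge counting, so this pair of options is rejected:
//
//   DBOptions db_opts;
//   db_opts.unordered_write = true;
//   ColumnFamilyOptions cf_opts;
//   cf_opts.max_successive_merges = 8;
//   // ValidateOptions(db_opts, cf_opts) -> Status::InvalidArgument(
//   //     "max_successive_merges > 0 is incompatible with unordered_write")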

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_options,
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                   ioptions_.info_log, &new_mutable_cf_options);
  if (s.ok()) {
    ColumnFamilyOptions cf_options =
        BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
    s = ValidateOptions(db_options, cf_options);
  }
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
#endif  // ROCKSDB_LITE
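
// Example (illustrative map contents): this method backs DB::SetOptions();
// only options that are mutable may appear in the map, e.g.:
//
//   db->SetOptions(handle, {{"write_buffer_size", "134217728"},
//                           {"level0_slowdown_writes_trigger", "24"}});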

// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  } else if (level < base_level) {
    // There is no restriction that prevents the level passed in from being
    // smaller than base_level.
    return Env::WLTH_MEDIUM;
  }
  return static_cast<Env::WriteLifeTimeHint>(
      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}
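
// Worked example (illustrative): with base_level == 1 the mapping is
//   L0 -> WLTH_MEDIUM, L1 -> WLTH_MEDIUM, L2 -> WLTH_LONG,
//   L3 and deeper -> WLTH_EXTREME,
// relying on Env::WriteLifeTimeHint being an ordered enum where
// WLTH_MEDIUM + 1 == WLTH_LONG.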

Status ColumnFamilyData::AddDirectories(
    std::map<std::string, std::shared_ptr<Directory>>* created_dirs) {
  Status s;
  assert(created_dirs != nullptr);
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    auto existing_dir = created_dirs->find(p.path);
    if (existing_dir == created_dirs->end()) {
      std::unique_ptr<Directory> path_directory;
      s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
      if (!s.ok()) {
        return s;
      }
      assert(path_directory != nullptr);
      data_dirs_.emplace_back(path_directory.release());
      (*created_dirs)[p.path] = data_dirs_.back();
    } else {
      data_dirs_.emplace_back(existing_dir->second);
    }
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }
  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}
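
// Example (illustrative paths): with
//
//   cf_opts.cf_paths = {{"/fast_ssd", 1ull << 30}, {"/big_hdd", 1ull << 40}};
//
// AddDirectories() opens (or, via created_dirs, reuses) one Directory per
// path, and GetDataDir(1) returns the handle for "/big_hdd"; path_id matches
// the path index recorded with each SST file.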

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const FileOptions& file_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller,
                                 BlockCacheTracer* const block_cache_tracer)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(
          0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
          file_options, nullptr, block_cache_tracer)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      file_options_(file_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller),
      block_cache_tracer_(block_cache_tracer) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->UnrefAndTryDelete();
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}
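
// Example (illustrative): recovery calls UpdateMaxColumnFamily() with the
// largest ID seen in the MANIFEST so GetNextColumnFamilyID() never reissues
// an ID that is already in use:
//
//   UpdateMaxColumnFamily(7);  // column families 0..7 already exist
//   GetNextColumnFamilyID();   // -> 8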

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, file_options_, this, block_cache_tracer_);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}
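
// Sketch of the splice above: new_cfd lands just before the dummy head of the
// circular doubly-linked list, so iterating from dummy_cfd_->next_ visits
// column families in creation order:
//
//   before:  ... <-> prev <-> dummy_cfd_
//   after:   ... <-> prev <-> new_cfd <-> dummy_cfd_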

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}
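
// Illustrative sketch (the caller shown is an assumption, not code in this
// file): WriteBatch replay seeks to each record's column family before
// touching its memtable:
//
//   if (!cf_mems->Seek(cf_id)) {
//     return Status::InvalidArgument("Invalid column family specified");
//   }
//   MemTable* mem = cf_mems->GetMemTable();
//   // ... insert the record into mem at the right sequence number ...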

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    // static_cast is sufficient here: every non-null handle passed in is a
    // ColumnFamilyHandleImpl.
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace ROCKSDB_NAMESPACE