| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/column_family.h"
- #include <algorithm>
- #include <cinttypes>
- #include <limits>
- #include <string>
- #include <vector>
- #include "db/compaction/compaction_picker.h"
- #include "db/compaction/compaction_picker_fifo.h"
- #include "db/compaction/compaction_picker_level.h"
- #include "db/compaction/compaction_picker_universal.h"
- #include "db/db_impl/db_impl.h"
- #include "db/internal_stats.h"
- #include "db/job_context.h"
- #include "db/range_del_aggregator.h"
- #include "db/table_properties_collector.h"
- #include "db/version_set.h"
- #include "db/write_controller.h"
- #include "file/sst_file_manager_impl.h"
- #include "memtable/hash_skiplist_rep.h"
- #include "monitoring/thread_status_util.h"
- #include "options/options_helper.h"
- #include "port/port.h"
- #include "table/block_based/block_based_table_factory.h"
- #include "table/merging_iterator.h"
- #include "util/autovector.h"
- #include "util/compression.h"
- namespace ROCKSDB_NAMESPACE {
- ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
- ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
- : cfd_(column_family_data), db_(db), mutex_(mutex) {
- if (cfd_ != nullptr) {
- cfd_->Ref();
- }
- }
- ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
- if (cfd_ != nullptr) {
- #ifndef ROCKSDB_LITE
- for (auto& listener : cfd_->ioptions()->listeners) {
- listener->OnColumnFamilyHandleDeletionStarted(this);
- }
- #endif // ROCKSDB_LITE
- // Job id == 0 means that this is not our background process, but rather
- // user thread
- // Need to hold some shared pointers owned by the initial_cf_options
- // before final cleaning up finishes.
- ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
- JobContext job_context(0);
- mutex_->Lock();
- bool dropped = cfd_->IsDropped();
- if (cfd_->UnrefAndTryDelete()) {
- if (dropped) {
- db_->FindObsoleteFiles(&job_context, false, true);
- }
- }
- mutex_->Unlock();
- if (job_context.HaveSomethingToDelete()) {
- bool defer_purge =
- db_->immutable_db_options().avoid_unnecessary_blocking_io;
- db_->PurgeObsoleteFiles(job_context, defer_purge);
- if (defer_purge) {
- mutex_->Lock();
- db_->SchedulePurge();
- mutex_->Unlock();
- }
- }
- job_context.Clean();
- }
- }
- uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
- const std::string& ColumnFamilyHandleImpl::GetName() const {
- return cfd()->GetName();
- }
- Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
- #ifndef ROCKSDB_LITE
- // accessing mutable cf-options requires db mutex.
- InstrumentedMutexLock l(mutex_);
- *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
- return Status::OK();
- #else
- (void)desc;
- return Status::NotSupported();
- #endif // !ROCKSDB_LITE
- }
- const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
- return cfd()->user_comparator();
- }
- void GetIntTblPropCollectorFactory(
- const ImmutableCFOptions& ioptions,
- std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
- int_tbl_prop_collector_factories) {
- auto& collector_factories = ioptions.table_properties_collector_factories;
- for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
- ++i) {
- assert(collector_factories[i]);
- int_tbl_prop_collector_factories->emplace_back(
- new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
- }
- }
- Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
- if (!cf_options.compression_per_level.empty()) {
- for (size_t level = 0; level < cf_options.compression_per_level.size();
- ++level) {
- if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
- return Status::InvalidArgument(
- "Compression type " +
- CompressionTypeToString(cf_options.compression_per_level[level]) +
- " is not linked with the binary.");
- }
- }
- } else {
- if (!CompressionTypeSupported(cf_options.compression)) {
- return Status::InvalidArgument(
- "Compression type " +
- CompressionTypeToString(cf_options.compression) +
- " is not linked with the binary.");
- }
- }
- if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
- if (!ZSTD_TrainDictionarySupported()) {
- return Status::InvalidArgument(
- "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
- "is not linked with the binary.");
- }
- if (cf_options.compression_opts.max_dict_bytes == 0) {
- return Status::InvalidArgument(
- "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
- "should be nonzero if we're using zstd's dictionary generator.");
- }
- }
- return Status::OK();
- }
- Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
- if (cf_options.inplace_update_support) {
- return Status::InvalidArgument(
- "In-place memtable updates (inplace_update_support) is not compatible "
- "with concurrent writes (allow_concurrent_memtable_write)");
- }
- if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
- return Status::InvalidArgument(
- "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
- }
- return Status::OK();
- }
- Status CheckCFPathsSupported(const DBOptions& db_options,
- const ColumnFamilyOptions& cf_options) {
- // More than one cf_paths are supported only in universal
- // and level compaction styles. This function also checks the case
- // in which cf_paths is not specified, which results in db_paths
- // being used.
- if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
- (cf_options.compaction_style != kCompactionStyleLevel)) {
- if (cf_options.cf_paths.size() > 1) {
- return Status::NotSupported(
- "More than one CF paths are only supported in "
- "universal and level compaction styles. ");
- } else if (cf_options.cf_paths.empty() &&
- db_options.db_paths.size() > 1) {
- return Status::NotSupported(
- "More than one DB paths are only supported in "
- "universal and level compaction styles. ");
- }
- }
- return Status::OK();
- }
namespace {
// Sentinel "not set by the user" defaults for ColumnFamilyOptions::ttl and
// ColumnFamilyOptions::periodic_compaction_seconds. SanitizeOptions() replaces
// options still holding these values with concrete settings.
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace
- ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
- const ColumnFamilyOptions& src) {
- ColumnFamilyOptions result = src;
- size_t clamp_max = std::conditional<
- sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
- std::integral_constant<uint64_t, 64ull << 30>>::type::value;
- ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
- // if user sets arena_block_size, we trust user to use this value. Otherwise,
- // calculate a proper value from writer_buffer_size;
- if (result.arena_block_size <= 0) {
- result.arena_block_size = result.write_buffer_size / 8;
- // Align up to 4k
- const size_t align = 4 * 1024;
- result.arena_block_size =
- ((result.arena_block_size + align - 1) / align) * align;
- }
- result.min_write_buffer_number_to_merge =
- std::min(result.min_write_buffer_number_to_merge,
- result.max_write_buffer_number - 1);
- if (result.min_write_buffer_number_to_merge < 1) {
- result.min_write_buffer_number_to_merge = 1;
- }
- if (result.num_levels < 1) {
- result.num_levels = 1;
- }
- if (result.compaction_style == kCompactionStyleLevel &&
- result.num_levels < 2) {
- result.num_levels = 2;
- }
- if (result.compaction_style == kCompactionStyleUniversal &&
- db_options.allow_ingest_behind && result.num_levels < 3) {
- result.num_levels = 3;
- }
- if (result.max_write_buffer_number < 2) {
- result.max_write_buffer_number = 2;
- }
- // fall back max_write_buffer_number_to_maintain if
- // max_write_buffer_size_to_maintain is not set
- if (result.max_write_buffer_size_to_maintain < 0) {
- result.max_write_buffer_size_to_maintain =
- result.max_write_buffer_number *
- static_cast<int64_t>(result.write_buffer_size);
- } else if (result.max_write_buffer_size_to_maintain == 0 &&
- result.max_write_buffer_number_to_maintain < 0) {
- result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
- }
- // bloom filter size shouldn't exceed 1/4 of memtable size.
- if (result.memtable_prefix_bloom_size_ratio > 0.25) {
- result.memtable_prefix_bloom_size_ratio = 0.25;
- } else if (result.memtable_prefix_bloom_size_ratio < 0) {
- result.memtable_prefix_bloom_size_ratio = 0;
- }
- if (!result.prefix_extractor) {
- assert(result.memtable_factory);
- Slice name = result.memtable_factory->Name();
- if (name.compare("HashSkipListRepFactory") == 0 ||
- name.compare("HashLinkListRepFactory") == 0) {
- result.memtable_factory = std::make_shared<SkipListFactory>();
- }
- }
- if (result.compaction_style == kCompactionStyleFIFO) {
- result.num_levels = 1;
- // since we delete level0 files in FIFO compaction when there are too many
- // of them, these options don't really mean anything
- result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
- result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
- }
- if (result.max_bytes_for_level_multiplier <= 0) {
- result.max_bytes_for_level_multiplier = 1;
- }
- if (result.level0_file_num_compaction_trigger == 0) {
- ROCKS_LOG_WARN(db_options.info_log.get(),
- "level0_file_num_compaction_trigger cannot be 0");
- result.level0_file_num_compaction_trigger = 1;
- }
- if (result.level0_stop_writes_trigger <
- result.level0_slowdown_writes_trigger ||
- result.level0_slowdown_writes_trigger <
- result.level0_file_num_compaction_trigger) {
- ROCKS_LOG_WARN(db_options.info_log.get(),
- "This condition must be satisfied: "
- "level0_stop_writes_trigger(%d) >= "
- "level0_slowdown_writes_trigger(%d) >= "
- "level0_file_num_compaction_trigger(%d)",
- result.level0_stop_writes_trigger,
- result.level0_slowdown_writes_trigger,
- result.level0_file_num_compaction_trigger);
- if (result.level0_slowdown_writes_trigger <
- result.level0_file_num_compaction_trigger) {
- result.level0_slowdown_writes_trigger =
- result.level0_file_num_compaction_trigger;
- }
- if (result.level0_stop_writes_trigger <
- result.level0_slowdown_writes_trigger) {
- result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
- }
- ROCKS_LOG_WARN(db_options.info_log.get(),
- "Adjust the value to "
- "level0_stop_writes_trigger(%d)"
- "level0_slowdown_writes_trigger(%d)"
- "level0_file_num_compaction_trigger(%d)",
- result.level0_stop_writes_trigger,
- result.level0_slowdown_writes_trigger,
- result.level0_file_num_compaction_trigger);
- }
- if (result.soft_pending_compaction_bytes_limit == 0) {
- result.soft_pending_compaction_bytes_limit =
- result.hard_pending_compaction_bytes_limit;
- } else if (result.hard_pending_compaction_bytes_limit > 0 &&
- result.soft_pending_compaction_bytes_limit >
- result.hard_pending_compaction_bytes_limit) {
- result.soft_pending_compaction_bytes_limit =
- result.hard_pending_compaction_bytes_limit;
- }
- #ifndef ROCKSDB_LITE
- // When the DB is stopped, it's possible that there are some .trash files that
- // were not deleted yet, when we open the DB we will find these .trash files
- // and schedule them to be deleted (or delete immediately if SstFileManager
- // was not used)
- auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
- for (size_t i = 0; i < result.cf_paths.size(); i++) {
- DeleteScheduler::CleanupDirectory(db_options.env, sfm, result.cf_paths[i].path);
- }
- #endif
- if (result.cf_paths.empty()) {
- result.cf_paths = db_options.db_paths;
- }
- if (result.level_compaction_dynamic_level_bytes) {
- if (result.compaction_style != kCompactionStyleLevel ||
- result.cf_paths.size() > 1U) {
- // 1. level_compaction_dynamic_level_bytes only makes sense for
- // level-based compaction.
- // 2. we don't yet know how to make both of this feature and multiple
- // DB path work.
- result.level_compaction_dynamic_level_bytes = false;
- }
- }
- if (result.max_compaction_bytes == 0) {
- result.max_compaction_bytes = result.target_file_size_base * 25;
- }
- bool is_block_based_table =
- (result.table_factory->Name() == BlockBasedTableFactory().Name());
- const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
- if (result.ttl == kDefaultTtl) {
- if (is_block_based_table &&
- result.compaction_style != kCompactionStyleFIFO) {
- result.ttl = kAdjustedTtl;
- } else {
- result.ttl = 0;
- }
- }
- const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;
- // Turn on periodic compactions and set them to occur once every 30 days if
- // compaction filters are used and periodic_compaction_seconds is set to the
- // default value.
- if (result.compaction_style != kCompactionStyleFIFO) {
- if ((result.compaction_filter != nullptr ||
- result.compaction_filter_factory != nullptr) &&
- result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
- is_block_based_table) {
- result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
- }
- } else {
- // result.compaction_style == kCompactionStyleFIFO
- if (result.ttl == 0) {
- if (is_block_based_table) {
- if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
- result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
- }
- result.ttl = result.periodic_compaction_seconds;
- }
- } else if (result.periodic_compaction_seconds != 0) {
- result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
- }
- }
- // TTL compactions would work similar to Periodic Compactions in Universal in
- // most of the cases. So, if ttl is set, execute the periodic compaction
- // codepath.
- if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
- if (result.periodic_compaction_seconds != 0) {
- result.periodic_compaction_seconds =
- std::min(result.ttl, result.periodic_compaction_seconds);
- } else {
- result.periodic_compaction_seconds = result.ttl;
- }
- }
- if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
- result.periodic_compaction_seconds = 0;
- }
- return result;
- }
- int SuperVersion::dummy = 0;
- void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
- void* const SuperVersion::kSVObsolete = nullptr;
- SuperVersion::~SuperVersion() {
- for (auto td : to_delete) {
- delete td;
- }
- }
- SuperVersion* SuperVersion::Ref() {
- refs.fetch_add(1, std::memory_order_relaxed);
- return this;
- }
- bool SuperVersion::Unref() {
- // fetch_sub returns the previous value of ref
- uint32_t previous_refs = refs.fetch_sub(1);
- assert(previous_refs > 0);
- return previous_refs == 1;
- }
- void SuperVersion::Cleanup() {
- assert(refs.load(std::memory_order_relaxed) == 0);
- imm->Unref(&to_delete);
- MemTable* m = mem->Unref();
- if (m != nullptr) {
- auto* memory_usage = current->cfd()->imm()->current_memory_usage();
- assert(*memory_usage >= m->ApproximateMemoryUsage());
- *memory_usage -= m->ApproximateMemoryUsage();
- to_delete.push_back(m);
- }
- current->Unref();
- if (cfd->Unref()) {
- delete cfd;
- }
- }
- void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
- MemTableListVersion* new_imm, Version* new_current) {
- cfd = new_cfd;
- mem = new_mem;
- imm = new_imm;
- current = new_current;
- cfd->Ref();
- mem->Ref();
- imm->Ref();
- current->Ref();
- refs.store(1, std::memory_order_relaxed);
- }
- namespace {
- void SuperVersionUnrefHandle(void* ptr) {
- // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
- // destroyed. When former happens, the thread shouldn't see kSVInUse.
- // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
- // well.
- SuperVersion* sv = static_cast<SuperVersion*>(ptr);
- bool was_last_ref __attribute__((__unused__));
- was_last_ref = sv->Unref();
- // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
- // This is important because we can't do SuperVersion cleanup here.
- // That would require locking DB mutex, which would deadlock because
- // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
- assert(!was_last_ref);
- }
- } // anonymous namespace
- ColumnFamilyData::ColumnFamilyData(
- uint32_t id, const std::string& name, Version* _dummy_versions,
- Cache* _table_cache, WriteBufferManager* write_buffer_manager,
- const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
- const FileOptions& file_options, ColumnFamilySet* column_family_set,
- BlockCacheTracer* const block_cache_tracer)
- : id_(id),
- name_(name),
- dummy_versions_(_dummy_versions),
- current_(nullptr),
- refs_(0),
- initialized_(false),
- dropped_(false),
- internal_comparator_(cf_options.comparator),
- initial_cf_options_(SanitizeOptions(db_options, cf_options)),
- ioptions_(db_options, initial_cf_options_),
- mutable_cf_options_(initial_cf_options_),
- is_delete_range_supported_(
- cf_options.table_factory->IsDeleteRangeSupported()),
- write_buffer_manager_(write_buffer_manager),
- mem_(nullptr),
- imm_(ioptions_.min_write_buffer_number_to_merge,
- ioptions_.max_write_buffer_number_to_maintain,
- ioptions_.max_write_buffer_size_to_maintain),
- super_version_(nullptr),
- super_version_number_(0),
- local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
- next_(nullptr),
- prev_(nullptr),
- log_number_(0),
- flush_reason_(FlushReason::kOthers),
- column_family_set_(column_family_set),
- queued_for_flush_(false),
- queued_for_compaction_(false),
- prev_compaction_needed_bytes_(0),
- allow_2pc_(db_options.allow_2pc),
- last_memtable_id_(0) {
- Ref();
- // Convert user defined table properties collector factories to internal ones.
- GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);
- // if _dummy_versions is nullptr, then this is a dummy column family.
- if (_dummy_versions != nullptr) {
- internal_stats_.reset(
- new InternalStats(ioptions_.num_levels, db_options.env, this));
- table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
- block_cache_tracer));
- if (ioptions_.compaction_style == kCompactionStyleLevel) {
- compaction_picker_.reset(
- new LevelCompactionPicker(ioptions_, &internal_comparator_));
- #ifndef ROCKSDB_LITE
- } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
- compaction_picker_.reset(
- new UniversalCompactionPicker(ioptions_, &internal_comparator_));
- } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
- compaction_picker_.reset(
- new FIFOCompactionPicker(ioptions_, &internal_comparator_));
- } else if (ioptions_.compaction_style == kCompactionStyleNone) {
- compaction_picker_.reset(new NullCompactionPicker(
- ioptions_, &internal_comparator_));
- ROCKS_LOG_WARN(ioptions_.info_log,
- "Column family %s does not use any background compaction. "
- "Compactions can only be done via CompactFiles\n",
- GetName().c_str());
- #endif // !ROCKSDB_LITE
- } else {
- ROCKS_LOG_ERROR(ioptions_.info_log,
- "Unable to recognize the specified compaction style %d. "
- "Column family %s will use kCompactionStyleLevel.\n",
- ioptions_.compaction_style, GetName().c_str());
- compaction_picker_.reset(
- new LevelCompactionPicker(ioptions_, &internal_comparator_));
- }
- if (column_family_set_->NumberOfColumnFamilies() < 10) {
- ROCKS_LOG_INFO(ioptions_.info_log,
- "--------------- Options for column family [%s]:\n",
- name.c_str());
- initial_cf_options_.Dump(ioptions_.info_log);
- } else {
- ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
- }
- }
- RecalculateWriteStallConditions(mutable_cf_options_);
- }
- // DB mutex held
- ColumnFamilyData::~ColumnFamilyData() {
- assert(refs_.load(std::memory_order_relaxed) == 0);
- // remove from linked list
- auto prev = prev_;
- auto next = next_;
- prev->next_ = next;
- next->prev_ = prev;
- if (!dropped_ && column_family_set_ != nullptr) {
- // If it's dropped, it's already removed from column family set
- // If column_family_set_ == nullptr, this is dummy CFD and not in
- // ColumnFamilySet
- column_family_set_->RemoveColumnFamily(this);
- }
- if (current_ != nullptr) {
- current_->Unref();
- }
- // It would be wrong if this ColumnFamilyData is in flush_queue_ or
- // compaction_queue_ and we destroyed it
- assert(!queued_for_flush_);
- assert(!queued_for_compaction_);
- assert(super_version_ == nullptr);
- if (dummy_versions_ != nullptr) {
- // List must be empty
- assert(dummy_versions_->TEST_Next() == dummy_versions_);
- bool deleted __attribute__((__unused__));
- deleted = dummy_versions_->Unref();
- assert(deleted);
- }
- if (mem_ != nullptr) {
- delete mem_->Unref();
- }
- autovector<MemTable*> to_delete;
- imm_.current()->Unref(&to_delete);
- for (MemTable* m : to_delete) {
- delete m;
- }
- }
- bool ColumnFamilyData::UnrefAndTryDelete() {
- int old_refs = refs_.fetch_sub(1);
- assert(old_refs > 0);
- if (old_refs == 1) {
- assert(super_version_ == nullptr);
- delete this;
- return true;
- }
- if (old_refs == 2 && super_version_ != nullptr) {
- // Only the super_version_ holds me
- SuperVersion* sv = super_version_;
- super_version_ = nullptr;
- // Release SuperVersion reference kept in ThreadLocalPtr.
- // This must be done outside of mutex_ since unref handler can lock mutex.
- sv->db_mutex->Unlock();
- local_sv_.reset();
- sv->db_mutex->Lock();
- if (sv->Unref()) {
- // May delete this ColumnFamilyData after calling Cleanup()
- sv->Cleanup();
- delete sv;
- return true;
- }
- }
- return false;
- }
- void ColumnFamilyData::SetDropped() {
- // can't drop default CF
- assert(id_ != 0);
- dropped_ = true;
- write_controller_token_.reset();
- // remove from column_family_set
- column_family_set_->RemoveColumnFamily(this);
- }
- ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
- return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
- }
- uint64_t ColumnFamilyData::OldestLogToKeep() {
- auto current_log = GetLogNumber();
- if (allow_2pc_) {
- autovector<MemTable*> empty_list;
- auto imm_prep_log =
- imm()->PrecomputeMinLogContainingPrepSection(empty_list);
- auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
- if (imm_prep_log > 0 && imm_prep_log < current_log) {
- current_log = imm_prep_log;
- }
- if (mem_prep_log > 0 && mem_prep_log < current_log) {
- current_log = mem_prep_log;
- }
- }
- return current_log;
- }
// Factors applied to the delayed write rate when write stalls are adjusted.
// kIncSlowdownRatio (< 1) slows writes down further; its reciprocal,
// kDecSlowdownRatio (> 1), speeds them back up.
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1.0 / kIncSlowdownRatio;
// Stronger slowdown used when the DB is at or near a write stop.
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
- namespace {
- // If penalize_stop is true, we further reduce slowdown rate.
- std::unique_ptr<WriteControllerToken> SetupDelay(
- WriteController* write_controller, uint64_t compaction_needed_bytes,
- uint64_t prev_compaction_need_bytes, bool penalize_stop,
- bool auto_comapctions_disabled) {
- const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
- uint64_t max_write_rate = write_controller->max_delayed_write_rate();
- uint64_t write_rate = write_controller->delayed_write_rate();
- if (auto_comapctions_disabled) {
- // When auto compaction is disabled, always use the value user gave.
- write_rate = max_write_rate;
- } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
- // If user gives rate less than kMinWriteRate, don't adjust it.
- //
- // If already delayed, need to adjust based on previous compaction debt.
- // When there are two or more column families require delay, we always
- // increase or reduce write rate based on information for one single
- // column family. It is likely to be OK but we can improve if there is a
- // problem.
- // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
- // is only available in level-based compaction
- //
- // If the compaction debt stays the same as previously, we also further slow
- // down. It usually means a mem table is full. It's mainly for the case
- // where both of flush and compaction are much slower than the speed we
- // insert to mem tables, so we need to actively slow down before we get
- // feedback signal from compaction and flushes to avoid the full stop
- // because of hitting the max write buffer number.
- //
- // If DB just falled into the stop condition, we need to further reduce
- // the write rate to avoid the stop condition.
- if (penalize_stop) {
- // Penalize the near stop or stop condition by more aggressive slowdown.
- // This is to provide the long term slowdown increase signal.
- // The penalty is more than the reward of recovering to the normal
- // condition.
- write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
- kNearStopSlowdownRatio);
- if (write_rate < kMinWriteRate) {
- write_rate = kMinWriteRate;
- }
- } else if (prev_compaction_need_bytes > 0 &&
- prev_compaction_need_bytes <= compaction_needed_bytes) {
- write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
- kIncSlowdownRatio);
- if (write_rate < kMinWriteRate) {
- write_rate = kMinWriteRate;
- }
- } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
- // We are speeding up by ratio of kSlowdownRatio when we have paid
- // compaction debt. But we'll never speed up to faster than the write rate
- // given by users.
- write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
- kDecSlowdownRatio);
- if (write_rate > max_write_rate) {
- write_rate = max_write_rate;
- }
- }
- }
- return write_controller->GetDelayToken(write_rate);
- }
- int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
- int level0_slowdown_writes_trigger) {
- // SanitizeOptions() ensures it.
- assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
- if (level0_file_num_compaction_trigger < 0) {
- return std::numeric_limits<int>::max();
- }
- const int64_t twice_level0_trigger =
- static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
- const int64_t one_fourth_trigger_slowdown =
- static_cast<int64_t>(level0_file_num_compaction_trigger) +
- ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
- 4);
- assert(twice_level0_trigger >= 0);
- assert(one_fourth_trigger_slowdown >= 0);
- // 1/4 of the way between L0 compaction trigger threshold and slowdown
- // condition.
- // Or twice as compaction trigger, if it is smaller.
- int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
- if (res >= port::kMaxInt32) {
- return port::kMaxInt32;
- } else {
- // res fits in int
- return static_cast<int>(res);
- }
- }
- } // namespace
- std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
- ColumnFamilyData::GetWriteStallConditionAndCause(
- int num_unflushed_memtables, int num_l0_files,
- uint64_t num_compaction_needed_bytes,
- const MutableCFOptions& mutable_cf_options) {
- if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
- return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
- } else if (!mutable_cf_options.disable_auto_compactions &&
- num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
- return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
- } else if (!mutable_cf_options.disable_auto_compactions &&
- mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
- num_compaction_needed_bytes >=
- mutable_cf_options.hard_pending_compaction_bytes_limit) {
- return {WriteStallCondition::kStopped,
- WriteStallCause::kPendingCompactionBytes};
- } else if (mutable_cf_options.max_write_buffer_number > 3 &&
- num_unflushed_memtables >=
- mutable_cf_options.max_write_buffer_number - 1) {
- return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
- } else if (!mutable_cf_options.disable_auto_compactions &&
- mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
- num_l0_files >=
- mutable_cf_options.level0_slowdown_writes_trigger) {
- return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
- } else if (!mutable_cf_options.disable_auto_compactions &&
- mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
- num_compaction_needed_bytes >=
- mutable_cf_options.soft_pending_compaction_bytes_limit) {
- return {WriteStallCondition::kDelayed,
- WriteStallCause::kPendingCompactionBytes};
- }
- return {WriteStallCondition::kNormal, WriteStallCause::kNone};
- }
- WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
- const MutableCFOptions& mutable_cf_options) {
- auto write_stall_condition = WriteStallCondition::kNormal;
- if (current_ != nullptr) {
- auto* vstorage = current_->storage_info();
- auto write_controller = column_family_set_->write_controller_;
- uint64_t compaction_needed_bytes =
- vstorage->estimated_compaction_needed_bytes();
- auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
- imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
- vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
- write_stall_condition = write_stall_condition_and_cause.first;
- auto write_stall_cause = write_stall_condition_and_cause.second;
- bool was_stopped = write_controller->IsStopped();
- bool needed_delay = write_controller->NeedsDelay();
- if (write_stall_condition == WriteStallCondition::kStopped &&
- write_stall_cause == WriteStallCause::kMemtableLimit) {
- write_controller_token_ = write_controller->GetStopToken();
- internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
- ROCKS_LOG_WARN(
- ioptions_.info_log,
- "[%s] Stopping writes because we have %d immutable memtables "
- "(waiting for flush), max_write_buffer_number is set to %d",
- name_.c_str(), imm()->NumNotFlushed(),
- mutable_cf_options.max_write_buffer_number);
- } else if (write_stall_condition == WriteStallCondition::kStopped &&
- write_stall_cause == WriteStallCause::kL0FileCountLimit) {
- write_controller_token_ = write_controller->GetStopToken();
- internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
- if (compaction_picker_->IsLevel0CompactionInProgress()) {
- internal_stats_->AddCFStats(
- InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
- }
- ROCKS_LOG_WARN(ioptions_.info_log,
- "[%s] Stopping writes because we have %d level-0 files",
- name_.c_str(), vstorage->l0_delay_trigger_count());
- } else if (write_stall_condition == WriteStallCondition::kStopped &&
- write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
- write_controller_token_ = write_controller->GetStopToken();
- internal_stats_->AddCFStats(
- InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
- ROCKS_LOG_WARN(
- ioptions_.info_log,
- "[%s] Stopping writes because of estimated pending compaction "
- "bytes %" PRIu64,
- name_.c_str(), compaction_needed_bytes);
- } else if (write_stall_condition == WriteStallCondition::kDelayed &&
- write_stall_cause == WriteStallCause::kMemtableLimit) {
- write_controller_token_ =
- SetupDelay(write_controller, compaction_needed_bytes,
- prev_compaction_needed_bytes_, was_stopped,
- mutable_cf_options.disable_auto_compactions);
- internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
- ROCKS_LOG_WARN(
- ioptions_.info_log,
- "[%s] Stalling writes because we have %d immutable memtables "
- "(waiting for flush), max_write_buffer_number is set to %d "
- "rate %" PRIu64,
- name_.c_str(), imm()->NumNotFlushed(),
- mutable_cf_options.max_write_buffer_number,
- write_controller->delayed_write_rate());
- } else if (write_stall_condition == WriteStallCondition::kDelayed &&
- write_stall_cause == WriteStallCause::kL0FileCountLimit) {
- // L0 is the last two files from stopping.
- bool near_stop = vstorage->l0_delay_trigger_count() >=
- mutable_cf_options.level0_stop_writes_trigger - 2;
- write_controller_token_ =
- SetupDelay(write_controller, compaction_needed_bytes,
- prev_compaction_needed_bytes_, was_stopped || near_stop,
- mutable_cf_options.disable_auto_compactions);
- internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
- 1);
- if (compaction_picker_->IsLevel0CompactionInProgress()) {
- internal_stats_->AddCFStats(
- InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
- }
- ROCKS_LOG_WARN(ioptions_.info_log,
- "[%s] Stalling writes because we have %d level-0 files "
- "rate %" PRIu64,
- name_.c_str(), vstorage->l0_delay_trigger_count(),
- write_controller->delayed_write_rate());
- } else if (write_stall_condition == WriteStallCondition::kDelayed &&
- write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
- // If the distance to hard limit is less than 1/4 of the gap between soft
- // and
- // hard bytes limit, we think it is near stop and speed up the slowdown.
- bool near_stop =
- mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
- (compaction_needed_bytes -
- mutable_cf_options.soft_pending_compaction_bytes_limit) >
- 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
- mutable_cf_options.soft_pending_compaction_bytes_limit) /
- 4;
- write_controller_token_ =
- SetupDelay(write_controller, compaction_needed_bytes,
- prev_compaction_needed_bytes_, was_stopped || near_stop,
- mutable_cf_options.disable_auto_compactions);
- internal_stats_->AddCFStats(
- InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
- ROCKS_LOG_WARN(
- ioptions_.info_log,
- "[%s] Stalling writes because of estimated pending compaction "
- "bytes %" PRIu64 " rate %" PRIu64,
- name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
- write_controller->delayed_write_rate());
- } else {
- assert(write_stall_condition == WriteStallCondition::kNormal);
- if (vstorage->l0_delay_trigger_count() >=
- GetL0ThresholdSpeedupCompaction(
- mutable_cf_options.level0_file_num_compaction_trigger,
- mutable_cf_options.level0_slowdown_writes_trigger)) {
- write_controller_token_ =
- write_controller->GetCompactionPressureToken();
- ROCKS_LOG_INFO(
- ioptions_.info_log,
- "[%s] Increasing compaction threads because we have %d level-0 "
- "files ",
- name_.c_str(), vstorage->l0_delay_trigger_count());
- } else if (vstorage->estimated_compaction_needed_bytes() >=
- mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
- // Increase compaction threads if bytes needed for compaction exceeds
- // 1/4 of threshold for slowing down.
- // If soft pending compaction byte limit is not set, always speed up
- // compaction.
- write_controller_token_ =
- write_controller->GetCompactionPressureToken();
- if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
- ROCKS_LOG_INFO(
- ioptions_.info_log,
- "[%s] Increasing compaction threads because of estimated pending "
- "compaction "
- "bytes %" PRIu64,
- name_.c_str(), vstorage->estimated_compaction_needed_bytes());
- }
- } else {
- write_controller_token_.reset();
- }
- // If the DB recovers from delay conditions, we reward with reducing
- // double the slowdown ratio. This is to balance the long term slowdown
- // increase signal.
- if (needed_delay) {
- uint64_t write_rate = write_controller->delayed_write_rate();
- write_controller->set_delayed_write_rate(static_cast<uint64_t>(
- static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
- // Set the low pri limit to be 1/4 the delayed write rate.
- // Note we don't reset this value even after delay condition is relased.
- // Low-pri rate will continue to apply if there is a compaction
- // pressure.
- write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
- 4);
- }
- }
- prev_compaction_needed_bytes_ = compaction_needed_bytes;
- }
- return write_stall_condition;
- }
- const FileOptions* ColumnFamilyData::soptions() const {
- return &(column_family_set_->file_options_);
- }
- void ColumnFamilyData::SetCurrent(Version* current_version) {
- current_ = current_version;
- }
- uint64_t ColumnFamilyData::GetNumLiveVersions() const {
- return VersionSet::GetNumLiveVersions(dummy_versions_);
- }
- uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
- return VersionSet::GetTotalSstFilesSize(dummy_versions_);
- }
- uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
- return current_->GetSstFilesSize();
- }
- MemTable* ColumnFamilyData::ConstructNewMemtable(
- const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
- return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
- write_buffer_manager_, earliest_seq, id_);
- }
- void ColumnFamilyData::CreateNewMemtable(
- const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
- if (mem_ != nullptr) {
- delete mem_->Unref();
- }
- SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
- mem_->Ref();
- }
- bool ColumnFamilyData::NeedsCompaction() const {
- return compaction_picker_->NeedsCompaction(current_->storage_info());
- }
- Compaction* ColumnFamilyData::PickCompaction(
- const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
- SequenceNumber earliest_mem_seqno =
- std::min(mem_->GetEarliestSequenceNumber(),
- imm_.current()->GetEarliestSequenceNumber(false));
- auto* result = compaction_picker_->PickCompaction(
- GetName(), mutable_options, current_->storage_info(), log_buffer,
- earliest_mem_seqno);
- if (result != nullptr) {
- result->SetInputVersion(current_);
- }
- return result;
- }
- bool ColumnFamilyData::RangeOverlapWithCompaction(
- const Slice& smallest_user_key, const Slice& largest_user_key,
- int level) const {
- return compaction_picker_->RangeOverlapWithCompaction(
- smallest_user_key, largest_user_key, level);
- }
- Status ColumnFamilyData::RangesOverlapWithMemtables(
- const autovector<Range>& ranges, SuperVersion* super_version,
- bool* overlap) {
- assert(overlap != nullptr);
- *overlap = false;
- // Create an InternalIterator over all unflushed memtables
- Arena arena;
- ReadOptions read_opts;
- read_opts.total_order_seek = true;
- MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
- merge_iter_builder.AddIterator(
- super_version->mem->NewIterator(read_opts, &arena));
- super_version->imm->AddIterators(read_opts, &merge_iter_builder);
- ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
- auto read_seq = super_version->current->version_set()->LastSequence();
- ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
- auto* active_range_del_iter =
- super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
- range_del_agg.AddTombstones(
- std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
- super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
- &range_del_agg);
- Status status;
- for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
- auto* vstorage = super_version->current->storage_info();
- auto* ucmp = vstorage->InternalComparator()->user_comparator();
- InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
- kValueTypeForSeek);
- memtable_iter->Seek(range_start.Encode());
- status = memtable_iter->status();
- ParsedInternalKey seek_result;
- if (status.ok()) {
- if (memtable_iter->Valid() &&
- !ParseInternalKey(memtable_iter->key(), &seek_result)) {
- status = Status::Corruption("DB have corrupted keys");
- }
- }
- if (status.ok()) {
- if (memtable_iter->Valid() &&
- ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
- *overlap = true;
- } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
- ranges[i].limit)) {
- *overlap = true;
- }
- }
- }
- return status;
- }
- const int ColumnFamilyData::kCompactAllLevels = -1;
- const int ColumnFamilyData::kCompactToBaseLevel = -2;
- Compaction* ColumnFamilyData::CompactRange(
- const MutableCFOptions& mutable_cf_options, int input_level,
- int output_level, const CompactRangeOptions& compact_range_options,
- const InternalKey* begin, const InternalKey* end,
- InternalKey** compaction_end, bool* conflict,
- uint64_t max_file_num_to_ignore) {
- auto* result = compaction_picker_->CompactRange(
- GetName(), mutable_cf_options, current_->storage_info(), input_level,
- output_level, compact_range_options, begin, end, compaction_end, conflict,
- max_file_num_to_ignore);
- if (result != nullptr) {
- result->SetInputVersion(current_);
- }
- return result;
- }
- SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
- SuperVersion* sv = GetThreadLocalSuperVersion(db);
- sv->Ref();
- if (!ReturnThreadLocalSuperVersion(sv)) {
- // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
- // when the thread-local pointer was populated. So, the Ref() earlier in
- // this function still prevents the returned SuperVersion* from being
- // deleted out from under the caller.
- sv->Unref();
- }
- return sv;
- }
- SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
- // The SuperVersion is cached in thread local storage to avoid acquiring
- // mutex when SuperVersion does not change since the last use. When a new
- // SuperVersion is installed, the compaction or flush thread cleans up
- // cached SuperVersion in all existing thread local storage. To avoid
- // acquiring mutex for this operation, we use atomic Swap() on the thread
- // local pointer to guarantee exclusive access. If the thread local pointer
- // is being used while a new SuperVersion is installed, the cached
- // SuperVersion can become stale. In that case, the background thread would
- // have swapped in kSVObsolete. We re-check the value at when returning
- // SuperVersion back to thread local, with an atomic compare and swap.
- // The superversion will need to be released if detected to be stale.
- void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
- // Invariant:
- // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
- // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
- // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
- // (if no Scrape happens).
- assert(ptr != SuperVersion::kSVInUse);
- SuperVersion* sv = static_cast<SuperVersion*>(ptr);
- if (sv == SuperVersion::kSVObsolete ||
- sv->version_number != super_version_number_.load()) {
- RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
- SuperVersion* sv_to_delete = nullptr;
- if (sv && sv->Unref()) {
- RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
- db->mutex()->Lock();
- // NOTE: underlying resources held by superversion (sst files) might
- // not be released until the next background job.
- sv->Cleanup();
- if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
- db->AddSuperVersionsToFreeQueue(sv);
- db->SchedulePurge();
- } else {
- sv_to_delete = sv;
- }
- } else {
- db->mutex()->Lock();
- }
- sv = super_version_->Ref();
- db->mutex()->Unlock();
- delete sv_to_delete;
- }
- assert(sv != nullptr);
- return sv;
- }
- bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
- assert(sv != nullptr);
- // Put the SuperVersion back
- void* expected = SuperVersion::kSVInUse;
- if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
- // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
- // storage has not been altered and no Scrape has happened. The
- // SuperVersion is still current.
- return true;
- } else {
- // ThreadLocal scrape happened in the process of this GetImpl call (after
- // thread local Swap() at the beginning and before CompareAndSwap()).
- // This means the SuperVersion it holds is obsolete.
- assert(expected == SuperVersion::kSVObsolete);
- }
- return false;
- }
- void ColumnFamilyData::InstallSuperVersion(
- SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
- db_mutex->AssertHeld();
- return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
- }
- void ColumnFamilyData::InstallSuperVersion(
- SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
- const MutableCFOptions& mutable_cf_options) {
- SuperVersion* new_superversion = sv_context->new_superversion.release();
- new_superversion->db_mutex = db_mutex;
- new_superversion->mutable_cf_options = mutable_cf_options;
- new_superversion->Init(this, mem_, imm_.current(), current_);
- SuperVersion* old_superversion = super_version_;
- super_version_ = new_superversion;
- ++super_version_number_;
- super_version_->version_number = super_version_number_;
- super_version_->write_stall_condition =
- RecalculateWriteStallConditions(mutable_cf_options);
- if (old_superversion != nullptr) {
- // Reset SuperVersions cached in thread local storage.
- // This should be done before old_superversion->Unref(). That's to ensure
- // that local_sv_ never holds the last reference to SuperVersion, since
- // it has no means to safely do SuperVersion cleanup.
- ResetThreadLocalSuperVersions();
- if (old_superversion->mutable_cf_options.write_buffer_size !=
- mutable_cf_options.write_buffer_size) {
- mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
- }
- if (old_superversion->write_stall_condition !=
- new_superversion->write_stall_condition) {
- sv_context->PushWriteStallNotification(
- old_superversion->write_stall_condition,
- new_superversion->write_stall_condition, GetName(), ioptions());
- }
- if (old_superversion->Unref()) {
- old_superversion->Cleanup();
- sv_context->superversions_to_free.push_back(old_superversion);
- }
- }
- }
- void ColumnFamilyData::ResetThreadLocalSuperVersions() {
- autovector<void*> sv_ptrs;
- local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
- for (auto ptr : sv_ptrs) {
- assert(ptr);
- if (ptr == SuperVersion::kSVInUse) {
- continue;
- }
- auto sv = static_cast<SuperVersion*>(ptr);
- bool was_last_ref __attribute__((__unused__));
- was_last_ref = sv->Unref();
- // sv couldn't have been the last reference because
- // ResetThreadLocalSuperVersions() is called before
- // unref'ing super_version_.
- assert(!was_last_ref);
- }
- }
- Status ColumnFamilyData::ValidateOptions(
- const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
- Status s;
- s = CheckCompressionSupported(cf_options);
- if (s.ok() && db_options.allow_concurrent_memtable_write) {
- s = CheckConcurrentWritesSupported(cf_options);
- }
- if (s.ok() && db_options.unordered_write &&
- cf_options.max_successive_merges != 0) {
- s = Status::InvalidArgument(
- "max_successive_merges > 0 is incompatible with unordered_write");
- }
- if (s.ok()) {
- s = CheckCFPathsSupported(db_options, cf_options);
- }
- if (!s.ok()) {
- return s;
- }
- if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
- if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
- return Status::NotSupported(
- "TTL is only supported in Block-Based Table format. ");
- }
- }
- if (cf_options.periodic_compaction_seconds > 0 &&
- cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
- if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
- return Status::NotSupported(
- "Periodic Compaction is only supported in "
- "Block-Based Table format. ");
- }
- }
- return s;
- }
- #ifndef ROCKSDB_LITE
- Status ColumnFamilyData::SetOptions(
- const DBOptions& db_options,
- const std::unordered_map<std::string, std::string>& options_map) {
- MutableCFOptions new_mutable_cf_options;
- Status s =
- GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
- ioptions_.info_log, &new_mutable_cf_options);
- if (s.ok()) {
- ColumnFamilyOptions cf_options =
- BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
- s = ValidateOptions(db_options, cf_options);
- }
- if (s.ok()) {
- mutable_cf_options_ = new_mutable_cf_options;
- mutable_cf_options_.RefreshDerivedOptions(ioptions_);
- }
- return s;
- }
- #endif // ROCKSDB_LITE
- // REQUIRES: DB mutex held
- Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
- if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
- return Env::WLTH_NOT_SET;
- }
- if (level == 0) {
- return Env::WLTH_MEDIUM;
- }
- int base_level = current_->storage_info()->base_level();
- // L1: medium, L2: long, ...
- if (level - base_level >= 2) {
- return Env::WLTH_EXTREME;
- } else if (level < base_level) {
- // There is no restriction which prevents level passed in to be smaller
- // than base_level.
- return Env::WLTH_MEDIUM;
- }
- return static_cast<Env::WriteLifeTimeHint>(level - base_level +
- static_cast<int>(Env::WLTH_MEDIUM));
- }
- Status ColumnFamilyData::AddDirectories(
- std::map<std::string, std::shared_ptr<Directory>>* created_dirs) {
- Status s;
- assert(created_dirs != nullptr);
- assert(data_dirs_.empty());
- for (auto& p : ioptions_.cf_paths) {
- auto existing_dir = created_dirs->find(p.path);
- if (existing_dir == created_dirs->end()) {
- std::unique_ptr<Directory> path_directory;
- s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
- if (!s.ok()) {
- return s;
- }
- assert(path_directory != nullptr);
- data_dirs_.emplace_back(path_directory.release());
- (*created_dirs)[p.path] = data_dirs_.back();
- } else {
- data_dirs_.emplace_back(existing_dir->second);
- }
- }
- assert(data_dirs_.size() == ioptions_.cf_paths.size());
- return s;
- }
- Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
- if (data_dirs_.empty()) {
- return nullptr;
- }
- assert(path_id < data_dirs_.size());
- return data_dirs_[path_id].get();
- }
- ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
- const ImmutableDBOptions* db_options,
- const FileOptions& file_options,
- Cache* table_cache,
- WriteBufferManager* write_buffer_manager,
- WriteController* write_controller,
- BlockCacheTracer* const block_cache_tracer)
- : max_column_family_(0),
- dummy_cfd_(new ColumnFamilyData(
- 0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
- file_options, nullptr, block_cache_tracer)),
- default_cfd_cache_(nullptr),
- db_name_(dbname),
- db_options_(db_options),
- file_options_(file_options),
- table_cache_(table_cache),
- write_buffer_manager_(write_buffer_manager),
- write_controller_(write_controller),
- block_cache_tracer_(block_cache_tracer) {
- // initialize linked list
- dummy_cfd_->prev_ = dummy_cfd_;
- dummy_cfd_->next_ = dummy_cfd_;
- }
- ColumnFamilySet::~ColumnFamilySet() {
- while (column_family_data_.size() > 0) {
- // cfd destructor will delete itself from column_family_data_
- auto cfd = column_family_data_.begin()->second;
- bool last_ref __attribute__((__unused__));
- last_ref = cfd->UnrefAndTryDelete();
- assert(last_ref);
- }
- bool dummy_last_ref __attribute__((__unused__));
- dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
- assert(dummy_last_ref);
- }
- ColumnFamilyData* ColumnFamilySet::GetDefault() const {
- assert(default_cfd_cache_ != nullptr);
- return default_cfd_cache_;
- }
- ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
- auto cfd_iter = column_family_data_.find(id);
- if (cfd_iter != column_family_data_.end()) {
- return cfd_iter->second;
- } else {
- return nullptr;
- }
- }
- ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
- const {
- auto cfd_iter = column_families_.find(name);
- if (cfd_iter != column_families_.end()) {
- auto cfd = GetColumnFamily(cfd_iter->second);
- assert(cfd != nullptr);
- return cfd;
- } else {
- return nullptr;
- }
- }
- uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
- return ++max_column_family_;
- }
- uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
- void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
- max_column_family_ = std::max(new_max_column_family, max_column_family_);
- }
- size_t ColumnFamilySet::NumberOfColumnFamilies() const {
- return column_families_.size();
- }
- // under a DB mutex AND write thread
- ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
- const std::string& name, uint32_t id, Version* dummy_versions,
- const ColumnFamilyOptions& options) {
- assert(column_families_.find(name) == column_families_.end());
- ColumnFamilyData* new_cfd = new ColumnFamilyData(
- id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
- *db_options_, file_options_, this, block_cache_tracer_);
- column_families_.insert({name, id});
- column_family_data_.insert({id, new_cfd});
- max_column_family_ = std::max(max_column_family_, id);
- // add to linked list
- new_cfd->next_ = dummy_cfd_;
- auto prev = dummy_cfd_->prev_;
- new_cfd->prev_ = prev;
- prev->next_ = new_cfd;
- dummy_cfd_->prev_ = new_cfd;
- if (id == 0) {
- default_cfd_cache_ = new_cfd;
- }
- return new_cfd;
- }
- // REQUIRES: DB mutex held
- void ColumnFamilySet::FreeDeadColumnFamilies() {
- autovector<ColumnFamilyData*> to_delete;
- for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
- if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
- to_delete.push_back(cfd);
- }
- }
- for (auto cfd : to_delete) {
- // this is very rare, so it's not a problem that we do it under a mutex
- delete cfd;
- }
- }
- // under a DB mutex AND from a write thread
- void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
- auto cfd_iter = column_family_data_.find(cfd->GetID());
- assert(cfd_iter != column_family_data_.end());
- column_family_data_.erase(cfd_iter);
- column_families_.erase(cfd->GetName());
- }
- // under a DB mutex OR from a write thread
- bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
- if (column_family_id == 0) {
- // optimization for common case
- current_ = column_family_set_->GetDefault();
- } else {
- current_ = column_family_set_->GetColumnFamily(column_family_id);
- }
- handle_.SetCFD(current_);
- return current_ != nullptr;
- }
- uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
- assert(current_ != nullptr);
- return current_->GetLogNumber();
- }
- MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
- assert(current_ != nullptr);
- return current_->mem();
- }
- ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
- assert(current_ != nullptr);
- return &handle_;
- }
- uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
- uint32_t column_family_id = 0;
- if (column_family != nullptr) {
- auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
- column_family_id = cfh->GetID();
- }
- return column_family_id;
- }
- const Comparator* GetColumnFamilyUserComparator(
- ColumnFamilyHandle* column_family) {
- if (column_family != nullptr) {
- return column_family->GetComparator();
- }
- return nullptr;
- }
- } // namespace ROCKSDB_NAMESPACE
|