| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "rocksdb/sst_file_writer.h"
- #include <vector>
- #include "db/db_impl/db_impl.h"
- #include "db/dbformat.h"
- #include "db/wide/wide_column_serialization.h"
- #include "db/wide/wide_columns_helper.h"
- #include "file/writable_file_writer.h"
- #include "rocksdb/file_system.h"
- #include "rocksdb/table.h"
- #include "table/block_based/block_based_table_builder.h"
- #include "table/sst_file_writer_collectors.h"
- #include "test_util/sync_point.h"
- namespace ROCKSDB_NAMESPACE {
- const std::string ExternalSstFilePropertyNames::kVersion =
- "rocksdb.external_sst_file.version";
- const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
- "rocksdb.external_sst_file.global_seqno";
// SstFileWriter::Rep::InvalidatePageCache only asks the OS to drop cached
// pages once more than this many bytes (1 MiB) were written since the last
// request (or when the file is being closed).
constexpr size_t kFadviseTrigger = size_t{1} << 20;
- struct SstFileWriter::Rep {
- Rep(const EnvOptions& _env_options, const Options& options,
- Env::IOPriority _io_priority, const Comparator* _user_comparator,
- ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters,
- std::string _db_session_id)
- : env_options(_env_options),
- ioptions(options),
- mutable_cf_options(options),
- io_priority(_io_priority),
- internal_comparator(_user_comparator),
- cfh(_cfh),
- invalidate_page_cache(_invalidate_page_cache),
- skip_filters(_skip_filters),
- db_session_id(_db_session_id),
- ts_sz(_user_comparator->timestamp_size()),
- strip_timestamp(ts_sz > 0 &&
- !ioptions.persist_user_defined_timestamps) {
- // TODO (hx235): pass in `WriteOptions` instead of `rate_limiter_priority`
- // during construction
- write_options.rate_limiter_priority = io_priority;
- }
- std::unique_ptr<WritableFileWriter> file_writer;
- std::unique_ptr<TableBuilder> builder;
- EnvOptions env_options;
- ImmutableOptions ioptions;
- MutableCFOptions mutable_cf_options;
- Env::IOPriority io_priority;
- WriteOptions write_options;
- InternalKeyComparator internal_comparator;
- ExternalSstFileInfo file_info;
- InternalKey ikey;
- std::string column_family_name;
- ColumnFamilyHandle* cfh;
- // If true, We will give the OS a hint that this file pages is not needed
- // every time we write 1MB to the file.
- bool invalidate_page_cache;
- // The size of the file during the last time we called Fadvise to remove
- // cached pages from page cache.
- uint64_t last_fadvise_size = 0;
- bool skip_filters;
- std::string db_session_id;
- uint64_t next_file_number = 1;
- size_t ts_sz;
- bool strip_timestamp;
- Status AddImpl(const Slice& user_key, const Slice& value,
- ValueType value_type) {
- if (!builder) {
- return Status::InvalidArgument("File is not opened");
- }
- if (!builder->status().ok()) {
- return builder->status();
- }
- assert(user_key.size() >= ts_sz);
- if (strip_timestamp) {
- // In this mode, we expect users to always provide a min timestamp.
- if (internal_comparator.user_comparator()->CompareTimestamp(
- Slice(user_key.data() + user_key.size() - ts_sz, ts_sz),
- MinU64Ts()) != 0) {
- return Status::InvalidArgument(
- "persist_user_defined_timestamps flag is set to false, only "
- "minimum timestamp is accepted.");
- }
- }
- if (file_info.num_entries == 0) {
- file_info.smallest_key.assign(user_key.data(), user_key.size());
- } else {
- if (internal_comparator.user_comparator()->Compare(
- user_key, file_info.largest_key) <= 0) {
- // Make sure that keys are added in order
- return Status::InvalidArgument(
- "Keys must be added in strict ascending order.");
- }
- }
- assert(value_type == kTypeValue || value_type == kTypeMerge ||
- value_type == kTypeDeletion ||
- value_type == kTypeDeletionWithTimestamp ||
- value_type == kTypeWideColumnEntity);
- constexpr SequenceNumber sequence_number = 0;
- ikey.Set(user_key, sequence_number, value_type);
- builder->Add(ikey.Encode(), value);
- // update file info
- file_info.num_entries++;
- file_info.largest_key.assign(user_key.data(), user_key.size());
- file_info.file_size = builder->FileSize();
- InvalidatePageCache(false /* closing */).PermitUncheckedError();
- return builder->status();
- }
- Status Add(const Slice& user_key, const Slice& value, ValueType value_type) {
- if (internal_comparator.user_comparator()->timestamp_size() != 0) {
- return Status::InvalidArgument("Timestamp size mismatch");
- }
- return AddImpl(user_key, value, value_type);
- }
- Status Add(const Slice& user_key, const Slice& timestamp, const Slice& value,
- ValueType value_type) {
- const size_t timestamp_size = timestamp.size();
- if (internal_comparator.user_comparator()->timestamp_size() !=
- timestamp_size) {
- return Status::InvalidArgument("Timestamp size mismatch");
- }
- const size_t user_key_size = user_key.size();
- if (user_key.data() + user_key_size == timestamp.data()) {
- Slice user_key_with_ts(user_key.data(), user_key_size + timestamp_size);
- return AddImpl(user_key_with_ts, value, value_type);
- }
- std::string user_key_with_ts;
- user_key_with_ts.reserve(user_key_size + timestamp_size);
- user_key_with_ts.append(user_key.data(), user_key_size);
- user_key_with_ts.append(timestamp.data(), timestamp_size);
- return AddImpl(user_key_with_ts, value, value_type);
- }
- Status AddEntity(const Slice& user_key, const WideColumns& columns) {
- WideColumns sorted_columns(columns);
- WideColumnsHelper::SortColumns(sorted_columns);
- std::string entity;
- const Status s = WideColumnSerialization::Serialize(sorted_columns, entity);
- if (!s.ok()) {
- return s;
- }
- if (entity.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
- return Status::InvalidArgument("wide column entity is too large");
- }
- return Add(user_key, entity, kTypeWideColumnEntity);
- }
- Status DeleteRangeImpl(const Slice& begin_key, const Slice& end_key) {
- if (!builder) {
- return Status::InvalidArgument("File is not opened");
- }
- int cmp = internal_comparator.user_comparator()->CompareWithoutTimestamp(
- begin_key, end_key);
- if (cmp > 0) {
- // It's an empty range where endpoints appear mistaken. Don't bother
- // applying it to the DB, and return an error to the user.
- return Status::InvalidArgument("end key comes before start key");
- } else if (cmp == 0) {
- // It's an empty range. Don't bother applying it to the DB.
- return Status::OK();
- }
- assert(begin_key.size() >= ts_sz);
- assert(end_key.size() >= ts_sz);
- Slice begin_key_ts =
- Slice(begin_key.data() + begin_key.size() - ts_sz, ts_sz);
- Slice end_key_ts = Slice(end_key.data() + end_key.size() - ts_sz, ts_sz);
- assert(begin_key_ts.compare(end_key_ts) == 0);
- if (strip_timestamp) {
- // In this mode, we expect users to always provide a min timestamp.
- if (internal_comparator.user_comparator()->CompareTimestamp(
- begin_key_ts, MinU64Ts()) != 0) {
- return Status::InvalidArgument(
- "persist_user_defined_timestamps flag is set to false, only "
- "minimum timestamp is accepted for start key.");
- }
- if (internal_comparator.user_comparator()->CompareTimestamp(
- end_key_ts, MinU64Ts()) != 0) {
- return Status::InvalidArgument(
- "persist_user_defined_timestamps flag is set to false, only "
- "minimum timestamp is accepted for end key.");
- }
- }
- RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
- if (file_info.num_range_del_entries == 0) {
- file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
- tombstone.start_key_.size());
- file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
- tombstone.end_key_.size());
- } else {
- if (internal_comparator.user_comparator()->Compare(
- tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
- file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
- tombstone.start_key_.size());
- }
- if (internal_comparator.user_comparator()->Compare(
- tombstone.end_key_, file_info.largest_range_del_key) > 0) {
- file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
- tombstone.end_key_.size());
- }
- }
- auto ikey_and_end_key = tombstone.Serialize();
- builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
- // update file info
- file_info.num_range_del_entries++;
- file_info.file_size = builder->FileSize();
- InvalidatePageCache(false /* closing */).PermitUncheckedError();
- return Status::OK();
- }
- Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
- if (internal_comparator.user_comparator()->timestamp_size() != 0) {
- return Status::InvalidArgument("Timestamp size mismatch");
- }
- return DeleteRangeImpl(begin_key, end_key);
- }
- // begin_key and end_key should be users keys without timestamp.
- Status DeleteRange(const Slice& begin_key, const Slice& end_key,
- const Slice& timestamp) {
- const size_t timestamp_size = timestamp.size();
- if (internal_comparator.user_comparator()->timestamp_size() !=
- timestamp_size) {
- return Status::InvalidArgument("Timestamp size mismatch");
- }
- const size_t begin_key_size = begin_key.size();
- const size_t end_key_size = end_key.size();
- if (begin_key.data() + begin_key_size == timestamp.data() ||
- end_key.data() + begin_key_size == timestamp.data()) {
- assert(memcmp(begin_key.data() + begin_key_size,
- end_key.data() + end_key_size, timestamp_size) == 0);
- Slice begin_key_with_ts(begin_key.data(),
- begin_key_size + timestamp_size);
- Slice end_key_with_ts(end_key.data(), end_key.size() + timestamp_size);
- return DeleteRangeImpl(begin_key_with_ts, end_key_with_ts);
- }
- std::string begin_key_with_ts;
- begin_key_with_ts.reserve(begin_key_size + timestamp_size);
- begin_key_with_ts.append(begin_key.data(), begin_key_size);
- begin_key_with_ts.append(timestamp.data(), timestamp_size);
- std::string end_key_with_ts;
- end_key_with_ts.reserve(end_key_size + timestamp_size);
- end_key_with_ts.append(end_key.data(), end_key_size);
- end_key_with_ts.append(timestamp.data(), timestamp_size);
- return DeleteRangeImpl(begin_key_with_ts, end_key_with_ts);
- }
- Status InvalidatePageCache(bool closing) {
- Status s = Status::OK();
- if (invalidate_page_cache == false) {
- // Fadvise disabled
- return s;
- }
- uint64_t bytes_since_last_fadvise = builder->FileSize() - last_fadvise_size;
- if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
- TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
- &(bytes_since_last_fadvise));
- // Tell the OS that we don't need this file in page cache
- s = file_writer->InvalidateCache(0, 0);
- if (s.IsNotSupported()) {
- // NotSupported is fine as it could be a file type that doesn't use page
- // cache.
- s = Status::OK();
- }
- last_fadvise_size = builder->FileSize();
- }
- return s;
- }
- };
- SstFileWriter::SstFileWriter(const EnvOptions& env_options,
- const Options& options,
- const Comparator* user_comparator,
- ColumnFamilyHandle* column_family,
- bool invalidate_page_cache,
- Env::IOPriority io_priority, bool skip_filters)
- : rep_(new Rep(env_options, options, io_priority, user_comparator,
- column_family, invalidate_page_cache, skip_filters,
- DBImpl::GenerateDbSessionId(options.env))) {
- // SstFileWriter is used to create sst files that can be added to database
- // later. Therefore, no real db_id and db_session_id are associated with it.
- // Here we mimic the way db_session_id behaves by getting a db_session_id
- // for each SstFileWriter, and (later below) assign unique file numbers
- // in the table properties. The db_id is set to be "SST Writer" for clarity.
- rep_->file_info.file_size = 0;
- }
- SstFileWriter::~SstFileWriter() {
- if (rep_->builder) {
- // User did not call Finish() or Finish() failed, we need to
- // abandon the builder.
- rep_->builder->Abandon();
- }
- }
- Status SstFileWriter::Open(const std::string& file_path, Temperature temp) {
- Rep* r = rep_.get();
- Status s;
- std::unique_ptr<FSWritableFile> sst_file;
- FileOptions cur_file_opts(r->env_options);
- cur_file_opts.temperature = temp;
- s = r->ioptions.env->GetFileSystem()->NewWritableFile(
- file_path, cur_file_opts, &sst_file, nullptr);
- if (!s.ok()) {
- return s;
- }
- sst_file->SetIOPriority(r->io_priority);
- CompressionType compression_type;
- CompressionOptions compression_opts;
- if (r->mutable_cf_options.bottommost_compression !=
- kDisableCompressionOption) {
- compression_type = r->mutable_cf_options.bottommost_compression;
- if (r->mutable_cf_options.bottommost_compression_opts.enabled) {
- compression_opts = r->mutable_cf_options.bottommost_compression_opts;
- } else {
- compression_opts = r->mutable_cf_options.compression_opts;
- }
- } else if (!r->mutable_cf_options.compression_per_level.empty()) {
- // Use the compression of the last level if we have per level compression
- compression_type = *(r->mutable_cf_options.compression_per_level.rbegin());
- compression_opts = r->mutable_cf_options.compression_opts;
- } else {
- compression_type = r->mutable_cf_options.compression;
- compression_opts = r->mutable_cf_options.compression_opts;
- }
- InternalTblPropCollFactories internal_tbl_prop_coll_factories;
- // SstFileWriter properties collector to add SstFileWriter version.
- internal_tbl_prop_coll_factories.emplace_back(
- new SstFileWriterPropertiesCollectorFactory(2 /* version */,
- 0 /* global_seqno*/));
- // User collector factories
- auto user_collector_factories =
- r->ioptions.table_properties_collector_factories;
- for (size_t i = 0; i < user_collector_factories.size(); i++) {
- internal_tbl_prop_coll_factories.emplace_back(
- new UserKeyTablePropertiesCollectorFactory(
- user_collector_factories[i]));
- }
- int unknown_level = -1;
- uint32_t cf_id;
- if (r->cfh != nullptr) {
- // user explicitly specified that this file will be ingested into cfh,
- // we can persist this information in the file.
- cf_id = r->cfh->GetID();
- r->column_family_name = r->cfh->GetName();
- } else {
- r->column_family_name = "";
- cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
- }
- // TODO: it would be better to set oldest_key_time to be used for getting the
- // approximate time of ingested keys.
- // TODO: plumb Env::IOActivity, Env::IOPriority
- TableBuilderOptions table_builder_options(
- r->ioptions, r->mutable_cf_options, ReadOptions(), r->write_options,
- r->internal_comparator, &internal_tbl_prop_coll_factories,
- compression_type, compression_opts, cf_id, r->column_family_name,
- unknown_level, kUnknownNewestKeyTime, false /* is_bottommost */,
- TableFileCreationReason::kMisc, 0 /* oldest_key_time */,
- 0 /* file_creation_time */, "SST Writer" /* db_id */, r->db_session_id,
- 0 /* target_file_size */, r->next_file_number);
- // External SST files used to each get a unique session id. Now for
- // slightly better uniqueness probability in constructing cache keys, we
- // assign fake file numbers to each file (into table properties) and keep
- // the same session id for the life of the SstFileWriter.
- r->next_file_number++;
- // XXX: when we can remove skip_filters from the SstFileWriter public API
- // we can remove it from TableBuilderOptions.
- table_builder_options.skip_filters = r->skip_filters;
- FileTypeSet tmp_set = r->ioptions.checksum_handoff_file_types;
- r->file_writer.reset(new WritableFileWriter(
- std::move(sst_file), file_path, r->env_options, r->ioptions.clock,
- nullptr /* io_tracer */, r->ioptions.stats, Histograms::SST_WRITE_MICROS,
- r->ioptions.listeners, r->ioptions.file_checksum_gen_factory.get(),
- tmp_set.Contains(FileType::kTableFile), false));
- // TODO(tec) : If table_factory is using compressed block cache, we will
- // be adding the external sst file blocks into it, which is wasteful.
- r->builder.reset(r->mutable_cf_options.table_factory->NewTableBuilder(
- table_builder_options, r->file_writer.get()));
- r->file_info = ExternalSstFileInfo();
- r->file_info.file_path = file_path;
- r->file_info.version = 2;
- return s;
- }
- Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
- return rep_->Add(user_key, value, ValueType::kTypeValue);
- }
- Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
- return rep_->Add(user_key, value, ValueType::kTypeValue);
- }
- Status SstFileWriter::Put(const Slice& user_key, const Slice& timestamp,
- const Slice& value) {
- return rep_->Add(user_key, timestamp, value, ValueType::kTypeValue);
- }
- Status SstFileWriter::PutEntity(const Slice& user_key,
- const WideColumns& columns) {
- return rep_->AddEntity(user_key, columns);
- }
- Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
- return rep_->Add(user_key, value, ValueType::kTypeMerge);
- }
- Status SstFileWriter::Delete(const Slice& user_key) {
- return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
- }
- Status SstFileWriter::Delete(const Slice& user_key, const Slice& timestamp) {
- return rep_->Add(user_key, timestamp, Slice(),
- ValueType::kTypeDeletionWithTimestamp);
- }
- Status SstFileWriter::DeleteRange(const Slice& begin_key,
- const Slice& end_key) {
- return rep_->DeleteRange(begin_key, end_key);
- }
- Status SstFileWriter::DeleteRange(const Slice& begin_key, const Slice& end_key,
- const Slice& timestamp) {
- return rep_->DeleteRange(begin_key, end_key, timestamp);
- }
- Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
- Rep* r = rep_.get();
- if (!r->builder) {
- return Status::InvalidArgument("File is not opened");
- }
- if (r->file_info.num_entries == 0 &&
- r->file_info.num_range_del_entries == 0) {
- r->builder->status().PermitUncheckedError();
- return Status::InvalidArgument("Cannot create sst file with no entries");
- }
- Status s = r->builder->Finish();
- r->file_info.file_size = r->builder->FileSize();
- IOOptions opts;
- if (s.ok()) {
- s = WritableFileWriter::PrepareIOOptions(r->write_options, opts);
- }
- if (s.ok()) {
- s = r->file_writer->Sync(opts, r->ioptions.use_fsync);
- r->InvalidatePageCache(true /* closing */).PermitUncheckedError();
- if (s.ok()) {
- s = r->file_writer->Close(opts);
- }
- }
- if (s.ok()) {
- r->file_info.file_checksum = r->file_writer->GetFileChecksum();
- r->file_info.file_checksum_func_name =
- r->file_writer->GetFileChecksumFuncName();
- }
- if (!s.ok()) {
- Status status = r->ioptions.env->DeleteFile(r->file_info.file_path);
- // Silence ASSERT_STATUS_CHECKED warning, since DeleteFile may fail under
- // some error injection, and we can just ignore the failure
- status.PermitUncheckedError();
- }
- if (file_info != nullptr) {
- *file_info = r->file_info;
- Slice smallest_key = r->file_info.smallest_key;
- Slice largest_key = r->file_info.largest_key;
- Slice smallest_range_del_key = r->file_info.smallest_range_del_key;
- Slice largest_range_del_key = r->file_info.largest_range_del_key;
- assert(smallest_key.empty() == largest_key.empty());
- assert(smallest_range_del_key.empty() == largest_range_del_key.empty());
- // Remove user-defined timestamps from external file metadata too when they
- // should not be persisted.
- if (r->strip_timestamp) {
- if (!smallest_key.empty()) {
- assert(smallest_key.size() >= r->ts_sz);
- assert(largest_key.size() >= r->ts_sz);
- file_info->smallest_key.resize(smallest_key.size() - r->ts_sz);
- file_info->largest_key.resize(largest_key.size() - r->ts_sz);
- }
- if (!smallest_range_del_key.empty()) {
- assert(smallest_range_del_key.size() >= r->ts_sz);
- assert(largest_range_del_key.size() >= r->ts_sz);
- file_info->smallest_range_del_key.resize(smallest_range_del_key.size() -
- r->ts_sz);
- file_info->largest_range_del_key.resize(largest_range_del_key.size() -
- r->ts_sz);
- }
- }
- }
- r->builder.reset();
- return s;
- }
- uint64_t SstFileWriter::FileSize() { return rep_->file_info.file_size; }
- bool SstFileWriter::CreatedBySstFileWriter(const TableProperties& tp) {
- const auto& uprops = tp.user_collected_properties;
- return uprops.find(ExternalSstFilePropertyNames::kVersion) != uprops.end();
- }
- } // namespace ROCKSDB_NAMESPACE
|