| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/event_helpers.h"
- #include "rocksdb/convenience.h"
- #include "rocksdb/listener.h"
- #include "rocksdb/utilities/customizable_util.h"
- namespace ROCKSDB_NAMESPACE {
- Status EventListener::CreateFromString(const ConfigOptions& config_options,
- const std::string& id,
- std::shared_ptr<EventListener>* result) {
- return LoadSharedObject<EventListener>(config_options, id, result);
- }
- namespace {
- template <class T>
- inline T SafeDivide(T a, T b) {
- return b == 0 ? 0 : a / b;
- }
- } // anonymous namespace
- void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
- *jwriter << "time_micros"
- << std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- }
- void EventHelpers::NotifyTableFileCreationStarted(
- const std::vector<std::shared_ptr<EventListener>>& listeners,
- const std::string& db_name, const std::string& cf_name,
- const std::string& file_path, int job_id, TableFileCreationReason reason) {
- if (listeners.empty()) {
- return;
- }
- TableFileCreationBriefInfo info;
- info.db_name = db_name;
- info.cf_name = cf_name;
- info.file_path = file_path;
- info.job_id = job_id;
- info.reason = reason;
- for (auto& listener : listeners) {
- listener->OnTableFileCreationStarted(info);
- }
- }
- void EventHelpers::NotifyOnBackgroundError(
- const std::vector<std::shared_ptr<EventListener>>& listeners,
- BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
- bool* auto_recovery) {
- if (listeners.empty()) {
- return;
- }
- db_mutex->AssertHeld();
- // release lock while notifying events
- db_mutex->Unlock();
- for (auto& listener : listeners) {
- listener->OnBackgroundError(reason, bg_error);
- bg_error->PermitUncheckedError();
- if (*auto_recovery) {
- listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
- }
- }
- db_mutex->Lock();
- }
- void EventHelpers::LogAndNotifyTableFileCreationFinished(
- EventLogger* event_logger,
- const std::vector<std::shared_ptr<EventListener>>& listeners,
- const std::string& db_name, const std::string& cf_name,
- const std::string& file_path, int job_id, const FileDescriptor& fd,
- uint64_t oldest_blob_file_number, const TableProperties& table_properties,
- TableFileCreationReason reason, const Status& s,
- const std::string& file_checksum,
- const std::string& file_checksum_func_name) {
- if (!event_logger && listeners.empty()) {
- s.PermitUncheckedError();
- return;
- }
- if (event_logger) {
- JSONWriter jwriter;
- AppendCurrentTime(&jwriter);
- jwriter << "cf_name" << cf_name << "job" << job_id << "event"
- << "table_file_creation" << "file_number" << fd.GetNumber()
- << "file_size" << fd.GetFileSize() << "file_checksum"
- << Slice(file_checksum).ToString(true) << "file_checksum_func_name"
- << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno
- << "largest_seqno" << fd.largest_seqno;
- // table_properties
- {
- jwriter << "table_properties";
- jwriter.StartObject();
- // basic properties:
- jwriter << "data_size" << table_properties.data_size << "index_size"
- << table_properties.index_size << "index_partitions"
- << table_properties.index_partitions << "top_level_index_size"
- << table_properties.top_level_index_size
- << "index_key_is_user_key"
- << table_properties.index_key_is_user_key
- << "index_value_is_delta_encoded"
- << table_properties.index_value_is_delta_encoded << "filter_size"
- << table_properties.filter_size << "raw_key_size"
- << table_properties.raw_key_size << "raw_average_key_size"
- << SafeDivide(table_properties.raw_key_size,
- table_properties.num_entries)
- << "raw_value_size" << table_properties.raw_value_size
- << "raw_average_value_size"
- << SafeDivide(table_properties.raw_value_size,
- table_properties.num_entries)
- << "num_data_blocks" << table_properties.num_data_blocks
- << "num_entries" << table_properties.num_entries
- << "num_filter_entries" << table_properties.num_filter_entries
- << "num_deletions" << table_properties.num_deletions
- << "num_merge_operands" << table_properties.num_merge_operands
- << "num_range_deletions" << table_properties.num_range_deletions
- << "format_version" << table_properties.format_version
- << "fixed_key_len" << table_properties.fixed_key_len
- << "filter_policy" << table_properties.filter_policy_name
- << "column_family_name" << table_properties.column_family_name
- << "column_family_id" << table_properties.column_family_id
- << "comparator" << table_properties.comparator_name
- << "user_defined_timestamps_persisted"
- << table_properties.user_defined_timestamps_persisted
- << "key_largest_seqno" << table_properties.key_largest_seqno
- << "key_smallest_seqno" << table_properties.key_smallest_seqno
- << "merge_operator" << table_properties.merge_operator_name
- << "prefix_extractor_name"
- << table_properties.prefix_extractor_name << "property_collectors"
- << table_properties.property_collectors_names << "compression"
- << table_properties.compression_name << "compression_options"
- << table_properties.compression_options << "creation_time"
- << table_properties.creation_time << "oldest_key_time"
- << table_properties.newest_key_time << "newest_key_time"
- << table_properties.oldest_key_time << "file_creation_time"
- << table_properties.file_creation_time
- << "slow_compression_estimated_data_size"
- << table_properties.slow_compression_estimated_data_size
- << "fast_compression_estimated_data_size"
- << table_properties.fast_compression_estimated_data_size
- << "db_id" << table_properties.db_id << "db_session_id"
- << table_properties.db_session_id << "orig_file_number"
- << table_properties.orig_file_number << "seqno_to_time_mapping";
- if (table_properties.seqno_to_time_mapping.empty()) {
- jwriter << "N/A";
- } else {
- SeqnoToTimeMapping tmp;
- Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping);
- if (status.ok()) {
- jwriter << tmp.ToHumanString();
- } else {
- jwriter << "Invalid";
- }
- }
- // user collected properties
- for (const auto& prop : table_properties.readable_properties) {
- jwriter << prop.first << prop.second;
- }
- jwriter.EndObject();
- }
- if (oldest_blob_file_number != kInvalidBlobFileNumber) {
- jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
- }
- jwriter << "status" << s.ToString();
- jwriter.EndObject();
- event_logger->Log(jwriter);
- }
- if (listeners.empty()) {
- return;
- }
- TableFileCreationInfo info;
- info.db_name = db_name;
- info.cf_name = cf_name;
- info.file_path = file_path;
- info.file_size = fd.file_size;
- info.job_id = job_id;
- info.table_properties = table_properties;
- info.reason = reason;
- info.status = s;
- info.file_checksum = file_checksum;
- info.file_checksum_func_name = file_checksum_func_name;
- for (auto& listener : listeners) {
- listener->OnTableFileCreated(info);
- }
- info.status.PermitUncheckedError();
- }
- void EventHelpers::LogAndNotifyTableFileDeletion(
- EventLogger* event_logger, int job_id, uint64_t file_number,
- const std::string& file_path, const Status& status,
- const std::string& dbname,
- const std::vector<std::shared_ptr<EventListener>>& listeners) {
- if (!event_logger && listeners.empty()) {
- status.PermitUncheckedError();
- return;
- }
- if (event_logger) {
- JSONWriter jwriter;
- AppendCurrentTime(&jwriter);
- jwriter << "job" << job_id << "event" << "table_file_deletion"
- << "file_number" << file_number << "status" << status.ToString();
- jwriter.EndObject();
- event_logger->Log(jwriter);
- }
- if (listeners.empty()) {
- return;
- }
- TableFileDeletionInfo info;
- info.db_name = dbname;
- info.job_id = job_id;
- info.file_path = file_path;
- info.status = status;
- for (auto& listener : listeners) {
- listener->OnTableFileDeleted(info);
- }
- info.status.PermitUncheckedError();
- }
- void EventHelpers::NotifyOnErrorRecoveryEnd(
- const std::vector<std::shared_ptr<EventListener>>& listeners,
- const Status& old_bg_error, const Status& new_bg_error,
- InstrumentedMutex* db_mutex) {
- if (!listeners.empty()) {
- db_mutex->AssertHeld();
- // Make copies before releasing mutex to avoid race.
- Status old_bg_error_cp = old_bg_error;
- Status new_bg_error_cp = new_bg_error;
- // release lock while notifying events
- db_mutex->Unlock();
- TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:1");
- TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:2");
- for (auto& listener : listeners) {
- BackgroundErrorRecoveryInfo info;
- info.old_bg_error = old_bg_error_cp;
- info.new_bg_error = new_bg_error_cp;
- listener->OnErrorRecoveryCompleted(old_bg_error_cp);
- listener->OnErrorRecoveryEnd(info);
- info.old_bg_error.PermitUncheckedError();
- info.new_bg_error.PermitUncheckedError();
- }
- db_mutex->Lock();
- } else {
- old_bg_error.PermitUncheckedError();
- }
- }
- void EventHelpers::NotifyBlobFileCreationStarted(
- const std::vector<std::shared_ptr<EventListener>>& listeners,
- const std::string& db_name, const std::string& cf_name,
- const std::string& file_path, int job_id,
- BlobFileCreationReason creation_reason) {
- if (listeners.empty()) {
- return;
- }
- BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
- creation_reason);
- for (const auto& listener : listeners) {
- listener->OnBlobFileCreationStarted(info);
- }
- }
- void EventHelpers::LogAndNotifyBlobFileCreationFinished(
- EventLogger* event_logger,
- const std::vector<std::shared_ptr<EventListener>>& listeners,
- const std::string& db_name, const std::string& cf_name,
- const std::string& file_path, int job_id, uint64_t file_number,
- BlobFileCreationReason creation_reason, const Status& s,
- const std::string& file_checksum,
- const std::string& file_checksum_func_name, uint64_t total_blob_count,
- uint64_t total_blob_bytes) {
- if (!event_logger && listeners.empty()) {
- s.PermitUncheckedError();
- return;
- }
- if (event_logger) {
- JSONWriter jwriter;
- AppendCurrentTime(&jwriter);
- jwriter << "cf_name" << cf_name << "job" << job_id << "event"
- << "blob_file_creation" << "file_number" << file_number
- << "total_blob_count" << total_blob_count << "total_blob_bytes"
- << total_blob_bytes << "file_checksum" << file_checksum
- << "file_checksum_func_name" << file_checksum_func_name << "status"
- << s.ToString();
- jwriter.EndObject();
- event_logger->Log(jwriter);
- }
- if (listeners.empty()) {
- return;
- }
- BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
- creation_reason, total_blob_count, total_blob_bytes,
- s, file_checksum, file_checksum_func_name);
- for (const auto& listener : listeners) {
- listener->OnBlobFileCreated(info);
- }
- info.status.PermitUncheckedError();
- }
- void EventHelpers::LogAndNotifyBlobFileDeletion(
- EventLogger* event_logger,
- const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
- uint64_t file_number, const std::string& file_path, const Status& status,
- const std::string& dbname) {
- if (!event_logger && listeners.empty()) {
- status.PermitUncheckedError();
- return;
- }
- if (event_logger) {
- JSONWriter jwriter;
- AppendCurrentTime(&jwriter);
- jwriter << "job" << job_id << "event" << "blob_file_deletion"
- << "file_number" << file_number << "status" << status.ToString();
- jwriter.EndObject();
- event_logger->Log(jwriter);
- }
- if (listeners.empty()) {
- return;
- }
- BlobFileDeletionInfo info(dbname, file_path, job_id, status);
- for (const auto& listener : listeners) {
- listener->OnBlobFileDeleted(info);
- }
- info.status.PermitUncheckedError();
- }
- } // namespace ROCKSDB_NAMESPACE
|