event_helpers.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/event_helpers.h"
  6. #include "rocksdb/convenience.h"
  7. #include "rocksdb/listener.h"
  8. #include "rocksdb/utilities/customizable_util.h"
  9. namespace ROCKSDB_NAMESPACE {
  10. Status EventListener::CreateFromString(const ConfigOptions& config_options,
  11. const std::string& id,
  12. std::shared_ptr<EventListener>* result) {
  13. return LoadSharedObject<EventListener>(config_options, id, result);
  14. }
  namespace {
  // Divides `a` by `b`, yielding zero instead of dividing when `b` is zero.
  template <class T>
  inline T SafeDivide(T a, T b) {
    if (b == 0) {
      return 0;
    }
    return a / b;
  }
  }  // anonymous namespace
  21. void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
  22. *jwriter << "time_micros"
  23. << std::chrono::duration_cast<std::chrono::microseconds>(
  24. std::chrono::system_clock::now().time_since_epoch())
  25. .count();
  26. }
  27. void EventHelpers::NotifyTableFileCreationStarted(
  28. const std::vector<std::shared_ptr<EventListener>>& listeners,
  29. const std::string& db_name, const std::string& cf_name,
  30. const std::string& file_path, int job_id, TableFileCreationReason reason) {
  31. if (listeners.empty()) {
  32. return;
  33. }
  34. TableFileCreationBriefInfo info;
  35. info.db_name = db_name;
  36. info.cf_name = cf_name;
  37. info.file_path = file_path;
  38. info.job_id = job_id;
  39. info.reason = reason;
  40. for (auto& listener : listeners) {
  41. listener->OnTableFileCreationStarted(info);
  42. }
  43. }
  44. void EventHelpers::NotifyOnBackgroundError(
  45. const std::vector<std::shared_ptr<EventListener>>& listeners,
  46. BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
  47. bool* auto_recovery) {
  48. if (listeners.empty()) {
  49. return;
  50. }
  51. db_mutex->AssertHeld();
  52. // release lock while notifying events
  53. db_mutex->Unlock();
  54. for (auto& listener : listeners) {
  55. listener->OnBackgroundError(reason, bg_error);
  56. bg_error->PermitUncheckedError();
  57. if (*auto_recovery) {
  58. listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
  59. }
  60. }
  61. db_mutex->Lock();
  62. }
  63. void EventHelpers::LogAndNotifyTableFileCreationFinished(
  64. EventLogger* event_logger,
  65. const std::vector<std::shared_ptr<EventListener>>& listeners,
  66. const std::string& db_name, const std::string& cf_name,
  67. const std::string& file_path, int job_id, const FileDescriptor& fd,
  68. uint64_t oldest_blob_file_number, const TableProperties& table_properties,
  69. TableFileCreationReason reason, const Status& s,
  70. const std::string& file_checksum,
  71. const std::string& file_checksum_func_name) {
  72. if (!event_logger && listeners.empty()) {
  73. s.PermitUncheckedError();
  74. return;
  75. }
  76. if (event_logger) {
  77. JSONWriter jwriter;
  78. AppendCurrentTime(&jwriter);
  79. jwriter << "cf_name" << cf_name << "job" << job_id << "event"
  80. << "table_file_creation" << "file_number" << fd.GetNumber()
  81. << "file_size" << fd.GetFileSize() << "file_checksum"
  82. << Slice(file_checksum).ToString(true) << "file_checksum_func_name"
  83. << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno
  84. << "largest_seqno" << fd.largest_seqno;
  85. // table_properties
  86. {
  87. jwriter << "table_properties";
  88. jwriter.StartObject();
  89. // basic properties:
  90. jwriter << "data_size" << table_properties.data_size << "index_size"
  91. << table_properties.index_size << "index_partitions"
  92. << table_properties.index_partitions << "top_level_index_size"
  93. << table_properties.top_level_index_size
  94. << "index_key_is_user_key"
  95. << table_properties.index_key_is_user_key
  96. << "index_value_is_delta_encoded"
  97. << table_properties.index_value_is_delta_encoded << "filter_size"
  98. << table_properties.filter_size << "raw_key_size"
  99. << table_properties.raw_key_size << "raw_average_key_size"
  100. << SafeDivide(table_properties.raw_key_size,
  101. table_properties.num_entries)
  102. << "raw_value_size" << table_properties.raw_value_size
  103. << "raw_average_value_size"
  104. << SafeDivide(table_properties.raw_value_size,
  105. table_properties.num_entries)
  106. << "num_data_blocks" << table_properties.num_data_blocks
  107. << "num_entries" << table_properties.num_entries
  108. << "num_filter_entries" << table_properties.num_filter_entries
  109. << "num_deletions" << table_properties.num_deletions
  110. << "num_merge_operands" << table_properties.num_merge_operands
  111. << "num_range_deletions" << table_properties.num_range_deletions
  112. << "format_version" << table_properties.format_version
  113. << "fixed_key_len" << table_properties.fixed_key_len
  114. << "filter_policy" << table_properties.filter_policy_name
  115. << "column_family_name" << table_properties.column_family_name
  116. << "column_family_id" << table_properties.column_family_id
  117. << "comparator" << table_properties.comparator_name
  118. << "user_defined_timestamps_persisted"
  119. << table_properties.user_defined_timestamps_persisted
  120. << "key_largest_seqno" << table_properties.key_largest_seqno
  121. << "key_smallest_seqno" << table_properties.key_smallest_seqno
  122. << "merge_operator" << table_properties.merge_operator_name
  123. << "prefix_extractor_name"
  124. << table_properties.prefix_extractor_name << "property_collectors"
  125. << table_properties.property_collectors_names << "compression"
  126. << table_properties.compression_name << "compression_options"
  127. << table_properties.compression_options << "creation_time"
  128. << table_properties.creation_time << "oldest_key_time"
  129. << table_properties.newest_key_time << "newest_key_time"
  130. << table_properties.oldest_key_time << "file_creation_time"
  131. << table_properties.file_creation_time
  132. << "slow_compression_estimated_data_size"
  133. << table_properties.slow_compression_estimated_data_size
  134. << "fast_compression_estimated_data_size"
  135. << table_properties.fast_compression_estimated_data_size
  136. << "db_id" << table_properties.db_id << "db_session_id"
  137. << table_properties.db_session_id << "orig_file_number"
  138. << table_properties.orig_file_number << "seqno_to_time_mapping";
  139. if (table_properties.seqno_to_time_mapping.empty()) {
  140. jwriter << "N/A";
  141. } else {
  142. SeqnoToTimeMapping tmp;
  143. Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping);
  144. if (status.ok()) {
  145. jwriter << tmp.ToHumanString();
  146. } else {
  147. jwriter << "Invalid";
  148. }
  149. }
  150. // user collected properties
  151. for (const auto& prop : table_properties.readable_properties) {
  152. jwriter << prop.first << prop.second;
  153. }
  154. jwriter.EndObject();
  155. }
  156. if (oldest_blob_file_number != kInvalidBlobFileNumber) {
  157. jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
  158. }
  159. jwriter << "status" << s.ToString();
  160. jwriter.EndObject();
  161. event_logger->Log(jwriter);
  162. }
  163. if (listeners.empty()) {
  164. return;
  165. }
  166. TableFileCreationInfo info;
  167. info.db_name = db_name;
  168. info.cf_name = cf_name;
  169. info.file_path = file_path;
  170. info.file_size = fd.file_size;
  171. info.job_id = job_id;
  172. info.table_properties = table_properties;
  173. info.reason = reason;
  174. info.status = s;
  175. info.file_checksum = file_checksum;
  176. info.file_checksum_func_name = file_checksum_func_name;
  177. for (auto& listener : listeners) {
  178. listener->OnTableFileCreated(info);
  179. }
  180. info.status.PermitUncheckedError();
  181. }
  182. void EventHelpers::LogAndNotifyTableFileDeletion(
  183. EventLogger* event_logger, int job_id, uint64_t file_number,
  184. const std::string& file_path, const Status& status,
  185. const std::string& dbname,
  186. const std::vector<std::shared_ptr<EventListener>>& listeners) {
  187. if (!event_logger && listeners.empty()) {
  188. status.PermitUncheckedError();
  189. return;
  190. }
  191. if (event_logger) {
  192. JSONWriter jwriter;
  193. AppendCurrentTime(&jwriter);
  194. jwriter << "job" << job_id << "event" << "table_file_deletion"
  195. << "file_number" << file_number << "status" << status.ToString();
  196. jwriter.EndObject();
  197. event_logger->Log(jwriter);
  198. }
  199. if (listeners.empty()) {
  200. return;
  201. }
  202. TableFileDeletionInfo info;
  203. info.db_name = dbname;
  204. info.job_id = job_id;
  205. info.file_path = file_path;
  206. info.status = status;
  207. for (auto& listener : listeners) {
  208. listener->OnTableFileDeleted(info);
  209. }
  210. info.status.PermitUncheckedError();
  211. }
  212. void EventHelpers::NotifyOnErrorRecoveryEnd(
  213. const std::vector<std::shared_ptr<EventListener>>& listeners,
  214. const Status& old_bg_error, const Status& new_bg_error,
  215. InstrumentedMutex* db_mutex) {
  216. if (!listeners.empty()) {
  217. db_mutex->AssertHeld();
  218. // Make copies before releasing mutex to avoid race.
  219. Status old_bg_error_cp = old_bg_error;
  220. Status new_bg_error_cp = new_bg_error;
  221. // release lock while notifying events
  222. db_mutex->Unlock();
  223. TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:1");
  224. TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:2");
  225. for (auto& listener : listeners) {
  226. BackgroundErrorRecoveryInfo info;
  227. info.old_bg_error = old_bg_error_cp;
  228. info.new_bg_error = new_bg_error_cp;
  229. listener->OnErrorRecoveryCompleted(old_bg_error_cp);
  230. listener->OnErrorRecoveryEnd(info);
  231. info.old_bg_error.PermitUncheckedError();
  232. info.new_bg_error.PermitUncheckedError();
  233. }
  234. db_mutex->Lock();
  235. } else {
  236. old_bg_error.PermitUncheckedError();
  237. }
  238. }
  239. void EventHelpers::NotifyBlobFileCreationStarted(
  240. const std::vector<std::shared_ptr<EventListener>>& listeners,
  241. const std::string& db_name, const std::string& cf_name,
  242. const std::string& file_path, int job_id,
  243. BlobFileCreationReason creation_reason) {
  244. if (listeners.empty()) {
  245. return;
  246. }
  247. BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
  248. creation_reason);
  249. for (const auto& listener : listeners) {
  250. listener->OnBlobFileCreationStarted(info);
  251. }
  252. }
  253. void EventHelpers::LogAndNotifyBlobFileCreationFinished(
  254. EventLogger* event_logger,
  255. const std::vector<std::shared_ptr<EventListener>>& listeners,
  256. const std::string& db_name, const std::string& cf_name,
  257. const std::string& file_path, int job_id, uint64_t file_number,
  258. BlobFileCreationReason creation_reason, const Status& s,
  259. const std::string& file_checksum,
  260. const std::string& file_checksum_func_name, uint64_t total_blob_count,
  261. uint64_t total_blob_bytes) {
  262. if (!event_logger && listeners.empty()) {
  263. s.PermitUncheckedError();
  264. return;
  265. }
  266. if (event_logger) {
  267. JSONWriter jwriter;
  268. AppendCurrentTime(&jwriter);
  269. jwriter << "cf_name" << cf_name << "job" << job_id << "event"
  270. << "blob_file_creation" << "file_number" << file_number
  271. << "total_blob_count" << total_blob_count << "total_blob_bytes"
  272. << total_blob_bytes << "file_checksum" << file_checksum
  273. << "file_checksum_func_name" << file_checksum_func_name << "status"
  274. << s.ToString();
  275. jwriter.EndObject();
  276. event_logger->Log(jwriter);
  277. }
  278. if (listeners.empty()) {
  279. return;
  280. }
  281. BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
  282. creation_reason, total_blob_count, total_blob_bytes,
  283. s, file_checksum, file_checksum_func_name);
  284. for (const auto& listener : listeners) {
  285. listener->OnBlobFileCreated(info);
  286. }
  287. info.status.PermitUncheckedError();
  288. }
  289. void EventHelpers::LogAndNotifyBlobFileDeletion(
  290. EventLogger* event_logger,
  291. const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
  292. uint64_t file_number, const std::string& file_path, const Status& status,
  293. const std::string& dbname) {
  294. if (!event_logger && listeners.empty()) {
  295. status.PermitUncheckedError();
  296. return;
  297. }
  298. if (event_logger) {
  299. JSONWriter jwriter;
  300. AppendCurrentTime(&jwriter);
  301. jwriter << "job" << job_id << "event" << "blob_file_deletion"
  302. << "file_number" << file_number << "status" << status.ToString();
  303. jwriter.EndObject();
  304. event_logger->Log(jwriter);
  305. }
  306. if (listeners.empty()) {
  307. return;
  308. }
  309. BlobFileDeletionInfo info(dbname, file_path, job_id, status);
  310. for (const auto& listener : listeners) {
  311. listener->OnBlobFileDeleted(info);
  312. }
  313. info.status.PermitUncheckedError();
  314. }
  315. } // namespace ROCKSDB_NAMESPACE