trace_analyzer_tool.cc 68 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #ifdef GFLAGS
  7. #ifdef NUMA
  8. #include <numa.h>
  9. #endif
  10. #ifndef OS_WIN
  11. #include <unistd.h>
  12. #endif
  13. #include <cinttypes>
  14. #include <cmath>
  15. #include <cstdio>
  16. #include <cstdlib>
  17. #include <memory>
  18. #include <sstream>
  19. #include <stdexcept>
  20. #include "db/db_impl/db_impl.h"
  21. #include "db/memtable.h"
  22. #include "db/write_batch_internal.h"
  23. #include "env/composite_env_wrapper.h"
  24. #include "file/line_file_reader.h"
  25. #include "file/writable_file_writer.h"
  26. #include "options/cf_options.h"
  27. #include "rocksdb/db.h"
  28. #include "rocksdb/env.h"
  29. #include "rocksdb/iterator.h"
  30. #include "rocksdb/slice.h"
  31. #include "rocksdb/slice_transform.h"
  32. #include "rocksdb/status.h"
  33. #include "rocksdb/table_properties.h"
  34. #include "rocksdb/utilities/ldb_cmd.h"
  35. #include "rocksdb/write_batch.h"
  36. #include "table/meta_blocks.h"
  37. #include "table/table_reader.h"
  38. #include "tools/trace_analyzer_tool.h"
  39. #include "trace_replay/trace_replay.h"
  40. #include "util/coding.h"
  41. #include "util/compression.h"
  42. #include "util/gflags_compat.h"
  43. #include "util/random.h"
  44. #include "util/string_util.h"
  45. using GFLAGS_NAMESPACE::ParseCommandLineFlags;
  46. DEFINE_string(trace_path, "", "The trace file path.");
  47. DEFINE_string(output_dir, "", "The directory to store the output files.");
  48. DEFINE_string(output_prefix, "trace",
  49. "The prefix used for all the output files.");
  50. DEFINE_bool(output_key_stats, false,
  51. "Output the key access count statistics to file\n"
  52. "for accessed keys:\n"
  53. "file name: <prefix>-<query_type>-<cf_id>-accessed_key_stats.txt\n"
  54. "Format:[cf_id value_size access_keyid access_count]\n"
  55. "for the whole key space keys:\n"
  56. "File name: <prefix>-<query_type>-<cf_id>-whole_key_stats.txt\n"
  57. "Format:[whole_key_space_keyid access_count]");
  58. DEFINE_bool(output_access_count_stats, false,
  59. "Output the access count distribution statistics to file.\n"
  60. "File name: <prefix>-<query_type>-<cf_id>-accessed_"
  61. "key_count_distribution.txt \n"
  62. "Format:[access_count number_of_access_count]");
  63. DEFINE_bool(output_time_series, false,
  64. "Output the access time in second of each key, "
  65. "such that we can have the time series data of the queries \n"
  66. "File name: <prefix>-<query_type>-<cf_id>-time_series.txt\n"
  67. "Format:[type_id time_in_sec access_keyid].");
  68. DEFINE_bool(try_process_corrupted_trace, false,
  69. "In default, trace_analyzer will exit if the trace file is "
  70. "corrupted due to the unexpected tracing cases. If this option "
  71. "is enabled, trace_analyzer will stop reading the trace file, "
  72. "and start analyzing the read-in data.");
  73. DEFINE_int32(output_prefix_cut, 0,
  74. "The number of bytes as prefix to cut the keys.\n"
  75. "If it is enabled, it will generate the following:\n"
  76. "For accessed keys:\n"
  77. "File name: <prefix>-<query_type>-<cf_id>-"
  78. "accessed_key_prefix_cut.txt \n"
  79. "Format:[acessed_keyid access_count_of_prefix "
  80. "number_of_keys_in_prefix average_key_access "
  81. "prefix_succ_ratio prefix]\n"
  82. "For whole key space keys:\n"
  83. "File name: <prefix>-<query_type>-<cf_id>"
  84. "-whole_key_prefix_cut.txt\n"
  85. "Format:[start_keyid_in_whole_keyspace prefix]\n"
  86. "if 'output_qps_stats' and 'top_k' are enabled, it will output:\n"
  87. "File name: <prefix>-<query_type>-<cf_id>"
  88. "-accessed_top_k_qps_prefix_cut.txt\n"
  89. "Format:[the_top_ith_qps_time QPS], [prefix qps_of_this_second].");
  90. DEFINE_bool(convert_to_human_readable_trace, false,
  91. "Convert the binary trace file to a human readable txt file "
  92. "for further processing. "
  93. "This file will be extremely large "
  94. "(similar size as the original binary trace file). "
  95. "You can specify 'no_key' to reduce the size, if key is not "
  96. "needed in the next step.\n"
  97. "File name: <prefix>_human_readable_trace.txt\n"
  98. "Format:[<key> type_id cf_id value_size time_in_micorsec].");
  99. DEFINE_bool(output_qps_stats, false,
  100. "Output the query per second(qps) statistics \n"
  101. "For the overall qps, it will contain all qps of each query type. "
  102. "The time is started from the first trace record\n"
  103. "File name: <prefix>_qps_stats.txt\n"
  104. "Format: [qps_type_1 qps_type_2 ...... overall_qps]\n"
  105. "For each cf and query, it will have its own qps output.\n"
  106. "File name: <prefix>-<query_type>-<cf_id>_qps_stats.txt \n"
  107. "Format:[query_count_in_this_second].");
  108. DEFINE_bool(no_print, false, "Do not print out any result");
  109. DEFINE_string(
  110. print_correlation, "",
  111. "intput format: [correlation pairs][.,.]\n"
  112. "Output the query correlations between the pairs of query types "
  113. "listed in the parameter, input should select the operations from:\n"
  114. "get, put, delete, single_delete, rangle_delete, merge. No space "
  115. "between the pairs separated by commar. Example: =[get,get]... "
  116. "It will print out the number of pairs of 'A after B' and "
  117. "the average time interval between the two query.");
  118. DEFINE_string(key_space_dir, "",
  119. "<the directory stores full key space files> \n"
  120. "The key space files should be: <column family id>.txt");
  121. DEFINE_bool(analyze_get, false, "Analyze the Get query.");
  122. DEFINE_bool(analyze_put, false, "Analyze the Put query.");
  123. DEFINE_bool(analyze_delete, false, "Analyze the Delete query.");
  124. DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query.");
  125. DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query.");
  126. DEFINE_bool(analyze_merge, false, "Analyze the Merge query.");
  127. DEFINE_bool(analyze_iterator, false,
  128. " Analyze the iterate query like Seek() and SeekForPrev().");
  129. DEFINE_bool(analyze_multiget, false,
  130. " Analyze the MultiGet query. NOTE: for"
  131. " MultiGet, we analyze each KV-pair read in one MultiGet query. "
  132. "Therefore, the total queries and QPS are calculated based on "
  133. "the number of KV-pairs being accessed not the number of MultiGet."
  134. "It can be improved in the future if needed");
  135. DEFINE_bool(no_key, false,
  136. " Does not output the key to the result files to make smaller.");
  137. DEFINE_bool(print_overall_stats, true,
  138. " Print the stats of the whole trace, "
  139. "like total requests, keys, and etc.");
  140. DEFINE_bool(output_key_distribution, false, "Print the key size distribution.");
  141. DEFINE_bool(
  142. output_value_distribution, false,
  143. "Out put the value size distribution, only available for Put and Merge.\n"
  144. "File name: <prefix>-<query_type>-<cf_id>"
  145. "-accessed_value_size_distribution.txt\n"
  146. "Format:[Number_of_value_size_between x and "
  147. "x+value_interval is: <the count>]");
  148. DEFINE_int32(print_top_k_access, 1,
  149. "<top K of the variables to be printed> "
  150. "Print the top k accessed keys, top k accessed prefix "
  151. "and etc.");
  152. DEFINE_int32(output_ignore_count, 0,
  153. "<threshold>, ignores the access count <= this value, "
  154. "it will shorter the output.");
  155. DEFINE_int32(value_interval, 8,
  156. "To output the value distribution, we need to set the value "
  157. "intervals and make the statistic of the value size distribution "
  158. "in different intervals. The default is 8.");
  159. DEFINE_double(sample_ratio, 1.0,
  160. "If the trace size is extremely huge or user want to sample "
  161. "the trace when analyzing, sample ratio can be set (0, 1.0]");
  162. namespace ROCKSDB_NAMESPACE {
  163. const size_t kShadowValueSize = 10;
  164. std::map<std::string, int> taOptToIndex = {
  165. {"get", kGet},
  166. {"put", kPut},
  167. {"delete", kDelete},
  168. {"single_delete", kSingleDelete},
  169. {"range_delete", kRangeDelete},
  170. {"merge", kMerge},
  171. {"iterator_Seek", kIteratorSeek},
  172. {"iterator_SeekForPrev", kIteratorSeekForPrev},
  173. {"multiget", kMultiGet}};
  174. std::map<int, std::string> taIndexToOpt = {
  175. {kGet, "get"},
  176. {kPut, "put"},
  177. {kDelete, "delete"},
  178. {kSingleDelete, "single_delete"},
  179. {kRangeDelete, "range_delete"},
  180. {kMerge, "merge"},
  181. {kIteratorSeek, "iterator_Seek"},
  182. {kIteratorSeekForPrev, "iterator_SeekForPrev"},
  183. {kMultiGet, "multiget"}};
  namespace {

  // Multiplies two unsigned 64-bit values with a guard against wrap-around.
  //
  // Returns 0 when either factor is 0 and the exact product when it fits in
  // 64 bits. When the product would overflow, the first factor (op1) is
  // returned unchanged instead of a wrapped value.
  uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
    const bool has_zero_factor = (op1 == 0) || (op2 == 0);
    if (has_zero_factor) {
      return 0;
    }
    // Largest value op2 may take before op1 * op2 no longer fits.
    const uint64_t largest_safe_op2 = std::numeric_limits<uint64_t>::max() / op1;
    return (op2 > largest_safe_op2) ? op1 : op1 * op2;
  }

  }  // namespace
  195. // The default constructor of AnalyzerOptions
  196. AnalyzerOptions::AnalyzerOptions()
  197. : correlation_map(kTaTypeNum, std::vector<int>(kTaTypeNum, -1)) {}
  198. AnalyzerOptions::~AnalyzerOptions() = default;
  199. void AnalyzerOptions::SparseCorrelationInput(const std::string& in_str) {
  200. std::string cur = in_str;
  201. if (cur.size() == 0) {
  202. return;
  203. }
  204. while (!cur.empty()) {
  205. if (cur.compare(0, 1, "[") != 0) {
  206. fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
  207. exit(1);
  208. }
  209. std::string opt1, opt2;
  210. std::size_t split = cur.find_first_of(',');
  211. if (split != std::string::npos) {
  212. opt1 = cur.substr(1, split - 1);
  213. } else {
  214. fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
  215. exit(1);
  216. }
  217. std::size_t end = cur.find_first_of(']');
  218. if (end != std::string::npos) {
  219. opt2 = cur.substr(split + 1, end - split - 1);
  220. } else {
  221. fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
  222. exit(1);
  223. }
  224. cur = cur.substr(end + 1);
  225. if (taOptToIndex.find(opt1) != taOptToIndex.end() &&
  226. taOptToIndex.find(opt2) != taOptToIndex.end()) {
  227. correlation_list.emplace_back(taOptToIndex[opt1], taOptToIndex[opt2]);
  228. } else {
  229. fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
  230. exit(1);
  231. }
  232. }
  233. int sequence = 0;
  234. for (auto& it : correlation_list) {
  235. correlation_map[it.first][it.second] = sequence;
  236. sequence++;
  237. }
  238. }
  239. // The trace statistic struct constructor
  240. TraceStats::TraceStats() {
  241. cf_id = 0;
  242. cf_name = "0";
  243. a_count = 0;
  244. a_key_id = 0;
  245. a_key_size_sqsum = 0;
  246. a_key_size_sum = 0;
  247. a_key_mid = 0;
  248. a_value_size_sqsum = 0;
  249. a_value_size_sum = 0;
  250. a_value_mid = 0;
  251. a_peak_qps = 0;
  252. a_ave_qps = 0.0;
  253. }
  254. TraceStats::~TraceStats() = default;
  255. // The trace analyzer constructor
  256. TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path,
  257. AnalyzerOptions _analyzer_opts)
  258. : write_batch_ts_(0),
  259. trace_name_(trace_path),
  260. output_path_(output_path),
  261. analyzer_opts_(_analyzer_opts) {
  262. ROCKSDB_NAMESPACE::EnvOptions env_options;
  263. env_ = ROCKSDB_NAMESPACE::Env::Default();
  264. offset_ = 0;
  265. total_requests_ = 0;
  266. total_access_keys_ = 0;
  267. total_gets_ = 0;
  268. total_writes_ = 0;
  269. total_seeks_ = 0;
  270. total_seek_prevs_ = 0;
  271. total_multigets_ = 0;
  272. trace_create_time_ = 0;
  273. begin_time_ = 0;
  274. end_time_ = 0;
  275. time_series_start_ = 0;
  276. cur_time_sec_ = 0;
  277. if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) {
  278. sample_max_ = 1;
  279. } else {
  280. sample_max_ = static_cast<uint32_t>(1.0 / FLAGS_sample_ratio);
  281. }
  282. ta_.resize(kTaTypeNum);
  283. ta_[kGet].type_name = "get";
  284. if (FLAGS_analyze_get) {
  285. ta_[kGet].enabled = true;
  286. } else {
  287. ta_[kGet].enabled = false;
  288. }
  289. ta_[kPut].type_name = "put";
  290. if (FLAGS_analyze_put) {
  291. ta_[kPut].enabled = true;
  292. } else {
  293. ta_[kPut].enabled = false;
  294. }
  295. ta_[kDelete].type_name = "delete";
  296. if (FLAGS_analyze_delete) {
  297. ta_[kDelete].enabled = true;
  298. } else {
  299. ta_[kDelete].enabled = false;
  300. }
  301. ta_[kSingleDelete].type_name = "single_delete";
  302. if (FLAGS_analyze_single_delete) {
  303. ta_[kSingleDelete].enabled = true;
  304. } else {
  305. ta_[kSingleDelete].enabled = false;
  306. }
  307. ta_[kRangeDelete].type_name = "range_delete";
  308. if (FLAGS_analyze_range_delete) {
  309. ta_[kRangeDelete].enabled = true;
  310. } else {
  311. ta_[kRangeDelete].enabled = false;
  312. }
  313. ta_[kMerge].type_name = "merge";
  314. if (FLAGS_analyze_merge) {
  315. ta_[kMerge].enabled = true;
  316. } else {
  317. ta_[kMerge].enabled = false;
  318. }
  319. ta_[kIteratorSeek].type_name = "iterator_Seek";
  320. if (FLAGS_analyze_iterator) {
  321. ta_[kIteratorSeek].enabled = true;
  322. } else {
  323. ta_[kIteratorSeek].enabled = false;
  324. }
  325. ta_[kIteratorSeekForPrev].type_name = "iterator_SeekForPrev";
  326. if (FLAGS_analyze_iterator) {
  327. ta_[kIteratorSeekForPrev].enabled = true;
  328. } else {
  329. ta_[kIteratorSeekForPrev].enabled = false;
  330. }
  331. ta_[kMultiGet].type_name = "multiget";
  332. if (FLAGS_analyze_multiget) {
  333. ta_[kMultiGet].enabled = true;
  334. } else {
  335. ta_[kMultiGet].enabled = false;
  336. }
  337. for (int i = 0; i < kTaTypeNum; i++) {
  338. ta_[i].sample_count = 0;
  339. }
  340. }
  341. TraceAnalyzer::~TraceAnalyzer() = default;
  342. // Prepare the processing
  343. // Initiate the global trace reader and writer here
  344. Status TraceAnalyzer::PrepareProcessing() {
  345. Status s;
  346. // Prepare the trace reader
  347. if (trace_reader_ == nullptr) {
  348. s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
  349. } else {
  350. s = trace_reader_->Reset();
  351. }
  352. if (!s.ok()) {
  353. return s;
  354. }
  355. // Prepare and open the trace sequence file writer if needed
  356. if (FLAGS_convert_to_human_readable_trace) {
  357. std::string trace_sequence_name;
  358. trace_sequence_name =
  359. output_path_ + "/" + FLAGS_output_prefix + "-human_readable_trace.txt";
  360. s = env_->NewWritableFile(trace_sequence_name, &trace_sequence_f_,
  361. env_options_);
  362. if (!s.ok()) {
  363. return s;
  364. }
  365. }
  366. // prepare the general QPS file writer
  367. if (FLAGS_output_qps_stats) {
  368. std::string qps_stats_name;
  369. qps_stats_name =
  370. output_path_ + "/" + FLAGS_output_prefix + "-qps_stats.txt";
  371. s = env_->NewWritableFile(qps_stats_name, &qps_f_, env_options_);
  372. if (!s.ok()) {
  373. return s;
  374. }
  375. qps_stats_name =
  376. output_path_ + "/" + FLAGS_output_prefix + "-cf_qps_stats.txt";
  377. s = env_->NewWritableFile(qps_stats_name, &cf_qps_f_, env_options_);
  378. if (!s.ok()) {
  379. return s;
  380. }
  381. }
  382. return Status::OK();
  383. }
  384. Status TraceAnalyzer::ReadTraceHeader(Trace* header) {
  385. assert(header != nullptr);
  386. std::string encoded_trace;
  387. // Read the trace head
  388. Status s = trace_reader_->Read(&encoded_trace);
  389. if (!s.ok()) {
  390. return s;
  391. }
  392. s = TracerHelper::DecodeTrace(encoded_trace, header);
  393. if (header->type != kTraceBegin) {
  394. return Status::Corruption("Corrupted trace file. Incorrect header.");
  395. }
  396. if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
  397. return Status::Corruption("Corrupted trace file. Incorrect magic.");
  398. }
  399. return s;
  400. }
  401. Status TraceAnalyzer::ReadTraceFooter(Trace* footer) {
  402. assert(footer != nullptr);
  403. Status s = ReadTraceRecord(footer);
  404. if (!s.ok()) {
  405. return s;
  406. }
  407. if (footer->type != kTraceEnd) {
  408. return Status::Corruption("Corrupted trace file. Incorrect footer.");
  409. }
  410. return s;
  411. }
  412. Status TraceAnalyzer::ReadTraceRecord(Trace* trace) {
  413. assert(trace != nullptr);
  414. std::string encoded_trace;
  415. Status s = trace_reader_->Read(&encoded_trace);
  416. if (!s.ok()) {
  417. return s;
  418. }
  419. return TracerHelper::DecodeTrace(encoded_trace, trace);
  420. }
  421. // process the trace itself and redirect the trace content
  422. // to different operation type handler. With different race
  423. // format, this function can be changed
  424. Status TraceAnalyzer::StartProcessing() {
  425. Status s;
  426. Trace header;
  427. s = ReadTraceHeader(&header);
  428. if (!s.ok()) {
  429. fprintf(stderr, "Cannot read the header\n");
  430. return s;
  431. }
  432. // Set the default trace file version as version 0.2
  433. int trace_file_version = 2;
  434. s = TracerHelper::ParseTraceHeader(header, &trace_file_version, &db_version_);
  435. if (!s.ok()) {
  436. return s;
  437. }
  438. trace_create_time_ = header.ts;
  439. if (FLAGS_output_time_series) {
  440. time_series_start_ = header.ts;
  441. }
  442. Trace trace;
  443. std::unique_ptr<TraceRecord> record;
  444. while (s.ok()) {
  445. trace.reset();
  446. s = ReadTraceRecord(&trace);
  447. if (!s.ok()) {
  448. break;
  449. }
  450. end_time_ = trace.ts;
  451. if (trace.type == kTraceEnd) {
  452. break;
  453. }
  454. // Do not count TraceEnd (if there is one)
  455. total_requests_++;
  456. s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version, &record);
  457. if (s.IsNotSupported()) {
  458. continue;
  459. }
  460. if (!s.ok()) {
  461. return s;
  462. }
  463. s = record->Accept(this, nullptr);
  464. if (!s.ok()) {
  465. fprintf(stderr, "Cannot process the TraceRecord\n");
  466. return s;
  467. }
  468. }
  469. if (s.IsIncomplete()) {
  470. // Fix it: Reaching eof returns Incomplete status at the moment.
  471. return Status::OK();
  472. }
  473. return s;
  474. }
  475. // After the trace is processed by StartProcessing, the statistic data
  476. // is stored in the map or other in memory data structures. To get the
  477. // other statistic result such as key size distribution, value size
  478. // distribution, these data structures are re-processed here.
  479. Status TraceAnalyzer::MakeStatistics() {
  480. int ret;
  481. Status s;
  482. for (int type = 0; type < kTaTypeNum; type++) {
  483. if (!ta_[type].enabled) {
  484. continue;
  485. }
  486. for (auto& stat : ta_[type].stats) {
  487. stat.second.a_key_id = 0;
  488. for (auto& record : stat.second.a_key_stats) {
  489. record.second.key_id = stat.second.a_key_id;
  490. stat.second.a_key_id++;
  491. if (record.second.access_count <=
  492. static_cast<uint64_t>(FLAGS_output_ignore_count)) {
  493. continue;
  494. }
  495. // Generate the key access count distribution data
  496. if (FLAGS_output_access_count_stats) {
  497. if (stat.second.a_count_stats.find(record.second.access_count) ==
  498. stat.second.a_count_stats.end()) {
  499. stat.second.a_count_stats[record.second.access_count] = 1;
  500. } else {
  501. stat.second.a_count_stats[record.second.access_count]++;
  502. }
  503. }
  504. // Generate the key size distribution data
  505. if (FLAGS_output_key_distribution) {
  506. if (stat.second.a_key_size_stats.find(record.first.size()) ==
  507. stat.second.a_key_size_stats.end()) {
  508. stat.second.a_key_size_stats[record.first.size()] = 1;
  509. } else {
  510. stat.second.a_key_size_stats[record.first.size()]++;
  511. }
  512. }
  513. if (!FLAGS_print_correlation.empty()) {
  514. s = MakeStatisticCorrelation(stat.second, record.second);
  515. if (!s.ok()) {
  516. return s;
  517. }
  518. }
  519. }
  520. // Output the prefix cut or the whole content of the accessed key space
  521. if (FLAGS_output_key_stats || FLAGS_output_prefix_cut > 0) {
  522. s = MakeStatisticKeyStatsOrPrefix(stat.second);
  523. if (!s.ok()) {
  524. return s;
  525. }
  526. }
  527. // output the access count distribution
  528. if (FLAGS_output_access_count_stats && stat.second.a_count_dist_f) {
  529. for (auto& record : stat.second.a_count_stats) {
  530. ret = snprintf(buffer_, sizeof(buffer_),
  531. "access_count: %" PRIu64 " num: %" PRIu64 "\n",
  532. record.first, record.second);
  533. if (ret < 0) {
  534. return Status::IOError("Format the output failed");
  535. }
  536. std::string printout(buffer_);
  537. s = stat.second.a_count_dist_f->Append(printout);
  538. if (!s.ok()) {
  539. fprintf(stderr, "Write access count distribution file failed\n");
  540. return s;
  541. }
  542. }
  543. }
  544. // find the medium of the key size
  545. uint64_t k_count = 0;
  546. bool get_mid = false;
  547. for (auto& record : stat.second.a_key_size_stats) {
  548. k_count += record.second;
  549. if (!get_mid && k_count >= stat.second.a_key_mid) {
  550. stat.second.a_key_mid = record.first;
  551. get_mid = true;
  552. }
  553. if (FLAGS_output_key_distribution && stat.second.a_key_size_f) {
  554. ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %" PRIu64 "\n",
  555. record.first, record.second);
  556. if (ret < 0) {
  557. return Status::IOError("Format output failed");
  558. }
  559. std::string printout(buffer_);
  560. s = stat.second.a_key_size_f->Append(printout);
  561. if (!s.ok()) {
  562. fprintf(stderr, "Write key size distribution file failed\n");
  563. return s;
  564. }
  565. }
  566. }
  567. // output the value size distribution
  568. uint64_t v_begin = 0, v_end = 0, v_count = 0;
  569. get_mid = false;
  570. for (auto& record : stat.second.a_value_size_stats) {
  571. v_begin = v_end;
  572. v_end = (record.first + 1) * FLAGS_value_interval;
  573. v_count += record.second;
  574. if (!get_mid && v_count >= stat.second.a_count / 2) {
  575. stat.second.a_value_mid = (v_begin + v_end) / 2;
  576. get_mid = true;
  577. }
  578. if (FLAGS_output_value_distribution && stat.second.a_value_size_f &&
  579. (type == TraceOperationType::kPut ||
  580. type == TraceOperationType::kMerge)) {
  581. ret = snprintf(buffer_, sizeof(buffer_),
  582. "Number_of_value_size_between %" PRIu64 " and %" PRIu64
  583. " is: %" PRIu64 "\n",
  584. v_begin, v_end, record.second);
  585. if (ret < 0) {
  586. return Status::IOError("Format output failed");
  587. }
  588. std::string printout(buffer_);
  589. s = stat.second.a_value_size_f->Append(printout);
  590. if (!s.ok()) {
  591. fprintf(stderr, "Write value size distribution file failed\n");
  592. return s;
  593. }
  594. }
  595. }
  596. }
  597. }
  598. // Make the QPS statistics
  599. if (FLAGS_output_qps_stats) {
  600. s = MakeStatisticQPS();
  601. if (!s.ok()) {
  602. return s;
  603. }
  604. }
  605. return Status::OK();
  606. }
  607. // Process the statistics of the key access and
  608. // prefix of the accessed keys if required
  609. Status TraceAnalyzer::MakeStatisticKeyStatsOrPrefix(TraceStats& stats) {
  610. int ret;
  611. Status s;
  612. std::string prefix = "0";
  613. uint64_t prefix_access = 0;
  614. uint64_t prefix_count = 0;
  615. uint64_t prefix_succ_access = 0;
  616. double prefix_ave_access = 0.0;
  617. stats.a_succ_count = 0;
  618. for (auto& record : stats.a_key_stats) {
  619. // write the key access statistic file
  620. if (!stats.a_key_f) {
  621. return Status::IOError("Failed to open accessed_key_stats file.");
  622. }
  623. stats.a_succ_count += record.second.succ_count;
  624. double succ_ratio = 0.0;
  625. if (record.second.access_count > 0) {
  626. succ_ratio = (static_cast<double>(record.second.succ_count)) /
  627. record.second.access_count;
  628. }
  629. ret = snprintf(buffer_, sizeof(buffer_),
  630. "%u %zu %" PRIu64 " %" PRIu64 " %f\n", record.second.cf_id,
  631. record.second.value_size, record.second.key_id,
  632. record.second.access_count, succ_ratio);
  633. if (ret < 0) {
  634. return Status::IOError("Format output failed");
  635. }
  636. std::string printout(buffer_);
  637. s = stats.a_key_f->Append(printout);
  638. if (!s.ok()) {
  639. fprintf(stderr, "Write key access file failed\n");
  640. return s;
  641. }
  642. // write the prefix cut of the accessed keys
  643. if (FLAGS_output_prefix_cut > 0 && stats.a_prefix_cut_f) {
  644. if (record.first.compare(0, FLAGS_output_prefix_cut, prefix) != 0) {
  645. std::string prefix_out =
  646. ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix);
  647. if (prefix_count == 0) {
  648. prefix_ave_access = 0.0;
  649. } else {
  650. prefix_ave_access =
  651. (static_cast<double>(prefix_access)) / prefix_count;
  652. }
  653. double prefix_succ_ratio = 0.0;
  654. if (prefix_access > 0) {
  655. prefix_succ_ratio =
  656. (static_cast<double>(prefix_succ_access)) / prefix_access;
  657. }
  658. ret =
  659. snprintf(buffer_, sizeof(buffer_),
  660. "%" PRIu64 " %" PRIu64 " %" PRIu64 " %f %f %s\n",
  661. record.second.key_id, prefix_access, prefix_count,
  662. prefix_ave_access, prefix_succ_ratio, prefix_out.c_str());
  663. if (ret < 0) {
  664. return Status::IOError("Format output failed");
  665. }
  666. std::string pout(buffer_);
  667. s = stats.a_prefix_cut_f->Append(pout);
  668. if (!s.ok()) {
  669. fprintf(stderr, "Write accessed key prefix file failed\n");
  670. return s;
  671. }
  672. // make the top k statistic for the prefix
  673. if (static_cast<int32_t>(stats.top_k_prefix_access.size()) <
  674. FLAGS_print_top_k_access) {
  675. stats.top_k_prefix_access.push(
  676. std::make_pair(prefix_access, prefix_out));
  677. } else {
  678. if (prefix_access > stats.top_k_prefix_access.top().first) {
  679. stats.top_k_prefix_access.pop();
  680. stats.top_k_prefix_access.push(
  681. std::make_pair(prefix_access, prefix_out));
  682. }
  683. }
  684. if (static_cast<int32_t>(stats.top_k_prefix_ave.size()) <
  685. FLAGS_print_top_k_access) {
  686. stats.top_k_prefix_ave.push(
  687. std::make_pair(prefix_ave_access, prefix_out));
  688. } else {
  689. if (prefix_ave_access > stats.top_k_prefix_ave.top().first) {
  690. stats.top_k_prefix_ave.pop();
  691. stats.top_k_prefix_ave.push(
  692. std::make_pair(prefix_ave_access, prefix_out));
  693. }
  694. }
  695. prefix = record.first.substr(0, FLAGS_output_prefix_cut);
  696. prefix_access = 0;
  697. prefix_count = 0;
  698. prefix_succ_access = 0;
  699. }
  700. prefix_access += record.second.access_count;
  701. prefix_count += 1;
  702. prefix_succ_access += record.second.succ_count;
  703. }
  704. }
  705. return Status::OK();
  706. }
  707. // Process the statistics of different query type
  708. // correlations
  709. Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats,
  710. StatsUnit& unit) {
  711. if (stats.correlation_output.size() !=
  712. analyzer_opts_.correlation_list.size()) {
  713. return Status::Corruption("Cannot make the statistic of correlation.");
  714. }
  715. for (int i = 0; i < static_cast<int>(analyzer_opts_.correlation_list.size());
  716. i++) {
  717. if (i >= static_cast<int>(stats.correlation_output.size()) ||
  718. i >= static_cast<int>(unit.v_correlation.size())) {
  719. break;
  720. }
  721. stats.correlation_output[i].first += unit.v_correlation[i].count;
  722. stats.correlation_output[i].second += unit.v_correlation[i].total_ts;
  723. }
  724. return Status::OK();
  725. }
  726. // Process the statistics of QPS
  727. Status TraceAnalyzer::MakeStatisticQPS() {
  728. if (begin_time_ == 0) {
  729. begin_time_ = trace_create_time_;
  730. }
  731. uint32_t duration =
  732. static_cast<uint32_t>((end_time_ - begin_time_) / 1000000);
  733. int ret;
  734. Status s;
  735. std::vector<std::vector<uint32_t>> type_qps(
  736. duration, std::vector<uint32_t>(kTaTypeNum + 1, 0));
  737. std::vector<uint64_t> qps_sum(kTaTypeNum + 1, 0);
  738. std::vector<uint32_t> qps_peak(kTaTypeNum + 1, 0);
  739. qps_ave_.resize(kTaTypeNum + 1);
  740. for (int type = 0; type < kTaTypeNum; type++) {
  741. if (!ta_[type].enabled) {
  742. continue;
  743. }
  744. for (auto& stat : ta_[type].stats) {
  745. uint32_t time_line = 0;
  746. uint64_t cf_qps_sum = 0;
  747. for (auto& time_it : stat.second.a_qps_stats) {
  748. if (time_it.first >= duration) {
  749. continue;
  750. }
  751. type_qps[time_it.first][kTaTypeNum] += time_it.second;
  752. type_qps[time_it.first][type] += time_it.second;
  753. cf_qps_sum += time_it.second;
  754. if (time_it.second > stat.second.a_peak_qps) {
  755. stat.second.a_peak_qps = time_it.second;
  756. }
  757. if (stat.second.a_qps_f) {
  758. while (time_line < time_it.first) {
  759. ret = snprintf(buffer_, sizeof(buffer_), "%u\n", 0);
  760. if (ret < 0) {
  761. return Status::IOError("Format the output failed");
  762. }
  763. std::string printout(buffer_);
  764. s = stat.second.a_qps_f->Append(printout);
  765. if (!s.ok()) {
  766. fprintf(stderr, "Write QPS file failed\n");
  767. return s;
  768. }
  769. time_line++;
  770. }
  771. ret = snprintf(buffer_, sizeof(buffer_), "%u\n", time_it.second);
  772. if (ret < 0) {
  773. return Status::IOError("Format the output failed");
  774. }
  775. std::string printout(buffer_);
  776. s = stat.second.a_qps_f->Append(printout);
  777. if (!s.ok()) {
  778. fprintf(stderr, "Write QPS file failed\n");
  779. return s;
  780. }
  781. if (time_line == time_it.first) {
  782. time_line++;
  783. }
  784. }
  785. // Process the top k QPS peaks
  786. if (FLAGS_output_prefix_cut > 0) {
  787. if (static_cast<int32_t>(stat.second.top_k_qps_sec.size()) <
  788. FLAGS_print_top_k_access) {
  789. stat.second.top_k_qps_sec.push(
  790. std::make_pair(time_it.second, time_it.first));
  791. } else {
  792. if (stat.second.top_k_qps_sec.size() > 0 &&
  793. stat.second.top_k_qps_sec.top().first < time_it.second) {
  794. stat.second.top_k_qps_sec.pop();
  795. stat.second.top_k_qps_sec.push(
  796. std::make_pair(time_it.second, time_it.first));
  797. }
  798. }
  799. }
  800. }
  801. if (duration == 0) {
  802. stat.second.a_ave_qps = 0;
  803. } else {
  804. stat.second.a_ave_qps = (static_cast<double>(cf_qps_sum)) / duration;
  805. }
  806. // Output the accessed unique key number change overtime
  807. if (stat.second.a_key_num_f) {
  808. uint64_t cur_uni_key =
  809. static_cast<uint64_t>(stat.second.a_key_stats.size());
  810. double cur_ratio = 0.0;
  811. uint64_t cur_num = 0;
  812. for (uint32_t i = 0; i < duration; i++) {
  813. auto find_time = stat.second.uni_key_num.find(i);
  814. if (find_time != stat.second.uni_key_num.end()) {
  815. cur_ratio = (static_cast<double>(find_time->second)) / cur_uni_key;
  816. cur_num = find_time->second;
  817. }
  818. ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %.12f\n",
  819. cur_num, cur_ratio);
  820. if (ret < 0) {
  821. return Status::IOError("Format the output failed");
  822. }
  823. std::string printout(buffer_);
  824. s = stat.second.a_key_num_f->Append(printout);
  825. if (!s.ok()) {
  826. fprintf(stderr,
  827. "Write accessed unique key number change file failed\n");
  828. return s;
  829. }
  830. }
  831. }
  832. // output the prefix of top k access peak
  833. if (FLAGS_output_prefix_cut > 0 && stat.second.a_top_qps_prefix_f) {
  834. while (!stat.second.top_k_qps_sec.empty()) {
  835. ret = snprintf(buffer_, sizeof(buffer_), "At time: %u with QPS: %u\n",
  836. stat.second.top_k_qps_sec.top().second,
  837. stat.second.top_k_qps_sec.top().first);
  838. if (ret < 0) {
  839. return Status::IOError("Format the output failed");
  840. }
  841. std::string printout(buffer_);
  842. s = stat.second.a_top_qps_prefix_f->Append(printout);
  843. if (!s.ok()) {
  844. fprintf(stderr, "Write prefix QPS top K file failed\n");
  845. return s;
  846. }
  847. uint32_t qps_time = stat.second.top_k_qps_sec.top().second;
  848. stat.second.top_k_qps_sec.pop();
  849. if (stat.second.a_qps_prefix_stats.find(qps_time) !=
  850. stat.second.a_qps_prefix_stats.end()) {
  851. for (auto& qps_prefix : stat.second.a_qps_prefix_stats[qps_time]) {
  852. std::string qps_prefix_out =
  853. ROCKSDB_NAMESPACE::LDBCommand::StringToHex(qps_prefix.first);
  854. ret = snprintf(buffer_, sizeof(buffer_),
  855. "The prefix: %s Access count: %u\n",
  856. qps_prefix_out.c_str(), qps_prefix.second);
  857. if (ret < 0) {
  858. return Status::IOError("Format the output failed");
  859. }
  860. std::string pout(buffer_);
  861. s = stat.second.a_top_qps_prefix_f->Append(pout);
  862. if (!s.ok()) {
  863. fprintf(stderr, "Write prefix QPS top K file failed\n");
  864. return s;
  865. }
  866. }
  867. }
  868. }
  869. }
  870. }
  871. }
  872. if (qps_f_) {
  873. for (uint32_t i = 0; i < duration; i++) {
  874. for (int type = 0; type <= kTaTypeNum; type++) {
  875. if (type < kTaTypeNum) {
  876. ret = snprintf(buffer_, sizeof(buffer_), "%u ", type_qps[i][type]);
  877. } else {
  878. ret = snprintf(buffer_, sizeof(buffer_), "%u\n", type_qps[i][type]);
  879. }
  880. if (ret < 0) {
  881. return Status::IOError("Format the output failed");
  882. }
  883. std::string printout(buffer_);
  884. s = qps_f_->Append(printout);
  885. if (!s.ok()) {
  886. return s;
  887. }
  888. qps_sum[type] += type_qps[i][type];
  889. if (type_qps[i][type] > qps_peak[type]) {
  890. qps_peak[type] = type_qps[i][type];
  891. }
  892. }
  893. }
  894. }
  895. if (cf_qps_f_) {
  896. int cfs_size = static_cast<uint32_t>(cfs_.size());
  897. uint32_t v;
  898. for (uint32_t i = 0; i < duration; i++) {
  899. for (int cf = 0; cf < cfs_size; cf++) {
  900. if (cfs_[cf].cf_qps.find(i) != cfs_[cf].cf_qps.end()) {
  901. v = cfs_[cf].cf_qps[i];
  902. } else {
  903. v = 0;
  904. }
  905. if (cf < cfs_size - 1) {
  906. ret = snprintf(buffer_, sizeof(buffer_), "%u ", v);
  907. } else {
  908. ret = snprintf(buffer_, sizeof(buffer_), "%u\n", v);
  909. }
  910. if (ret < 0) {
  911. return Status::IOError("Format the output failed");
  912. }
  913. std::string printout(buffer_);
  914. s = cf_qps_f_->Append(printout);
  915. if (!s.ok()) {
  916. return s;
  917. }
  918. }
  919. }
  920. }
  921. qps_peak_ = qps_peak;
  922. for (int type = 0; type <= kTaTypeNum; type++) {
  923. if (duration == 0) {
  924. qps_ave_[type] = 0;
  925. } else {
  926. qps_ave_[type] = (static_cast<double>(qps_sum[type])) / duration;
  927. }
  928. }
  929. return Status::OK();
  930. }
  931. // In reprocessing, if we have the whole key space
  932. // we can output the access count of all keys in a cf
  933. // we can make some statistics of the whole key space
  934. // also, we output the top k accessed keys here
  935. Status TraceAnalyzer::ReProcessing() {
  936. int ret;
  937. Status s;
  938. for (auto& cf_it : cfs_) {
  939. uint32_t cf_id = cf_it.first;
  940. // output the time series;
  941. if (FLAGS_output_time_series) {
  942. for (int type = 0; type < kTaTypeNum; type++) {
  943. if (!ta_[type].enabled ||
  944. ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
  945. continue;
  946. }
  947. TraceStats& stat = ta_[type].stats[cf_id];
  948. if (!stat.time_series_f) {
  949. fprintf(stderr, "Cannot write time_series of '%s' in '%u'\n",
  950. ta_[type].type_name.c_str(), cf_id);
  951. continue;
  952. }
  953. while (!stat.time_series.empty()) {
  954. uint64_t key_id = 0;
  955. auto found = stat.a_key_stats.find(stat.time_series.front().key);
  956. if (found != stat.a_key_stats.end()) {
  957. key_id = found->second.key_id;
  958. }
  959. ret =
  960. snprintf(buffer_, sizeof(buffer_), "%u %" PRIu64 " %" PRIu64 "\n",
  961. stat.time_series.front().type,
  962. stat.time_series.front().ts, key_id);
  963. if (ret < 0) {
  964. return Status::IOError("Format the output failed");
  965. }
  966. std::string printout(buffer_);
  967. s = stat.time_series_f->Append(printout);
  968. if (!s.ok()) {
  969. fprintf(stderr, "Write time series file failed\n");
  970. return s;
  971. }
  972. stat.time_series.pop_front();
  973. }
  974. }
  975. }
  976. // process the whole key space if needed
  977. if (!FLAGS_key_space_dir.empty()) {
  978. std::string whole_key_path =
  979. FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt";
  980. std::string input_key, get_key;
  981. std::vector<std::string> prefix(kTaTypeNum);
  982. std::unique_ptr<FSSequentialFile> file;
  983. s = env_->GetFileSystem()->NewSequentialFile(
  984. whole_key_path, FileOptions(env_options_), &file, nullptr);
  985. if (!s.ok()) {
  986. fprintf(stderr, "Cannot open the whole key space file of CF: %u\n",
  987. cf_id);
  988. file.reset();
  989. }
  990. if (file) {
  991. size_t kTraceFileReadaheadSize = 2 * 1024 * 1024;
  992. LineFileReader lf_reader(
  993. std::move(file), whole_key_path,
  994. kTraceFileReadaheadSize /* filereadahead_size */);
  995. for (cfs_[cf_id].w_count = 0; lf_reader.ReadLine(
  996. &get_key, Env::IO_TOTAL /* rate_limiter_priority */);
  997. ++cfs_[cf_id].w_count) {
  998. input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key);
  999. for (int type = 0; type < kTaTypeNum; type++) {
  1000. if (!ta_[type].enabled) {
  1001. continue;
  1002. }
  1003. TraceStats& stat = ta_[type].stats[cf_id];
  1004. if (stat.w_key_f) {
  1005. if (stat.a_key_stats.find(input_key) != stat.a_key_stats.end()) {
  1006. ret = snprintf(buffer_, sizeof(buffer_),
  1007. "%" PRIu64 " %" PRIu64 "\n", cfs_[cf_id].w_count,
  1008. stat.a_key_stats[input_key].access_count);
  1009. if (ret < 0) {
  1010. return Status::IOError("Format the output failed");
  1011. }
  1012. std::string printout(buffer_);
  1013. s = stat.w_key_f->Append(printout);
  1014. if (!s.ok()) {
  1015. fprintf(stderr, "Write whole key space access file failed\n");
  1016. return s;
  1017. }
  1018. }
  1019. }
  1020. // Output the prefix cut file of the whole key space
  1021. if (FLAGS_output_prefix_cut > 0 && stat.w_prefix_cut_f) {
  1022. if (input_key.compare(0, FLAGS_output_prefix_cut, prefix[type]) !=
  1023. 0) {
  1024. prefix[type] = input_key.substr(0, FLAGS_output_prefix_cut);
  1025. std::string prefix_out =
  1026. ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix[type]);
  1027. ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %s\n",
  1028. cfs_[cf_id].w_count, prefix_out.c_str());
  1029. if (ret < 0) {
  1030. return Status::IOError("Format the output failed");
  1031. }
  1032. std::string printout(buffer_);
  1033. s = stat.w_prefix_cut_f->Append(printout);
  1034. if (!s.ok()) {
  1035. fprintf(stderr,
  1036. "Write whole key space prefix cut file failed\n");
  1037. return s;
  1038. }
  1039. }
  1040. }
  1041. }
  1042. // Make the statistics fo the key size distribution
  1043. if (FLAGS_output_key_distribution) {
  1044. if (cfs_[cf_id].w_key_size_stats.find(input_key.size()) ==
  1045. cfs_[cf_id].w_key_size_stats.end()) {
  1046. cfs_[cf_id].w_key_size_stats[input_key.size()] = 1;
  1047. } else {
  1048. cfs_[cf_id].w_key_size_stats[input_key.size()]++;
  1049. }
  1050. }
  1051. }
  1052. s = lf_reader.GetStatus();
  1053. if (!s.ok()) {
  1054. fprintf(stderr, "Read whole key space file failed\n");
  1055. return s;
  1056. }
  1057. }
  1058. }
  1059. // process the top k accessed keys
  1060. if (FLAGS_print_top_k_access > 0) {
  1061. for (int type = 0; type < kTaTypeNum; type++) {
  1062. if (!ta_[type].enabled ||
  1063. ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
  1064. continue;
  1065. }
  1066. TraceStats& stat = ta_[type].stats[cf_id];
  1067. for (auto& record : stat.a_key_stats) {
  1068. if (static_cast<int32_t>(stat.top_k_queue.size()) <
  1069. FLAGS_print_top_k_access) {
  1070. stat.top_k_queue.push(
  1071. std::make_pair(record.second.access_count, record.first));
  1072. } else {
  1073. if (record.second.access_count > stat.top_k_queue.top().first) {
  1074. stat.top_k_queue.pop();
  1075. stat.top_k_queue.push(
  1076. std::make_pair(record.second.access_count, record.first));
  1077. }
  1078. }
  1079. }
  1080. }
  1081. }
  1082. }
  1083. return Status::OK();
  1084. }
  1085. // End the processing, print the requested results
  1086. Status TraceAnalyzer::EndProcessing() {
  1087. Status s;
  1088. if (trace_sequence_f_) {
  1089. s = trace_sequence_f_->Close();
  1090. }
  1091. if (FLAGS_no_print) {
  1092. return s;
  1093. }
  1094. PrintStatistics();
  1095. if (s.ok()) {
  1096. s = CloseOutputFiles();
  1097. }
  1098. return s;
  1099. }
  1100. // Insert the corresponding key statistics to the correct type
  1101. // and correct CF, output the time-series file if needed
  1102. Status TraceAnalyzer::KeyStatsInsertion(const uint32_t& type,
  1103. const uint32_t& cf_id,
  1104. const std::string& key,
  1105. const size_t value_size,
  1106. const uint64_t ts) {
  1107. Status s;
  1108. StatsUnit unit;
  1109. unit.key_id = 0;
  1110. unit.cf_id = cf_id;
  1111. unit.value_size = value_size;
  1112. unit.access_count = 1;
  1113. unit.latest_ts = ts;
  1114. if ((type != TraceOperationType::kGet &&
  1115. type != TraceOperationType::kMultiGet) ||
  1116. value_size > 0) {
  1117. unit.succ_count = 1;
  1118. } else {
  1119. unit.succ_count = 0;
  1120. }
  1121. unit.v_correlation.resize(analyzer_opts_.correlation_list.size());
  1122. for (int i = 0;
  1123. i < (static_cast<int>(analyzer_opts_.correlation_list.size())); i++) {
  1124. unit.v_correlation[i].count = 0;
  1125. unit.v_correlation[i].total_ts = 0;
  1126. }
  1127. std::string prefix;
  1128. if (FLAGS_output_prefix_cut > 0) {
  1129. prefix = key.substr(0, FLAGS_output_prefix_cut);
  1130. }
  1131. if (begin_time_ == 0) {
  1132. begin_time_ = ts;
  1133. }
  1134. uint32_t time_in_sec;
  1135. if (ts < begin_time_) {
  1136. time_in_sec = 0;
  1137. } else {
  1138. time_in_sec = static_cast<uint32_t>((ts - begin_time_) / 1000000);
  1139. }
  1140. uint64_t dist_value_size = value_size / FLAGS_value_interval;
  1141. auto found_stats = ta_[type].stats.find(cf_id);
  1142. if (found_stats == ta_[type].stats.end()) {
  1143. ta_[type].stats[cf_id].cf_id = cf_id;
  1144. ta_[type].stats[cf_id].cf_name = std::to_string(cf_id);
  1145. ta_[type].stats[cf_id].a_count = 1;
  1146. ta_[type].stats[cf_id].a_key_id = 0;
  1147. ta_[type].stats[cf_id].a_key_size_sqsum = MultiplyCheckOverflow(
  1148. static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
  1149. ta_[type].stats[cf_id].a_key_size_sum = key.size();
  1150. ta_[type].stats[cf_id].a_value_size_sqsum = MultiplyCheckOverflow(
  1151. static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
  1152. ta_[type].stats[cf_id].a_value_size_sum = value_size;
  1153. s = OpenStatsOutputFiles(ta_[type].type_name, ta_[type].stats[cf_id]);
  1154. if (!FLAGS_print_correlation.empty()) {
  1155. s = StatsUnitCorrelationUpdate(unit, type, ts, key);
  1156. }
  1157. ta_[type].stats[cf_id].a_key_stats[key] = unit;
  1158. ta_[type].stats[cf_id].a_value_size_stats[dist_value_size] = 1;
  1159. ta_[type].stats[cf_id].a_qps_stats[time_in_sec] = 1;
  1160. ta_[type].stats[cf_id].correlation_output.resize(
  1161. analyzer_opts_.correlation_list.size());
  1162. if (FLAGS_output_prefix_cut > 0) {
  1163. std::map<std::string, uint32_t> tmp_qps_map;
  1164. tmp_qps_map[prefix] = 1;
  1165. ta_[type].stats[cf_id].a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
  1166. }
  1167. if (time_in_sec != cur_time_sec_) {
  1168. ta_[type].stats[cf_id].uni_key_num[cur_time_sec_] =
  1169. static_cast<uint64_t>(ta_[type].stats[cf_id].a_key_stats.size());
  1170. cur_time_sec_ = time_in_sec;
  1171. }
  1172. } else {
  1173. found_stats->second.a_count++;
  1174. found_stats->second.a_key_size_sqsum += MultiplyCheckOverflow(
  1175. static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
  1176. found_stats->second.a_key_size_sum += key.size();
  1177. found_stats->second.a_value_size_sqsum += MultiplyCheckOverflow(
  1178. static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
  1179. found_stats->second.a_value_size_sum += value_size;
  1180. auto found_key = found_stats->second.a_key_stats.find(key);
  1181. if (found_key == found_stats->second.a_key_stats.end()) {
  1182. found_stats->second.a_key_stats[key] = unit;
  1183. } else {
  1184. found_key->second.access_count++;
  1185. if (type != TraceOperationType::kGet || value_size > 0) {
  1186. found_key->second.succ_count++;
  1187. }
  1188. if (!FLAGS_print_correlation.empty()) {
  1189. s = StatsUnitCorrelationUpdate(found_key->second, type, ts, key);
  1190. }
  1191. }
  1192. if (time_in_sec != cur_time_sec_) {
  1193. found_stats->second.uni_key_num[cur_time_sec_] =
  1194. static_cast<uint64_t>(found_stats->second.a_key_stats.size());
  1195. cur_time_sec_ = time_in_sec;
  1196. }
  1197. auto found_value =
  1198. found_stats->second.a_value_size_stats.find(dist_value_size);
  1199. if (found_value == found_stats->second.a_value_size_stats.end()) {
  1200. found_stats->second.a_value_size_stats[dist_value_size] = 1;
  1201. } else {
  1202. found_value->second++;
  1203. }
  1204. auto found_qps = found_stats->second.a_qps_stats.find(time_in_sec);
  1205. if (found_qps == found_stats->second.a_qps_stats.end()) {
  1206. found_stats->second.a_qps_stats[time_in_sec] = 1;
  1207. } else {
  1208. found_qps->second++;
  1209. }
  1210. if (FLAGS_output_prefix_cut > 0) {
  1211. auto found_qps_prefix =
  1212. found_stats->second.a_qps_prefix_stats.find(time_in_sec);
  1213. if (found_qps_prefix == found_stats->second.a_qps_prefix_stats.end()) {
  1214. std::map<std::string, uint32_t> tmp_qps_map;
  1215. found_stats->second.a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
  1216. }
  1217. if (found_stats->second.a_qps_prefix_stats[time_in_sec].find(prefix) ==
  1218. found_stats->second.a_qps_prefix_stats[time_in_sec].end()) {
  1219. found_stats->second.a_qps_prefix_stats[time_in_sec][prefix] = 1;
  1220. } else {
  1221. found_stats->second.a_qps_prefix_stats[time_in_sec][prefix]++;
  1222. }
  1223. }
  1224. }
  1225. if (cfs_.find(cf_id) == cfs_.end()) {
  1226. CfUnit cf_unit;
  1227. cf_unit.cf_id = cf_id;
  1228. cf_unit.w_count = 0;
  1229. cf_unit.a_count = 0;
  1230. cfs_[cf_id] = cf_unit;
  1231. }
  1232. if (FLAGS_output_qps_stats) {
  1233. cfs_[cf_id].cf_qps[time_in_sec]++;
  1234. }
  1235. if (FLAGS_output_time_series) {
  1236. TraceUnit trace_u;
  1237. trace_u.type = type;
  1238. trace_u.key = key;
  1239. trace_u.value_size = value_size;
  1240. trace_u.ts = (ts - time_series_start_) / 1000000;
  1241. trace_u.cf_id = cf_id;
  1242. ta_[type].stats[cf_id].time_series.push_back(trace_u);
  1243. }
  1244. return s;
  1245. }
  1246. // Update the correlation unit of each key if enabled
  1247. Status TraceAnalyzer::StatsUnitCorrelationUpdate(StatsUnit& unit,
  1248. const uint32_t& type_second,
  1249. const uint64_t& ts,
  1250. const std::string& key) {
  1251. if (type_second >= kTaTypeNum) {
  1252. fprintf(stderr, "Unknown Type Id: %u\n", type_second);
  1253. return Status::NotFound();
  1254. }
  1255. for (int type_first = 0; type_first < kTaTypeNum; type_first++) {
  1256. if (type_first >= static_cast<int>(ta_.size()) ||
  1257. type_first >= static_cast<int>(analyzer_opts_.correlation_map.size())) {
  1258. break;
  1259. }
  1260. if (analyzer_opts_.correlation_map[type_first][type_second] < 0 ||
  1261. ta_[type_first].stats.find(unit.cf_id) == ta_[type_first].stats.end() ||
  1262. ta_[type_first].stats[unit.cf_id].a_key_stats.find(key) ==
  1263. ta_[type_first].stats[unit.cf_id].a_key_stats.end() ||
  1264. ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts == ts) {
  1265. continue;
  1266. }
  1267. int correlation_id =
  1268. analyzer_opts_.correlation_map[type_first][type_second];
  1269. // after get the x-y operation time or x, update;
  1270. if (correlation_id < 0 ||
  1271. correlation_id >= static_cast<int>(unit.v_correlation.size())) {
  1272. continue;
  1273. }
  1274. unit.v_correlation[correlation_id].count++;
  1275. unit.v_correlation[correlation_id].total_ts +=
  1276. (ts - ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts);
  1277. }
  1278. unit.latest_ts = ts;
  1279. return Status::OK();
  1280. }
  1281. // when a new trace statistic is created, the file handler
  1282. // pointers should be initiated if needed according to
  1283. // the trace analyzer options
  1284. Status TraceAnalyzer::OpenStatsOutputFiles(const std::string& type,
  1285. TraceStats& new_stats) {
  1286. Status s;
  1287. if (FLAGS_output_key_stats) {
  1288. s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_stats.txt",
  1289. &new_stats.a_key_f);
  1290. s = CreateOutputFile(type, new_stats.cf_name,
  1291. "accessed_unique_key_num_change.txt",
  1292. &new_stats.a_key_num_f);
  1293. if (!FLAGS_key_space_dir.empty()) {
  1294. s = CreateOutputFile(type, new_stats.cf_name, "whole_key_stats.txt",
  1295. &new_stats.w_key_f);
  1296. }
  1297. }
  1298. if (FLAGS_output_access_count_stats) {
  1299. s = CreateOutputFile(type, new_stats.cf_name,
  1300. "accessed_key_count_distribution.txt",
  1301. &new_stats.a_count_dist_f);
  1302. }
  1303. if (FLAGS_output_prefix_cut > 0) {
  1304. s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_prefix_cut.txt",
  1305. &new_stats.a_prefix_cut_f);
  1306. if (!FLAGS_key_space_dir.empty()) {
  1307. s = CreateOutputFile(type, new_stats.cf_name, "whole_key_prefix_cut.txt",
  1308. &new_stats.w_prefix_cut_f);
  1309. }
  1310. if (FLAGS_output_qps_stats) {
  1311. s = CreateOutputFile(type, new_stats.cf_name,
  1312. "accessed_top_k_qps_prefix_cut.txt",
  1313. &new_stats.a_top_qps_prefix_f);
  1314. }
  1315. }
  1316. if (FLAGS_output_time_series) {
  1317. s = CreateOutputFile(type, new_stats.cf_name, "time_series.txt",
  1318. &new_stats.time_series_f);
  1319. }
  1320. if (FLAGS_output_value_distribution) {
  1321. s = CreateOutputFile(type, new_stats.cf_name,
  1322. "accessed_value_size_distribution.txt",
  1323. &new_stats.a_value_size_f);
  1324. }
  1325. if (FLAGS_output_key_distribution) {
  1326. s = CreateOutputFile(type, new_stats.cf_name,
  1327. "accessed_key_size_distribution.txt",
  1328. &new_stats.a_key_size_f);
  1329. }
  1330. if (FLAGS_output_qps_stats) {
  1331. s = CreateOutputFile(type, new_stats.cf_name, "qps_stats.txt",
  1332. &new_stats.a_qps_f);
  1333. }
  1334. return s;
  1335. }
  1336. // create the output path of the files to be opened
  1337. Status TraceAnalyzer::CreateOutputFile(
  1338. const std::string& type, const std::string& cf_name,
  1339. const std::string& ending,
  1340. std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr) {
  1341. std::string path;
  1342. path = output_path_ + "/" + FLAGS_output_prefix + "-" + type + "-" + cf_name +
  1343. "-" + ending;
  1344. Status s;
  1345. s = env_->NewWritableFile(path, f_ptr, env_options_);
  1346. if (!s.ok()) {
  1347. fprintf(stderr, "Cannot open file: %s\n", path.c_str());
  1348. exit(1);
  1349. }
  1350. return Status::OK();
  1351. }
  1352. // Close the output files in the TraceStats if they are opened
  1353. Status TraceAnalyzer::CloseOutputFiles() {
  1354. Status s;
  1355. for (int type = 0; type < kTaTypeNum; type++) {
  1356. if (!ta_[type].enabled) {
  1357. continue;
  1358. }
  1359. for (auto& stat : ta_[type].stats) {
  1360. if (s.ok() && stat.second.time_series_f) {
  1361. s = stat.second.time_series_f->Close();
  1362. }
  1363. if (s.ok() && stat.second.a_key_f) {
  1364. s = stat.second.a_key_f->Close();
  1365. }
  1366. if (s.ok() && stat.second.a_key_num_f) {
  1367. s = stat.second.a_key_num_f->Close();
  1368. }
  1369. if (s.ok() && stat.second.a_count_dist_f) {
  1370. s = stat.second.a_count_dist_f->Close();
  1371. }
  1372. if (s.ok() && stat.second.a_prefix_cut_f) {
  1373. s = stat.second.a_prefix_cut_f->Close();
  1374. }
  1375. if (s.ok() && stat.second.a_value_size_f) {
  1376. s = stat.second.a_value_size_f->Close();
  1377. }
  1378. if (s.ok() && stat.second.a_key_size_f) {
  1379. s = stat.second.a_key_size_f->Close();
  1380. }
  1381. if (s.ok() && stat.second.a_qps_f) {
  1382. s = stat.second.a_qps_f->Close();
  1383. }
  1384. if (s.ok() && stat.second.a_top_qps_prefix_f) {
  1385. s = stat.second.a_top_qps_prefix_f->Close();
  1386. }
  1387. if (s.ok() && stat.second.w_key_f) {
  1388. s = stat.second.w_key_f->Close();
  1389. }
  1390. if (s.ok() && stat.second.w_prefix_cut_f) {
  1391. s = stat.second.w_prefix_cut_f->Close();
  1392. }
  1393. }
  1394. }
  1395. return s;
  1396. }
  1397. Status TraceAnalyzer::Handle(const WriteQueryTraceRecord& record,
  1398. std::unique_ptr<TraceRecordResult>* /*result*/) {
  1399. total_writes_++;
  1400. // Note that, if the write happens in a transaction,
  1401. // 'Write' will be called twice, one for Prepare, one for
  1402. // Commit. Thus, in the trace, for the same WriteBatch, there
  1403. // will be two records if it is in a transaction. Here, we only
  1404. // process the reord that is committed. If write is non-transaction,
  1405. // HasBeginPrepare()==false, so we process it normally.
  1406. WriteBatch batch(record.GetWriteBatchRep().ToString());
  1407. if (batch.Count() == 0 || (batch.HasBeginPrepare() && !batch.HasCommit())) {
  1408. return Status::OK();
  1409. }
  1410. write_batch_ts_ = record.GetTimestamp();
  1411. // write_result_ will be updated in batch's handler during iteration.
  1412. Status s = batch.Iterate(this);
  1413. write_batch_ts_ = 0;
  1414. if (!s.ok()) {
  1415. fprintf(stderr, "Cannot process the write batch in the trace\n");
  1416. return s;
  1417. }
  1418. return Status::OK();
  1419. }
  1420. Status TraceAnalyzer::Handle(const GetQueryTraceRecord& record,
  1421. std::unique_ptr<TraceRecordResult>* /*result*/) {
  1422. total_gets_++;
  1423. return OutputAnalysisResult(TraceOperationType::kGet, record.GetTimestamp(),
  1424. record.GetColumnFamilyID(),
  1425. std::move(record.GetKey()), 0);
  1426. }
  1427. Status TraceAnalyzer::Handle(const IteratorSeekQueryTraceRecord& record,
  1428. std::unique_ptr<TraceRecordResult>* /*result*/) {
  1429. TraceOperationType op_type;
  1430. if (record.GetSeekType() == IteratorSeekQueryTraceRecord::kSeek) {
  1431. op_type = TraceOperationType::kIteratorSeek;
  1432. total_seeks_++;
  1433. } else {
  1434. op_type = TraceOperationType::kIteratorSeekForPrev;
  1435. total_seek_prevs_++;
  1436. }
  1437. // To do: shall we add lower/upper bounds?
  1438. return OutputAnalysisResult(op_type, record.GetTimestamp(),
  1439. record.GetColumnFamilyID(),
  1440. std::move(record.GetKey()), 0);
  1441. }
  1442. Status TraceAnalyzer::Handle(const MultiGetQueryTraceRecord& record,
  1443. std::unique_ptr<TraceRecordResult>* /*result*/) {
  1444. total_multigets_++;
  1445. std::vector<uint32_t> cf_ids = record.GetColumnFamilyIDs();
  1446. std::vector<Slice> keys = record.GetKeys();
  1447. std::vector<size_t> value_sizes;
  1448. // If the size does not match is not the error of tracing and anayzing, we
  1449. // just report it to the user. The analyzing continues.
  1450. if (cf_ids.size() > keys.size()) {
  1451. printf("The CF ID vector size does not match the keys vector size!\n");
  1452. // Make the sure the 2 vectors are of the same (smaller) size.
  1453. cf_ids.resize(keys.size());
  1454. } else if (cf_ids.size() < keys.size()) {
  1455. printf("The CF ID vector size does not match the keys vector size!\n");
  1456. // Make the sure the 2 vectors are of the same (smaller) size.
  1457. keys.resize(cf_ids.size());
  1458. }
  1459. // Now the 2 vectors must be of the same size.
  1460. value_sizes.resize(cf_ids.size(), 0);
  1461. return OutputAnalysisResult(TraceOperationType::kMultiGet,
  1462. record.GetTimestamp(), std::move(cf_ids),
  1463. std::move(keys), std::move(value_sizes));
  1464. }
  1465. // Handle the Put request in the write batch of the trace
  1466. Status TraceAnalyzer::PutCF(uint32_t column_family_id, const Slice& key,
  1467. const Slice& value) {
  1468. return OutputAnalysisResult(TraceOperationType::kPut, write_batch_ts_,
  1469. column_family_id, key, value.size());
  1470. }
  1471. Status TraceAnalyzer::PutEntityCF(uint32_t column_family_id, const Slice& key,
  1472. const Slice& value) {
  1473. return OutputAnalysisResult(TraceOperationType::kPutEntity, write_batch_ts_,
  1474. column_family_id, key, value.size());
  1475. }
  1476. // Handle the Delete request in the write batch of the trace
  1477. Status TraceAnalyzer::DeleteCF(uint32_t column_family_id, const Slice& key) {
  1478. return OutputAnalysisResult(TraceOperationType::kDelete, write_batch_ts_,
  1479. column_family_id, key, 0);
  1480. }
  1481. // Handle the SingleDelete request in the write batch of the trace
  1482. Status TraceAnalyzer::SingleDeleteCF(uint32_t column_family_id,
  1483. const Slice& key) {
  1484. return OutputAnalysisResult(TraceOperationType::kSingleDelete,
  1485. write_batch_ts_, column_family_id, key, 0);
  1486. }
  1487. // Handle the DeleteRange request in the write batch of the trace
  1488. Status TraceAnalyzer::DeleteRangeCF(uint32_t column_family_id,
  1489. const Slice& begin_key,
  1490. const Slice& end_key) {
  1491. return OutputAnalysisResult(TraceOperationType::kRangeDelete, write_batch_ts_,
  1492. {column_family_id, column_family_id},
  1493. {begin_key, end_key}, {0, 0});
  1494. }
  1495. // Handle the Merge request in the write batch of the trace
  1496. Status TraceAnalyzer::MergeCF(uint32_t column_family_id, const Slice& key,
  1497. const Slice& value) {
  1498. return OutputAnalysisResult(TraceOperationType::kMerge, write_batch_ts_,
  1499. column_family_id, key, value.size());
  1500. }
  1501. Status TraceAnalyzer::OutputAnalysisResult(TraceOperationType op_type,
  1502. uint64_t timestamp,
  1503. std::vector<uint32_t> cf_ids,
  1504. std::vector<Slice> keys,
  1505. std::vector<size_t> value_sizes) {
  1506. assert(!cf_ids.empty());
  1507. assert(cf_ids.size() == keys.size());
  1508. assert(cf_ids.size() == value_sizes.size());
  1509. Status s;
  1510. if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
  1511. // DeleteRane only writes the begin_key.
  1512. size_t cnt =
  1513. op_type == TraceOperationType::kRangeDelete ? 1 : cf_ids.size();
  1514. for (size_t i = 0; i < cnt; i++) {
  1515. s = WriteTraceSequence(op_type, cf_ids[i], keys[i], value_sizes[i],
  1516. timestamp);
  1517. if (!s.ok()) {
  1518. return Status::Corruption("Failed to write the trace sequence to file");
  1519. }
  1520. }
  1521. }
  1522. if (ta_[op_type].sample_count >= sample_max_) {
  1523. ta_[op_type].sample_count = 0;
  1524. }
  1525. if (ta_[op_type].sample_count > 0) {
  1526. ta_[op_type].sample_count++;
  1527. return Status::OK();
  1528. }
  1529. ta_[op_type].sample_count++;
  1530. if (!ta_[op_type].enabled) {
  1531. return Status::OK();
  1532. }
  1533. for (size_t i = 0; i < cf_ids.size(); i++) {
  1534. // Get query does not have value part, just give a fixed value 10 for easy
  1535. // calculation.
  1536. s = KeyStatsInsertion(
  1537. op_type, cf_ids[i], keys[i].ToString(),
  1538. value_sizes[i] == 0 ? kShadowValueSize : value_sizes[i], timestamp);
  1539. if (!s.ok()) {
  1540. return Status::Corruption("Failed to insert key statistics");
  1541. }
  1542. }
  1543. return Status::OK();
  1544. }
  1545. Status TraceAnalyzer::OutputAnalysisResult(TraceOperationType op_type,
  1546. uint64_t timestamp, uint32_t cf_id,
  1547. const Slice& key,
  1548. size_t value_size) {
  1549. return OutputAnalysisResult(
  1550. op_type, timestamp, std::vector<uint32_t>({cf_id}),
  1551. std::vector<Slice>({key}), std::vector<size_t>({value_size}));
  1552. }
  1553. // Before the analyzer is closed, the requested general statistic results are
  1554. // printed out here. In current stage, these information are not output to
  1555. // the files.
  1556. // -----type
  1557. // |__cf_id
  1558. // |_statistics
  1559. void TraceAnalyzer::PrintStatistics() {
  1560. for (int type = 0; type < kTaTypeNum; type++) {
  1561. if (!ta_[type].enabled) {
  1562. continue;
  1563. }
  1564. ta_[type].total_keys = 0;
  1565. ta_[type].total_access = 0;
  1566. ta_[type].total_succ_access = 0;
  1567. printf("\n################# Operation Type: %s #####################\n",
  1568. ta_[type].type_name.c_str());
  1569. if (qps_ave_.size() == kTaTypeNum + 1) {
  1570. printf("Peak QPS is: %u Average QPS is: %f\n", qps_peak_[type],
  1571. qps_ave_[type]);
  1572. }
  1573. for (auto& stat_it : ta_[type].stats) {
  1574. if (stat_it.second.a_count == 0) {
  1575. continue;
  1576. }
  1577. TraceStats& stat = stat_it.second;
  1578. uint64_t total_a_keys = static_cast<uint64_t>(stat.a_key_stats.size());
  1579. double key_size_ave = 0.0;
  1580. double value_size_ave = 0.0;
  1581. double key_size_vari = 0.0;
  1582. double value_size_vari = 0.0;
  1583. if (stat.a_count > 0) {
  1584. key_size_ave =
  1585. (static_cast<double>(stat.a_key_size_sum)) / stat.a_count;
  1586. value_size_ave =
  1587. (static_cast<double>(stat.a_value_size_sum)) / stat.a_count;
  1588. key_size_vari = std::sqrt((static_cast<double>(stat.a_key_size_sqsum)) /
  1589. stat.a_count -
  1590. key_size_ave * key_size_ave);
  1591. value_size_vari = std::sqrt(
  1592. (static_cast<double>(stat.a_value_size_sqsum)) / stat.a_count -
  1593. value_size_ave * value_size_ave);
  1594. }
  1595. if (value_size_ave == 0.0) {
  1596. stat.a_value_mid = 0;
  1597. }
  1598. cfs_[stat.cf_id].a_count += total_a_keys;
  1599. ta_[type].total_keys += total_a_keys;
  1600. ta_[type].total_access += stat.a_count;
  1601. ta_[type].total_succ_access += stat.a_succ_count;
  1602. printf("*********************************************************\n");
  1603. printf("colume family id: %u\n", stat.cf_id);
  1604. printf("Total number of queries to this cf by %s: %" PRIu64 "\n",
  1605. ta_[type].type_name.c_str(), stat.a_count);
  1606. printf("Total unique keys in this cf: %" PRIu64 "\n", total_a_keys);
  1607. printf("Average key size: %f key size medium: %" PRIu64
  1608. " Key size Variation: %f\n",
  1609. key_size_ave, stat.a_key_mid, key_size_vari);
  1610. if (type == kPut || type == kMerge) {
  1611. printf("Average value size: %f Value size medium: %" PRIu64
  1612. " Value size variation: %f\n",
  1613. value_size_ave, stat.a_value_mid, value_size_vari);
  1614. }
  1615. printf("Peak QPS is: %u Average QPS is: %f\n", stat.a_peak_qps,
  1616. stat.a_ave_qps);
  1617. // print the top k accessed key and its access count
  1618. if (FLAGS_print_top_k_access > 0) {
  1619. printf("The Top %d keys that are accessed:\n",
  1620. FLAGS_print_top_k_access);
  1621. while (!stat.top_k_queue.empty()) {
  1622. std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(
  1623. stat.top_k_queue.top().second);
  1624. printf("Access_count: %" PRIu64 " %s\n", stat.top_k_queue.top().first,
  1625. hex_key.c_str());
  1626. stat.top_k_queue.pop();
  1627. }
  1628. }
  1629. // print the top k access prefix range and
  1630. // top k prefix range with highest average access per key
  1631. if (FLAGS_output_prefix_cut > 0) {
  1632. printf("The Top %d accessed prefix range:\n", FLAGS_print_top_k_access);
  1633. while (!stat.top_k_prefix_access.empty()) {
  1634. printf("Prefix: %s Access count: %" PRIu64 "\n",
  1635. stat.top_k_prefix_access.top().second.c_str(),
  1636. stat.top_k_prefix_access.top().first);
  1637. stat.top_k_prefix_access.pop();
  1638. }
  1639. printf("The Top %d prefix with highest access per key:\n",
  1640. FLAGS_print_top_k_access);
  1641. while (!stat.top_k_prefix_ave.empty()) {
  1642. printf("Prefix: %s access per key: %f\n",
  1643. stat.top_k_prefix_ave.top().second.c_str(),
  1644. stat.top_k_prefix_ave.top().first);
  1645. stat.top_k_prefix_ave.pop();
  1646. }
  1647. }
  1648. // print the operation correlations
  1649. if (!FLAGS_print_correlation.empty()) {
  1650. for (int correlation = 0;
  1651. correlation <
  1652. static_cast<int>(analyzer_opts_.correlation_list.size());
  1653. correlation++) {
  1654. printf(
  1655. "The correlation statistics of '%s' after '%s' is:",
  1656. taIndexToOpt[analyzer_opts_.correlation_list[correlation].second]
  1657. .c_str(),
  1658. taIndexToOpt[analyzer_opts_.correlation_list[correlation].first]
  1659. .c_str());
  1660. double correlation_ave = 0.0;
  1661. if (stat.correlation_output[correlation].first > 0) {
  1662. correlation_ave =
  1663. (static_cast<double>(
  1664. stat.correlation_output[correlation].second)) /
  1665. (stat.correlation_output[correlation].first * 1000);
  1666. }
  1667. printf(" total numbers: %" PRIu64 " average time: %f(ms)\n",
  1668. stat.correlation_output[correlation].first, correlation_ave);
  1669. }
  1670. }
  1671. }
  1672. printf("*********************************************************\n");
  1673. printf("Total keys of '%s' is: %" PRIu64 "\n", ta_[type].type_name.c_str(),
  1674. ta_[type].total_keys);
  1675. printf("Total access is: %" PRIu64 "\n", ta_[type].total_access);
  1676. total_access_keys_ += ta_[type].total_keys;
  1677. }
  1678. // Print the overall statistic information of the trace
  1679. printf("\n*********************************************************\n");
  1680. printf("*********************************************************\n");
  1681. printf("The column family based statistics\n");
  1682. for (auto& cf : cfs_) {
  1683. printf("The column family id: %u\n", cf.first);
  1684. printf("The whole key space key numbers: %" PRIu64 "\n", cf.second.w_count);
  1685. printf("The accessed key space key numbers: %" PRIu64 "\n",
  1686. cf.second.a_count);
  1687. }
  1688. if (FLAGS_print_overall_stats) {
  1689. printf("\n*********************************************************\n");
  1690. printf("*********************************************************\n");
  1691. if (qps_peak_.size() == kTaTypeNum + 1) {
  1692. printf("Average QPS per second: %f Peak QPS: %u\n", qps_ave_[kTaTypeNum],
  1693. qps_peak_[kTaTypeNum]);
  1694. }
  1695. printf("The statistics related to query number need to times: %u\n",
  1696. sample_max_);
  1697. printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64
  1698. " Total_gets: %" PRIu64 " Total_write_batches: %" PRIu64
  1699. " Total_seeks: %" PRIu64 " Total_seek_for_prevs: %" PRIu64
  1700. " Total_multigets: %" PRIu64 "\n",
  1701. total_requests_, total_access_keys_, total_gets_, total_writes_,
  1702. total_seeks_, total_seek_prevs_, total_multigets_);
  1703. for (int type = 0; type < kTaTypeNum; type++) {
  1704. if (!ta_[type].enabled) {
  1705. continue;
  1706. }
  1707. printf("Operation: '%s' has: %" PRIu64 "\n", ta_[type].type_name.c_str(),
  1708. ta_[type].total_access);
  1709. }
  1710. }
  1711. }
  1712. // Write the trace sequence to file
  1713. Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
  1714. const uint32_t& cf_id,
  1715. const Slice& key,
  1716. const size_t value_size,
  1717. const uint64_t ts) {
  1718. std::string hex_key =
  1719. ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key.ToString());
  1720. int ret;
  1721. ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type,
  1722. cf_id, value_size, ts);
  1723. if (ret < 0) {
  1724. return Status::IOError("failed to format the output");
  1725. }
  1726. std::string printout(buffer_);
  1727. if (!FLAGS_no_key) {
  1728. printout = hex_key + " " + printout;
  1729. }
  1730. return trace_sequence_f_->Append(printout);
  1731. }
  1732. // The entry point of the trace analyzer tool
  1733. int trace_analyzer_tool(int argc, char** argv) {
  1734. std::string trace_path;
  1735. std::string output_path;
  1736. AnalyzerOptions analyzer_opts;
  1737. ParseCommandLineFlags(&argc, &argv, true);
  1738. if (!FLAGS_print_correlation.empty()) {
  1739. analyzer_opts.SparseCorrelationInput(FLAGS_print_correlation);
  1740. }
  1741. std::unique_ptr<TraceAnalyzer> analyzer(
  1742. new TraceAnalyzer(FLAGS_trace_path, FLAGS_output_dir, analyzer_opts));
  1743. if (!analyzer) {
  1744. fprintf(stderr, "Cannot initiate the trace analyzer\n");
  1745. exit(1);
  1746. }
  1747. ROCKSDB_NAMESPACE::Status s = analyzer->PrepareProcessing();
  1748. if (!s.ok()) {
  1749. fprintf(stderr, "%s\n", s.getState());
  1750. fprintf(stderr, "Cannot initiate the trace reader\n");
  1751. exit(1);
  1752. }
  1753. s = analyzer->StartProcessing();
  1754. if (!s.ok() && !FLAGS_try_process_corrupted_trace) {
  1755. fprintf(stderr, "%s\n", s.getState());
  1756. fprintf(stderr, "Cannot process the trace\n");
  1757. exit(1);
  1758. }
  1759. s = analyzer->MakeStatistics();
  1760. if (!s.ok()) {
  1761. fprintf(stderr, "%s\n", s.getState());
  1762. analyzer->EndProcessing();
  1763. fprintf(stderr, "Cannot make the statistics\n");
  1764. exit(1);
  1765. }
  1766. s = analyzer->ReProcessing();
  1767. if (!s.ok()) {
  1768. fprintf(stderr, "%s\n", s.getState());
  1769. fprintf(stderr, "Cannot re-process the trace for more statistics\n");
  1770. analyzer->EndProcessing();
  1771. exit(1);
  1772. }
  1773. s = analyzer->EndProcessing();
  1774. if (!s.ok()) {
  1775. fprintf(stderr, "%s\n", s.getState());
  1776. fprintf(stderr, "Cannot close the trace analyzer\n");
  1777. exit(1);
  1778. }
  1779. return 0;
  1780. }
  1781. } // namespace ROCKSDB_NAMESPACE
  1782. #endif // Endif of Gflag