trace_analyzer_tool.cc 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #ifndef ROCKSDB_LITE
  7. #ifdef GFLAGS
  8. #ifdef NUMA
  9. #include <numa.h>
  10. #include <numaif.h>
  11. #endif
  12. #ifndef OS_WIN
  13. #include <unistd.h>
  14. #endif
  15. #include <cinttypes>
  16. #include <cmath>
  17. #include <cstdio>
  18. #include <cstdlib>
  19. #include <memory>
  20. #include <sstream>
  21. #include <stdexcept>
  22. #include "db/db_impl/db_impl.h"
  23. #include "db/memtable.h"
  24. #include "db/write_batch_internal.h"
  25. #include "env/composite_env_wrapper.h"
  26. #include "file/read_write_util.h"
  27. #include "file/writable_file_writer.h"
  28. #include "options/cf_options.h"
  29. #include "rocksdb/db.h"
  30. #include "rocksdb/env.h"
  31. #include "rocksdb/iterator.h"
  32. #include "rocksdb/slice.h"
  33. #include "rocksdb/slice_transform.h"
  34. #include "rocksdb/status.h"
  35. #include "rocksdb/table_properties.h"
  36. #include "rocksdb/utilities/ldb_cmd.h"
  37. #include "rocksdb/write_batch.h"
  38. #include "table/meta_blocks.h"
  39. #include "table/plain/plain_table_factory.h"
  40. #include "table/table_reader.h"
  41. #include "tools/trace_analyzer_tool.h"
  42. #include "trace_replay/trace_replay.h"
  43. #include "util/coding.h"
  44. #include "util/compression.h"
  45. #include "util/gflags_compat.h"
  46. #include "util/random.h"
  47. #include "util/string_util.h"
  48. using GFLAGS_NAMESPACE::ParseCommandLineFlags;
  49. using GFLAGS_NAMESPACE::RegisterFlagValidator;
  50. using GFLAGS_NAMESPACE::SetUsageMessage;
  51. DEFINE_string(trace_path, "", "The trace file path.");
  52. DEFINE_string(output_dir, "", "The directory to store the output files.");
  53. DEFINE_string(output_prefix, "trace",
  54. "The prefix used for all the output files.");
  55. DEFINE_bool(output_key_stats, false,
  56. "Output the key access count statistics to file\n"
  57. "for accessed keys:\n"
  58. "file name: <prefix>-<query_type>-<cf_id>-accessed_key_stats.txt\n"
  59. "Format:[cf_id value_size access_keyid access_count]\n"
  60. "for the whole key space keys:\n"
  61. "File name: <prefix>-<query_type>-<cf_id>-whole_key_stats.txt\n"
  62. "Format:[whole_key_space_keyid access_count]");
  63. DEFINE_bool(output_access_count_stats, false,
  64. "Output the access count distribution statistics to file.\n"
  65. "File name: <prefix>-<query_type>-<cf_id>-accessed_"
  66. "key_count_distribution.txt \n"
  67. "Format:[access_count number_of_access_count]");
  68. DEFINE_bool(output_time_series, false,
  69. "Output the access time in second of each key, "
  70. "such that we can have the time series data of the queries \n"
  71. "File name: <prefix>-<query_type>-<cf_id>-time_series.txt\n"
  72. "Format:[type_id time_in_sec access_keyid].");
  73. DEFINE_bool(try_process_corrupted_trace, false,
  74. "In default, trace_analyzer will exit if the trace file is "
  75. "corrupted due to the unexpected tracing cases. If this option "
  76. "is enabled, trace_analyzer will stop reading the trace file, "
  77. "and start analyzing the read-in data.");
  78. DEFINE_int32(output_prefix_cut, 0,
  79. "The number of bytes as prefix to cut the keys.\n"
  80. "If it is enabled, it will generate the following:\n"
  81. "For accessed keys:\n"
  82. "File name: <prefix>-<query_type>-<cf_id>-"
  83. "accessed_key_prefix_cut.txt \n"
  84. "Format:[acessed_keyid access_count_of_prefix "
  85. "number_of_keys_in_prefix average_key_access "
  86. "prefix_succ_ratio prefix]\n"
  87. "For whole key space keys:\n"
  88. "File name: <prefix>-<query_type>-<cf_id>"
  89. "-whole_key_prefix_cut.txt\n"
  90. "Format:[start_keyid_in_whole_keyspace prefix]\n"
  91. "if 'output_qps_stats' and 'top_k' are enabled, it will output:\n"
  92. "File name: <prefix>-<query_type>-<cf_id>"
  93. "-accessed_top_k_qps_prefix_cut.txt\n"
  94. "Format:[the_top_ith_qps_time QPS], [prefix qps_of_this_second].");
  95. DEFINE_bool(convert_to_human_readable_trace, false,
  96. "Convert the binary trace file to a human readable txt file "
  97. "for further processing. "
  98. "This file will be extremely large "
  99. "(similar size as the original binary trace file). "
  100. "You can specify 'no_key' to reduce the size, if key is not "
  101. "needed in the next step.\n"
  102. "File name: <prefix>_human_readable_trace.txt\n"
  103. "Format:[type_id cf_id value_size time_in_micorsec <key>].");
  104. DEFINE_bool(output_qps_stats, false,
  105. "Output the query per second(qps) statistics \n"
  106. "For the overall qps, it will contain all qps of each query type. "
  107. "The time is started from the first trace record\n"
  108. "File name: <prefix>_qps_stats.txt\n"
  109. "Format: [qps_type_1 qps_type_2 ...... overall_qps]\n"
  110. "For each cf and query, it will have its own qps output.\n"
  111. "File name: <prefix>-<query_type>-<cf_id>_qps_stats.txt \n"
  112. "Format:[query_count_in_this_second].");
  113. DEFINE_bool(no_print, false, "Do not print out any result");
  114. DEFINE_string(
  115. print_correlation, "",
  116. "intput format: [correlation pairs][.,.]\n"
  117. "Output the query correlations between the pairs of query types "
  118. "listed in the parameter, input should select the operations from:\n"
  119. "get, put, delete, single_delete, rangle_delete, merge. No space "
  120. "between the pairs separated by commar. Example: =[get,get]... "
  121. "It will print out the number of pairs of 'A after B' and "
  122. "the average time interval between the two query.");
  123. DEFINE_string(key_space_dir, "",
  124. "<the directory stores full key space files> \n"
  125. "The key space files should be: <column family id>.txt");
  126. DEFINE_bool(analyze_get, false, "Analyze the Get query.");
  127. DEFINE_bool(analyze_put, false, "Analyze the Put query.");
  128. DEFINE_bool(analyze_delete, false, "Analyze the Delete query.");
  129. DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query.");
  130. DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query.");
  131. DEFINE_bool(analyze_merge, false, "Analyze the Merge query.");
  132. DEFINE_bool(analyze_iterator, false,
  133. " Analyze the iterate query like seek() and seekForPrev().");
  134. DEFINE_bool(no_key, false,
  135. " Does not output the key to the result files to make smaller.");
  136. DEFINE_bool(print_overall_stats, true,
  137. " Print the stats of the whole trace, "
  138. "like total requests, keys, and etc.");
  139. DEFINE_bool(output_key_distribution, false, "Print the key size distribution.");
  140. DEFINE_bool(
  141. output_value_distribution, false,
  142. "Out put the value size distribution, only available for Put and Merge.\n"
  143. "File name: <prefix>-<query_type>-<cf_id>"
  144. "-accessed_value_size_distribution.txt\n"
  145. "Format:[Number_of_value_size_between x and "
  146. "x+value_interval is: <the count>]");
  147. DEFINE_int32(print_top_k_access, 1,
  148. "<top K of the variables to be printed> "
  149. "Print the top k accessed keys, top k accessed prefix "
  150. "and etc.");
  151. DEFINE_int32(output_ignore_count, 0,
  152. "<threshold>, ignores the access count <= this value, "
  153. "it will shorter the output.");
  154. DEFINE_int32(value_interval, 8,
  155. "To output the value distribution, we need to set the value "
  156. "intervals and make the statistic of the value size distribution "
  157. "in different intervals. The default is 8.");
  158. DEFINE_double(sample_ratio, 1.0,
  159. "If the trace size is extremely huge or user want to sample "
  160. "the trace when analyzing, sample ratio can be set (0, 1.0]");
  161. namespace ROCKSDB_NAMESPACE {
  // Maps the textual name of a traced operation to its slot index. The names
  // are the ones accepted inside --print_correlation pairs.
  std::map<std::string, int> taOptToIndex = {
      {"get", 0},
      {"put", 1},
      {"delete", 2},
      {"single_delete", 3},
      {"range_delete", 4},
      {"merge", 5},
      {"iterator_Seek", 6},
      {"iterator_SeekForPrev", 7},
  };

  // Inverse of taOptToIndex: slot index back to the operation name.
  std::map<int, std::string> taIndexToOpt = {
      {0, "get"},
      {1, "put"},
      {2, "delete"},
      {3, "single_delete"},
      {4, "range_delete"},
      {5, "merge"},
      {6, "iterator_Seek"},
      {7, "iterator_SeekForPrev"},
  };
  172. namespace {
  // Multiplies two unsigned 64-bit values with an overflow guard.
  //  - Either operand zero: returns 0.
  //  - Product does not fit in 64 bits: returns op1 unchanged (not a
  //    wrapped-around product and not a saturated maximum).
  //  - Otherwise: returns op1 * op2.
  uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
    const bool has_zero_operand = (op1 == 0) || (op2 == 0);
    if (has_zero_operand) {
      return 0;
    }
    // Largest multiplier op1 can take without exceeding the 64-bit range.
    const uint64_t largest_safe_multiplier = UINT64_MAX / op1;
    return (op2 > largest_safe_multiplier) ? op1 : op1 * op2;
  }
  182. void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id, Slice* key) {
  183. Slice buf(buffer);
  184. GetFixed32(&buf, cf_id);
  185. GetLengthPrefixedSlice(&buf, key);
  186. }
  187. } // namespace
  188. // The default constructor of AnalyzerOptions
  189. AnalyzerOptions::AnalyzerOptions()
  190. : correlation_map(kTaTypeNum, std::vector<int>(kTaTypeNum, -1)) {}
  191. AnalyzerOptions::~AnalyzerOptions() {}
  192. void AnalyzerOptions::SparseCorrelationInput(const std::string& in_str) {
  193. std::string cur = in_str;
  194. if (cur.size() == 0) {
  195. return;
  196. }
  197. while (!cur.empty()) {
  198. if (cur.compare(0, 1, "[") != 0) {
  199. fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
  200. exit(1);
  201. }
  202. std::string opt1, opt2;
  203. std::size_t split = cur.find_first_of(",");
  204. if (split != std::string::npos) {
  205. opt1 = cur.substr(1, split - 1);
  206. } else {
  207. fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
  208. exit(1);
  209. }
  210. std::size_t end = cur.find_first_of("]");
  211. if (end != std::string::npos) {
  212. opt2 = cur.substr(split + 1, end - split - 1);
  213. } else {
  214. fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
  215. exit(1);
  216. }
  217. cur = cur.substr(end + 1);
  218. if (taOptToIndex.find(opt1) != taOptToIndex.end() &&
  219. taOptToIndex.find(opt2) != taOptToIndex.end()) {
  220. correlation_list.push_back(
  221. std::make_pair(taOptToIndex[opt1], taOptToIndex[opt2]));
  222. } else {
  223. fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
  224. exit(1);
  225. }
  226. }
  227. int sequence = 0;
  228. for (auto& it : correlation_list) {
  229. correlation_map[it.first][it.second] = sequence;
  230. sequence++;
  231. }
  232. return;
  233. }
  234. // The trace statistic struct constructor
  235. TraceStats::TraceStats() {
  236. cf_id = 0;
  237. cf_name = "0";
  238. a_count = 0;
  239. a_key_id = 0;
  240. a_key_size_sqsum = 0;
  241. a_key_size_sum = 0;
  242. a_key_mid = 0;
  243. a_value_size_sqsum = 0;
  244. a_value_size_sum = 0;
  245. a_value_mid = 0;
  246. a_peak_qps = 0;
  247. a_ave_qps = 0.0;
  248. }
  249. TraceStats::~TraceStats() {}
  250. // The trace analyzer constructor
  251. TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path,
  252. AnalyzerOptions _analyzer_opts)
  253. : trace_name_(trace_path),
  254. output_path_(output_path),
  255. analyzer_opts_(_analyzer_opts) {
  256. ROCKSDB_NAMESPACE::EnvOptions env_options;
  257. env_ = ROCKSDB_NAMESPACE::Env::Default();
  258. offset_ = 0;
  259. c_time_ = 0;
  260. total_requests_ = 0;
  261. total_access_keys_ = 0;
  262. total_gets_ = 0;
  263. total_writes_ = 0;
  264. trace_create_time_ = 0;
  265. begin_time_ = 0;
  266. end_time_ = 0;
  267. time_series_start_ = 0;
  268. cur_time_sec_ = 0;
  269. if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) {
  270. sample_max_ = 1;
  271. } else {
  272. sample_max_ = static_cast<uint32_t>(1.0 / FLAGS_sample_ratio);
  273. }
  274. ta_.resize(kTaTypeNum);
  275. ta_[0].type_name = "get";
  276. if (FLAGS_analyze_get) {
  277. ta_[0].enabled = true;
  278. } else {
  279. ta_[0].enabled = false;
  280. }
  281. ta_[1].type_name = "put";
  282. if (FLAGS_analyze_put) {
  283. ta_[1].enabled = true;
  284. } else {
  285. ta_[1].enabled = false;
  286. }
  287. ta_[2].type_name = "delete";
  288. if (FLAGS_analyze_delete) {
  289. ta_[2].enabled = true;
  290. } else {
  291. ta_[2].enabled = false;
  292. }
  293. ta_[3].type_name = "single_delete";
  294. if (FLAGS_analyze_single_delete) {
  295. ta_[3].enabled = true;
  296. } else {
  297. ta_[3].enabled = false;
  298. }
  299. ta_[4].type_name = "range_delete";
  300. if (FLAGS_analyze_range_delete) {
  301. ta_[4].enabled = true;
  302. } else {
  303. ta_[4].enabled = false;
  304. }
  305. ta_[5].type_name = "merge";
  306. if (FLAGS_analyze_merge) {
  307. ta_[5].enabled = true;
  308. } else {
  309. ta_[5].enabled = false;
  310. }
  311. ta_[6].type_name = "iterator_Seek";
  312. if (FLAGS_analyze_iterator) {
  313. ta_[6].enabled = true;
  314. } else {
  315. ta_[6].enabled = false;
  316. }
  317. ta_[7].type_name = "iterator_SeekForPrev";
  318. if (FLAGS_analyze_iterator) {
  319. ta_[7].enabled = true;
  320. } else {
  321. ta_[7].enabled = false;
  322. }
  323. for (int i = 0; i < kTaTypeNum; i++) {
  324. ta_[i].sample_count = 0;
  325. }
  326. }
  327. TraceAnalyzer::~TraceAnalyzer() {}
  328. // Prepare the processing
  329. // Initiate the global trace reader and writer here
  330. Status TraceAnalyzer::PrepareProcessing() {
  331. Status s;
  332. // Prepare the trace reader
  333. s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
  334. if (!s.ok()) {
  335. return s;
  336. }
  337. // Prepare and open the trace sequence file writer if needed
  338. if (FLAGS_convert_to_human_readable_trace) {
  339. std::string trace_sequence_name;
  340. trace_sequence_name =
  341. output_path_ + "/" + FLAGS_output_prefix + "-human_readable_trace.txt";
  342. s = env_->NewWritableFile(trace_sequence_name, &trace_sequence_f_,
  343. env_options_);
  344. if (!s.ok()) {
  345. return s;
  346. }
  347. }
  348. // prepare the general QPS file writer
  349. if (FLAGS_output_qps_stats) {
  350. std::string qps_stats_name;
  351. qps_stats_name =
  352. output_path_ + "/" + FLAGS_output_prefix + "-qps_stats.txt";
  353. s = env_->NewWritableFile(qps_stats_name, &qps_f_, env_options_);
  354. if (!s.ok()) {
  355. return s;
  356. }
  357. qps_stats_name =
  358. output_path_ + "/" + FLAGS_output_prefix + "-cf_qps_stats.txt";
  359. s = env_->NewWritableFile(qps_stats_name, &cf_qps_f_, env_options_);
  360. if (!s.ok()) {
  361. return s;
  362. }
  363. }
  364. return Status::OK();
  365. }
  366. Status TraceAnalyzer::ReadTraceHeader(Trace* header) {
  367. assert(header != nullptr);
  368. Status s = ReadTraceRecord(header);
  369. if (!s.ok()) {
  370. return s;
  371. }
  372. if (header->type != kTraceBegin) {
  373. return Status::Corruption("Corrupted trace file. Incorrect header.");
  374. }
  375. if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
  376. return Status::Corruption("Corrupted trace file. Incorrect magic.");
  377. }
  378. return s;
  379. }
  380. Status TraceAnalyzer::ReadTraceFooter(Trace* footer) {
  381. assert(footer != nullptr);
  382. Status s = ReadTraceRecord(footer);
  383. if (!s.ok()) {
  384. return s;
  385. }
  386. if (footer->type != kTraceEnd) {
  387. return Status::Corruption("Corrupted trace file. Incorrect footer.");
  388. }
  389. return s;
  390. }
  391. Status TraceAnalyzer::ReadTraceRecord(Trace* trace) {
  392. assert(trace != nullptr);
  393. std::string encoded_trace;
  394. Status s = trace_reader_->Read(&encoded_trace);
  395. if (!s.ok()) {
  396. return s;
  397. }
  398. Slice enc_slice = Slice(encoded_trace);
  399. GetFixed64(&enc_slice, &trace->ts);
  400. trace->type = static_cast<TraceType>(enc_slice[0]);
  401. enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
  402. trace->payload = enc_slice.ToString();
  403. return s;
  404. }
  405. // process the trace itself and redirect the trace content
  406. // to different operation type handler. With different race
  407. // format, this function can be changed
  408. Status TraceAnalyzer::StartProcessing() {
  409. Status s;
  410. Trace header;
  411. s = ReadTraceHeader(&header);
  412. if (!s.ok()) {
  413. fprintf(stderr, "Cannot read the header\n");
  414. return s;
  415. }
  416. trace_create_time_ = header.ts;
  417. if (FLAGS_output_time_series) {
  418. time_series_start_ = header.ts;
  419. }
  420. Trace trace;
  421. while (s.ok()) {
  422. trace.reset();
  423. s = ReadTraceRecord(&trace);
  424. if (!s.ok()) {
  425. break;
  426. }
  427. total_requests_++;
  428. end_time_ = trace.ts;
  429. if (trace.type == kTraceWrite) {
  430. total_writes_++;
  431. c_time_ = trace.ts;
  432. WriteBatch batch(trace.payload);
  433. // Note that, if the write happens in a transaction,
  434. // 'Write' will be called twice, one for Prepare, one for
  435. // Commit. Thus, in the trace, for the same WriteBatch, there
  436. // will be two reords if it is in a transaction. Here, we only
  437. // process the reord that is committed. If write is non-transaction,
  438. // HasBeginPrepare()==false, so we process it normally.
  439. if (batch.HasBeginPrepare() && !batch.HasCommit()) {
  440. continue;
  441. }
  442. TraceWriteHandler write_handler(this);
  443. s = batch.Iterate(&write_handler);
  444. if (!s.ok()) {
  445. fprintf(stderr, "Cannot process the write batch in the trace\n");
  446. return s;
  447. }
  448. } else if (trace.type == kTraceGet) {
  449. uint32_t cf_id = 0;
  450. Slice key;
  451. DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
  452. total_gets_++;
  453. s = HandleGet(cf_id, key.ToString(), trace.ts, 1);
  454. if (!s.ok()) {
  455. fprintf(stderr, "Cannot process the get in the trace\n");
  456. return s;
  457. }
  458. } else if (trace.type == kTraceIteratorSeek ||
  459. trace.type == kTraceIteratorSeekForPrev) {
  460. uint32_t cf_id = 0;
  461. Slice key;
  462. DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
  463. s = HandleIter(cf_id, key.ToString(), trace.ts, trace.type);
  464. if (!s.ok()) {
  465. fprintf(stderr, "Cannot process the iterator in the trace\n");
  466. return s;
  467. }
  468. } else if (trace.type == kTraceEnd) {
  469. break;
  470. }
  471. }
  472. if (s.IsIncomplete()) {
  473. // Fix it: Reaching eof returns Incomplete status at the moment.
  474. //
  475. return Status::OK();
  476. }
  477. return s;
  478. }
  479. // After the trace is processed by StartProcessing, the statistic data
  480. // is stored in the map or other in memory data structures. To get the
  481. // other statistic result such as key size distribution, value size
  482. // distribution, these data structures are re-processed here.
  483. Status TraceAnalyzer::MakeStatistics() {
  484. int ret;
  485. Status s;
  486. for (int type = 0; type < kTaTypeNum; type++) {
  487. if (!ta_[type].enabled) {
  488. continue;
  489. }
  490. for (auto& stat : ta_[type].stats) {
  491. stat.second.a_key_id = 0;
  492. for (auto& record : stat.second.a_key_stats) {
  493. record.second.key_id = stat.second.a_key_id;
  494. stat.second.a_key_id++;
  495. if (record.second.access_count <=
  496. static_cast<uint64_t>(FLAGS_output_ignore_count)) {
  497. continue;
  498. }
  499. // Generate the key access count distribution data
  500. if (FLAGS_output_access_count_stats) {
  501. if (stat.second.a_count_stats.find(record.second.access_count) ==
  502. stat.second.a_count_stats.end()) {
  503. stat.second.a_count_stats[record.second.access_count] = 1;
  504. } else {
  505. stat.second.a_count_stats[record.second.access_count]++;
  506. }
  507. }
  508. // Generate the key size distribution data
  509. if (FLAGS_output_key_distribution) {
  510. if (stat.second.a_key_size_stats.find(record.first.size()) ==
  511. stat.second.a_key_size_stats.end()) {
  512. stat.second.a_key_size_stats[record.first.size()] = 1;
  513. } else {
  514. stat.second.a_key_size_stats[record.first.size()]++;
  515. }
  516. }
  517. if (!FLAGS_print_correlation.empty()) {
  518. s = MakeStatisticCorrelation(stat.second, record.second);
  519. if (!s.ok()) {
  520. return s;
  521. }
  522. }
  523. }
  524. // Output the prefix cut or the whole content of the accessed key space
  525. if (FLAGS_output_key_stats || FLAGS_output_prefix_cut > 0) {
  526. s = MakeStatisticKeyStatsOrPrefix(stat.second);
  527. if (!s.ok()) {
  528. return s;
  529. }
  530. }
  531. // output the access count distribution
  532. if (FLAGS_output_access_count_stats && stat.second.a_count_dist_f) {
  533. for (auto& record : stat.second.a_count_stats) {
  534. ret = snprintf(buffer_, sizeof(buffer_),
  535. "access_count: %" PRIu64 " num: %" PRIu64 "\n",
  536. record.first, record.second);
  537. if (ret < 0) {
  538. return Status::IOError("Format the output failed");
  539. }
  540. std::string printout(buffer_);
  541. s = stat.second.a_count_dist_f->Append(printout);
  542. if (!s.ok()) {
  543. fprintf(stderr, "Write access count distribution file failed\n");
  544. return s;
  545. }
  546. }
  547. }
  548. // find the medium of the key size
  549. uint64_t k_count = 0;
  550. bool get_mid = false;
  551. for (auto& record : stat.second.a_key_size_stats) {
  552. k_count += record.second;
  553. if (!get_mid && k_count >= stat.second.a_key_mid) {
  554. stat.second.a_key_mid = record.first;
  555. get_mid = true;
  556. }
  557. if (FLAGS_output_key_distribution && stat.second.a_key_size_f) {
  558. ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %" PRIu64 "\n",
  559. record.first, record.second);
  560. if (ret < 0) {
  561. return Status::IOError("Format output failed");
  562. }
  563. std::string printout(buffer_);
  564. s = stat.second.a_key_size_f->Append(printout);
  565. if (!s.ok()) {
  566. fprintf(stderr, "Write key size distribution file failed\n");
  567. return s;
  568. }
  569. }
  570. }
  571. // output the value size distribution
  572. uint64_t v_begin = 0, v_end = 0, v_count = 0;
  573. get_mid = false;
  574. for (auto& record : stat.second.a_value_size_stats) {
  575. v_begin = v_end;
  576. v_end = (record.first + 1) * FLAGS_value_interval;
  577. v_count += record.second;
  578. if (!get_mid && v_count >= stat.second.a_count / 2) {
  579. stat.second.a_value_mid = (v_begin + v_end) / 2;
  580. get_mid = true;
  581. }
  582. if (FLAGS_output_value_distribution && stat.second.a_value_size_f &&
  583. (type == TraceOperationType::kPut ||
  584. type == TraceOperationType::kMerge)) {
  585. ret = snprintf(buffer_, sizeof(buffer_),
  586. "Number_of_value_size_between %" PRIu64 " and %" PRIu64
  587. " is: %" PRIu64 "\n",
  588. v_begin, v_end, record.second);
  589. if (ret < 0) {
  590. return Status::IOError("Format output failed");
  591. }
  592. std::string printout(buffer_);
  593. s = stat.second.a_value_size_f->Append(printout);
  594. if (!s.ok()) {
  595. fprintf(stderr, "Write value size distribution file failed\n");
  596. return s;
  597. }
  598. }
  599. }
  600. }
  601. }
  602. // Make the QPS statistics
  603. if (FLAGS_output_qps_stats) {
  604. s = MakeStatisticQPS();
  605. if (!s.ok()) {
  606. return s;
  607. }
  608. }
  609. return Status::OK();
  610. }
  611. // Process the statistics of the key access and
  612. // prefix of the accessed keys if required
  613. Status TraceAnalyzer::MakeStatisticKeyStatsOrPrefix(TraceStats& stats) {
  614. int ret;
  615. Status s;
  616. std::string prefix = "0";
  617. uint64_t prefix_access = 0;
  618. uint64_t prefix_count = 0;
  619. uint64_t prefix_succ_access = 0;
  620. double prefix_ave_access = 0.0;
  621. stats.a_succ_count = 0;
  622. for (auto& record : stats.a_key_stats) {
  623. // write the key access statistic file
  624. if (!stats.a_key_f) {
  625. return Status::IOError("Failed to open accessed_key_stats file.");
  626. }
  627. stats.a_succ_count += record.second.succ_count;
  628. double succ_ratio = 0.0;
  629. if (record.second.access_count > 0) {
  630. succ_ratio = (static_cast<double>(record.second.succ_count)) /
  631. record.second.access_count;
  632. }
  633. ret = snprintf(buffer_, sizeof(buffer_),
  634. "%u %zu %" PRIu64 " %" PRIu64 " %f\n", record.second.cf_id,
  635. record.second.value_size, record.second.key_id,
  636. record.second.access_count, succ_ratio);
  637. if (ret < 0) {
  638. return Status::IOError("Format output failed");
  639. }
  640. std::string printout(buffer_);
  641. s = stats.a_key_f->Append(printout);
  642. if (!s.ok()) {
  643. fprintf(stderr, "Write key access file failed\n");
  644. return s;
  645. }
  646. // write the prefix cut of the accessed keys
  647. if (FLAGS_output_prefix_cut > 0 && stats.a_prefix_cut_f) {
  648. if (record.first.compare(0, FLAGS_output_prefix_cut, prefix) != 0) {
  649. std::string prefix_out =
  650. ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix);
  651. if (prefix_count == 0) {
  652. prefix_ave_access = 0.0;
  653. } else {
  654. prefix_ave_access =
  655. (static_cast<double>(prefix_access)) / prefix_count;
  656. }
  657. double prefix_succ_ratio = 0.0;
  658. if (prefix_access > 0) {
  659. prefix_succ_ratio =
  660. (static_cast<double>(prefix_succ_access)) / prefix_access;
  661. }
  662. ret =
  663. snprintf(buffer_, sizeof(buffer_),
  664. "%" PRIu64 " %" PRIu64 " %" PRIu64 " %f %f %s\n",
  665. record.second.key_id, prefix_access, prefix_count,
  666. prefix_ave_access, prefix_succ_ratio, prefix_out.c_str());
  667. if (ret < 0) {
  668. return Status::IOError("Format output failed");
  669. }
  670. std::string pout(buffer_);
  671. s = stats.a_prefix_cut_f->Append(pout);
  672. if (!s.ok()) {
  673. fprintf(stderr, "Write accessed key prefix file failed\n");
  674. return s;
  675. }
  676. // make the top k statistic for the prefix
  677. if (static_cast<int32_t>(stats.top_k_prefix_access.size()) <
  678. FLAGS_print_top_k_access) {
  679. stats.top_k_prefix_access.push(
  680. std::make_pair(prefix_access, prefix_out));
  681. } else {
  682. if (prefix_access > stats.top_k_prefix_access.top().first) {
  683. stats.top_k_prefix_access.pop();
  684. stats.top_k_prefix_access.push(
  685. std::make_pair(prefix_access, prefix_out));
  686. }
  687. }
  688. if (static_cast<int32_t>(stats.top_k_prefix_ave.size()) <
  689. FLAGS_print_top_k_access) {
  690. stats.top_k_prefix_ave.push(
  691. std::make_pair(prefix_ave_access, prefix_out));
  692. } else {
  693. if (prefix_ave_access > stats.top_k_prefix_ave.top().first) {
  694. stats.top_k_prefix_ave.pop();
  695. stats.top_k_prefix_ave.push(
  696. std::make_pair(prefix_ave_access, prefix_out));
  697. }
  698. }
  699. prefix = record.first.substr(0, FLAGS_output_prefix_cut);
  700. prefix_access = 0;
  701. prefix_count = 0;
  702. prefix_succ_access = 0;
  703. }
  704. prefix_access += record.second.access_count;
  705. prefix_count += 1;
  706. prefix_succ_access += record.second.succ_count;
  707. }
  708. }
  709. return Status::OK();
  710. }
  711. // Process the statistics of different query type
  712. // correlations
  713. Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats,
  714. StatsUnit& unit) {
  715. if (stats.correlation_output.size() !=
  716. analyzer_opts_.correlation_list.size()) {
  717. return Status::Corruption("Cannot make the statistic of correlation.");
  718. }
  719. for (int i = 0; i < static_cast<int>(analyzer_opts_.correlation_list.size());
  720. i++) {
  721. if (i >= static_cast<int>(stats.correlation_output.size()) ||
  722. i >= static_cast<int>(unit.v_correlation.size())) {
  723. break;
  724. }
  725. stats.correlation_output[i].first += unit.v_correlation[i].count;
  726. stats.correlation_output[i].second += unit.v_correlation[i].total_ts;
  727. }
  728. return Status::OK();
  729. }
  730. // Process the statistics of QPS
  731. Status TraceAnalyzer::MakeStatisticQPS() {
  732. if(begin_time_ == 0) {
  733. begin_time_ = trace_create_time_;
  734. }
  735. uint32_t duration =
  736. static_cast<uint32_t>((end_time_ - begin_time_) / 1000000);
  737. int ret;
  738. Status s;
  739. std::vector<std::vector<uint32_t>> type_qps(
  740. duration, std::vector<uint32_t>(kTaTypeNum + 1, 0));
  741. std::vector<uint64_t> qps_sum(kTaTypeNum + 1, 0);
  742. std::vector<uint32_t> qps_peak(kTaTypeNum + 1, 0);
  743. qps_ave_.resize(kTaTypeNum + 1);
  744. for (int type = 0; type < kTaTypeNum; type++) {
  745. if (!ta_[type].enabled) {
  746. continue;
  747. }
  748. for (auto& stat : ta_[type].stats) {
  749. uint32_t time_line = 0;
  750. uint64_t cf_qps_sum = 0;
  751. for (auto& time_it : stat.second.a_qps_stats) {
  752. if (time_it.first >= duration) {
  753. continue;
  754. }
  755. type_qps[time_it.first][kTaTypeNum] += time_it.second;
  756. type_qps[time_it.first][type] += time_it.second;
  757. cf_qps_sum += time_it.second;
  758. if (time_it.second > stat.second.a_peak_qps) {
  759. stat.second.a_peak_qps = time_it.second;
  760. }
  761. if (stat.second.a_qps_f) {
  762. while (time_line < time_it.first) {
  763. ret = snprintf(buffer_, sizeof(buffer_), "%u\n", 0);
  764. if (ret < 0) {
  765. return Status::IOError("Format the output failed");
  766. }
  767. std::string printout(buffer_);
  768. s = stat.second.a_qps_f->Append(printout);
  769. if (!s.ok()) {
  770. fprintf(stderr, "Write QPS file failed\n");
  771. return s;
  772. }
  773. time_line++;
  774. }
  775. ret = snprintf(buffer_, sizeof(buffer_), "%u\n", time_it.second);
  776. if (ret < 0) {
  777. return Status::IOError("Format the output failed");
  778. }
  779. std::string printout(buffer_);
  780. s = stat.second.a_qps_f->Append(printout);
  781. if (!s.ok()) {
  782. fprintf(stderr, "Write QPS file failed\n");
  783. return s;
  784. }
  785. if (time_line == time_it.first) {
  786. time_line++;
  787. }
  788. }
  789. // Process the top k QPS peaks
  790. if (FLAGS_output_prefix_cut > 0) {
  791. if (static_cast<int32_t>(stat.second.top_k_qps_sec.size()) <
  792. FLAGS_print_top_k_access) {
  793. stat.second.top_k_qps_sec.push(
  794. std::make_pair(time_it.second, time_it.first));
  795. } else {
  796. if (stat.second.top_k_qps_sec.size() > 0 &&
  797. stat.second.top_k_qps_sec.top().first < time_it.second) {
  798. stat.second.top_k_qps_sec.pop();
  799. stat.second.top_k_qps_sec.push(
  800. std::make_pair(time_it.second, time_it.first));
  801. }
  802. }
  803. }
  804. }
  805. if (duration == 0) {
  806. stat.second.a_ave_qps = 0;
  807. } else {
  808. stat.second.a_ave_qps = (static_cast<double>(cf_qps_sum)) / duration;
  809. }
  810. // Output the accessed unique key number change overtime
  811. if (stat.second.a_key_num_f) {
  812. uint64_t cur_uni_key =
  813. static_cast<uint64_t>(stat.second.a_key_stats.size());
  814. double cur_ratio = 0.0;
  815. uint64_t cur_num = 0;
  816. for (uint32_t i = 0; i < duration; i++) {
  817. auto find_time = stat.second.uni_key_num.find(i);
  818. if (find_time != stat.second.uni_key_num.end()) {
  819. cur_ratio = (static_cast<double>(find_time->second)) / cur_uni_key;
  820. cur_num = find_time->second;
  821. }
  822. ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %.12f\n",
  823. cur_num, cur_ratio);
  824. if (ret < 0) {
  825. return Status::IOError("Format the output failed");
  826. }
  827. std::string printout(buffer_);
  828. s = stat.second.a_key_num_f->Append(printout);
  829. if (!s.ok()) {
  830. fprintf(stderr,
  831. "Write accessed unique key number change file failed\n");
  832. return s;
  833. }
  834. }
  835. }
  836. // output the prefix of top k access peak
  837. if (FLAGS_output_prefix_cut > 0 && stat.second.a_top_qps_prefix_f) {
  838. while (!stat.second.top_k_qps_sec.empty()) {
  839. ret = snprintf(buffer_, sizeof(buffer_), "At time: %u with QPS: %u\n",
  840. stat.second.top_k_qps_sec.top().second,
  841. stat.second.top_k_qps_sec.top().first);
  842. if (ret < 0) {
  843. return Status::IOError("Format the output failed");
  844. }
  845. std::string printout(buffer_);
  846. s = stat.second.a_top_qps_prefix_f->Append(printout);
  847. if (!s.ok()) {
  848. fprintf(stderr, "Write prefix QPS top K file failed\n");
  849. return s;
  850. }
  851. uint32_t qps_time = stat.second.top_k_qps_sec.top().second;
  852. stat.second.top_k_qps_sec.pop();
  853. if (stat.second.a_qps_prefix_stats.find(qps_time) !=
  854. stat.second.a_qps_prefix_stats.end()) {
  855. for (auto& qps_prefix : stat.second.a_qps_prefix_stats[qps_time]) {
  856. std::string qps_prefix_out =
  857. ROCKSDB_NAMESPACE::LDBCommand::StringToHex(qps_prefix.first);
  858. ret = snprintf(buffer_, sizeof(buffer_),
  859. "The prefix: %s Access count: %u\n",
  860. qps_prefix_out.c_str(), qps_prefix.second);
  861. if (ret < 0) {
  862. return Status::IOError("Format the output failed");
  863. }
  864. std::string pout(buffer_);
  865. s = stat.second.a_top_qps_prefix_f->Append(pout);
  866. if (!s.ok()) {
  867. fprintf(stderr, "Write prefix QPS top K file failed\n");
  868. return s;
  869. }
  870. }
  871. }
  872. }
  873. }
  874. }
  875. }
  876. if (qps_f_) {
  877. for (uint32_t i = 0; i < duration; i++) {
  878. for (int type = 0; type <= kTaTypeNum; type++) {
  879. if (type < kTaTypeNum) {
  880. ret = snprintf(buffer_, sizeof(buffer_), "%u ", type_qps[i][type]);
  881. } else {
  882. ret = snprintf(buffer_, sizeof(buffer_), "%u\n", type_qps[i][type]);
  883. }
  884. if (ret < 0) {
  885. return Status::IOError("Format the output failed");
  886. }
  887. std::string printout(buffer_);
  888. s = qps_f_->Append(printout);
  889. if (!s.ok()) {
  890. return s;
  891. }
  892. qps_sum[type] += type_qps[i][type];
  893. if (type_qps[i][type] > qps_peak[type]) {
  894. qps_peak[type] = type_qps[i][type];
  895. }
  896. }
  897. }
  898. }
  899. if (cf_qps_f_) {
  900. int cfs_size = static_cast<uint32_t>(cfs_.size());
  901. uint32_t v;
  902. for (uint32_t i = 0; i < duration; i++) {
  903. for (int cf = 0; cf < cfs_size; cf++) {
  904. if (cfs_[cf].cf_qps.find(i) != cfs_[cf].cf_qps.end()) {
  905. v = cfs_[cf].cf_qps[i];
  906. } else {
  907. v = 0;
  908. }
  909. if (cf < cfs_size - 1) {
  910. ret = snprintf(buffer_, sizeof(buffer_), "%u ", v);
  911. } else {
  912. ret = snprintf(buffer_, sizeof(buffer_), "%u\n", v);
  913. }
  914. if (ret < 0) {
  915. return Status::IOError("Format the output failed");
  916. }
  917. std::string printout(buffer_);
  918. s = cf_qps_f_->Append(printout);
  919. if (!s.ok()) {
  920. return s;
  921. }
  922. }
  923. }
  924. }
  925. qps_peak_ = qps_peak;
  926. for (int type = 0; type <= kTaTypeNum; type++) {
  927. if (duration == 0) {
  928. qps_ave_[type] = 0;
  929. } else {
  930. qps_ave_[type] = (static_cast<double>(qps_sum[type])) / duration;
  931. }
  932. }
  933. return Status::OK();
  934. }
  935. // In reprocessing, if we have the whole key space
  936. // we can output the access count of all keys in a cf
  937. // we can make some statistics of the whole key space
  938. // also, we output the top k accessed keys here
  939. Status TraceAnalyzer::ReProcessing() {
  940. int ret;
  941. Status s;
  942. for (auto& cf_it : cfs_) {
  943. uint32_t cf_id = cf_it.first;
  944. // output the time series;
  945. if (FLAGS_output_time_series) {
  946. for (int type = 0; type < kTaTypeNum; type++) {
  947. if (!ta_[type].enabled ||
  948. ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
  949. continue;
  950. }
  951. TraceStats& stat = ta_[type].stats[cf_id];
  952. if (!stat.time_series_f) {
  953. fprintf(stderr, "Cannot write time_series of '%s' in '%u'\n",
  954. ta_[type].type_name.c_str(), cf_id);
  955. continue;
  956. }
  957. while (!stat.time_series.empty()) {
  958. uint64_t key_id = 0;
  959. auto found = stat.a_key_stats.find(stat.time_series.front().key);
  960. if (found != stat.a_key_stats.end()) {
  961. key_id = found->second.key_id;
  962. }
  963. ret =
  964. snprintf(buffer_, sizeof(buffer_), "%u %" PRIu64 " %" PRIu64 "\n",
  965. stat.time_series.front().type,
  966. stat.time_series.front().ts, key_id);
  967. if (ret < 0) {
  968. return Status::IOError("Format the output failed");
  969. }
  970. std::string printout(buffer_);
  971. s = stat.time_series_f->Append(printout);
  972. if (!s.ok()) {
  973. fprintf(stderr, "Write time series file failed\n");
  974. return s;
  975. }
  976. stat.time_series.pop_front();
  977. }
  978. }
  979. }
  980. // process the whole key space if needed
  981. if (!FLAGS_key_space_dir.empty()) {
  982. std::string whole_key_path =
  983. FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt";
  984. std::string input_key, get_key;
  985. std::vector<std::string> prefix(kTaTypeNum);
  986. std::istringstream iss;
  987. bool has_data = true;
  988. std::unique_ptr<SequentialFile> wkey_input_f;
  989. s = env_->NewSequentialFile(whole_key_path, &wkey_input_f, env_options_);
  990. if (!s.ok()) {
  991. fprintf(stderr, "Cannot open the whole key space file of CF: %u\n",
  992. cf_id);
  993. wkey_input_f.reset();
  994. }
  995. if (wkey_input_f) {
  996. std::unique_ptr<FSSequentialFile> file;
  997. file = NewLegacySequentialFileWrapper(wkey_input_f);
  998. size_t kTraceFileReadaheadSize = 2 * 1024 * 1024;
  999. SequentialFileReader sf_reader(
  1000. std::move(file), whole_key_path,
  1001. kTraceFileReadaheadSize /* filereadahead_size */);
  1002. for (cfs_[cf_id].w_count = 0;
  1003. ReadOneLine(&iss, &sf_reader, &get_key, &has_data, &s);
  1004. ++cfs_[cf_id].w_count) {
  1005. if (!s.ok()) {
  1006. fprintf(stderr, "Read whole key space file failed\n");
  1007. return s;
  1008. }
  1009. input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key);
  1010. for (int type = 0; type < kTaTypeNum; type++) {
  1011. if (!ta_[type].enabled) {
  1012. continue;
  1013. }
  1014. TraceStats& stat = ta_[type].stats[cf_id];
  1015. if (stat.w_key_f) {
  1016. if (stat.a_key_stats.find(input_key) != stat.a_key_stats.end()) {
  1017. ret = snprintf(buffer_, sizeof(buffer_),
  1018. "%" PRIu64 " %" PRIu64 "\n", cfs_[cf_id].w_count,
  1019. stat.a_key_stats[input_key].access_count);
  1020. if (ret < 0) {
  1021. return Status::IOError("Format the output failed");
  1022. }
  1023. std::string printout(buffer_);
  1024. s = stat.w_key_f->Append(printout);
  1025. if (!s.ok()) {
  1026. fprintf(stderr, "Write whole key space access file failed\n");
  1027. return s;
  1028. }
  1029. }
  1030. }
  1031. // Output the prefix cut file of the whole key space
  1032. if (FLAGS_output_prefix_cut > 0 && stat.w_prefix_cut_f) {
  1033. if (input_key.compare(0, FLAGS_output_prefix_cut, prefix[type]) !=
  1034. 0) {
  1035. prefix[type] = input_key.substr(0, FLAGS_output_prefix_cut);
  1036. std::string prefix_out =
  1037. ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix[type]);
  1038. ret = snprintf(buffer_, sizeof(buffer_), "%" PRIu64 " %s\n",
  1039. cfs_[cf_id].w_count, prefix_out.c_str());
  1040. if (ret < 0) {
  1041. return Status::IOError("Format the output failed");
  1042. }
  1043. std::string printout(buffer_);
  1044. s = stat.w_prefix_cut_f->Append(printout);
  1045. if (!s.ok()) {
  1046. fprintf(stderr,
  1047. "Write whole key space prefix cut file failed\n");
  1048. return s;
  1049. }
  1050. }
  1051. }
  1052. }
  1053. // Make the statistics fo the key size distribution
  1054. if (FLAGS_output_key_distribution) {
  1055. if (cfs_[cf_id].w_key_size_stats.find(input_key.size()) ==
  1056. cfs_[cf_id].w_key_size_stats.end()) {
  1057. cfs_[cf_id].w_key_size_stats[input_key.size()] = 1;
  1058. } else {
  1059. cfs_[cf_id].w_key_size_stats[input_key.size()]++;
  1060. }
  1061. }
  1062. }
  1063. }
  1064. }
  1065. // process the top k accessed keys
  1066. if (FLAGS_print_top_k_access > 0) {
  1067. for (int type = 0; type < kTaTypeNum; type++) {
  1068. if (!ta_[type].enabled ||
  1069. ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
  1070. continue;
  1071. }
  1072. TraceStats& stat = ta_[type].stats[cf_id];
  1073. for (auto& record : stat.a_key_stats) {
  1074. if (static_cast<int32_t>(stat.top_k_queue.size()) <
  1075. FLAGS_print_top_k_access) {
  1076. stat.top_k_queue.push(
  1077. std::make_pair(record.second.access_count, record.first));
  1078. } else {
  1079. if (record.second.access_count > stat.top_k_queue.top().first) {
  1080. stat.top_k_queue.pop();
  1081. stat.top_k_queue.push(
  1082. std::make_pair(record.second.access_count, record.first));
  1083. }
  1084. }
  1085. }
  1086. }
  1087. }
  1088. }
  1089. return Status::OK();
  1090. }
  1091. // End the processing, print the requested results
  1092. Status TraceAnalyzer::EndProcessing() {
  1093. if (trace_sequence_f_) {
  1094. trace_sequence_f_->Close();
  1095. }
  1096. if (FLAGS_no_print) {
  1097. return Status::OK();
  1098. }
  1099. PrintStatistics();
  1100. CloseOutputFiles();
  1101. return Status::OK();
  1102. }
  1103. // Insert the corresponding key statistics to the correct type
  1104. // and correct CF, output the time-series file if needed
  1105. Status TraceAnalyzer::KeyStatsInsertion(const uint32_t& type,
  1106. const uint32_t& cf_id,
  1107. const std::string& key,
  1108. const size_t value_size,
  1109. const uint64_t ts) {
  1110. Status s;
  1111. StatsUnit unit;
  1112. unit.key_id = 0;
  1113. unit.cf_id = cf_id;
  1114. unit.value_size = value_size;
  1115. unit.access_count = 1;
  1116. unit.latest_ts = ts;
  1117. if (type != TraceOperationType::kGet || value_size > 0) {
  1118. unit.succ_count = 1;
  1119. } else {
  1120. unit.succ_count = 0;
  1121. }
  1122. unit.v_correlation.resize(analyzer_opts_.correlation_list.size());
  1123. for (int i = 0;
  1124. i < (static_cast<int>(analyzer_opts_.correlation_list.size())); i++) {
  1125. unit.v_correlation[i].count = 0;
  1126. unit.v_correlation[i].total_ts = 0;
  1127. }
  1128. std::string prefix;
  1129. if (FLAGS_output_prefix_cut > 0) {
  1130. prefix = key.substr(0, FLAGS_output_prefix_cut);
  1131. }
  1132. if (begin_time_ == 0) {
  1133. begin_time_ = ts;
  1134. }
  1135. uint32_t time_in_sec;
  1136. if (ts < begin_time_) {
  1137. time_in_sec = 0;
  1138. } else {
  1139. time_in_sec = static_cast<uint32_t>((ts - begin_time_) / 1000000);
  1140. }
  1141. uint64_t dist_value_size = value_size / FLAGS_value_interval;
  1142. auto found_stats = ta_[type].stats.find(cf_id);
  1143. if (found_stats == ta_[type].stats.end()) {
  1144. ta_[type].stats[cf_id].cf_id = cf_id;
  1145. ta_[type].stats[cf_id].cf_name = std::to_string(cf_id);
  1146. ta_[type].stats[cf_id].a_count = 1;
  1147. ta_[type].stats[cf_id].a_key_id = 0;
  1148. ta_[type].stats[cf_id].a_key_size_sqsum = MultiplyCheckOverflow(
  1149. static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
  1150. ta_[type].stats[cf_id].a_key_size_sum = key.size();
  1151. ta_[type].stats[cf_id].a_value_size_sqsum = MultiplyCheckOverflow(
  1152. static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
  1153. ta_[type].stats[cf_id].a_value_size_sum = value_size;
  1154. s = OpenStatsOutputFiles(ta_[type].type_name, ta_[type].stats[cf_id]);
  1155. if (!FLAGS_print_correlation.empty()) {
  1156. s = StatsUnitCorrelationUpdate(unit, type, ts, key);
  1157. }
  1158. ta_[type].stats[cf_id].a_key_stats[key] = unit;
  1159. ta_[type].stats[cf_id].a_value_size_stats[dist_value_size] = 1;
  1160. ta_[type].stats[cf_id].a_qps_stats[time_in_sec] = 1;
  1161. ta_[type].stats[cf_id].correlation_output.resize(
  1162. analyzer_opts_.correlation_list.size());
  1163. if (FLAGS_output_prefix_cut > 0) {
  1164. std::map<std::string, uint32_t> tmp_qps_map;
  1165. tmp_qps_map[prefix] = 1;
  1166. ta_[type].stats[cf_id].a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
  1167. }
  1168. if (time_in_sec != cur_time_sec_) {
  1169. ta_[type].stats[cf_id].uni_key_num[cur_time_sec_] =
  1170. static_cast<uint64_t>(ta_[type].stats[cf_id].a_key_stats.size());
  1171. cur_time_sec_ = time_in_sec;
  1172. }
  1173. } else {
  1174. found_stats->second.a_count++;
  1175. found_stats->second.a_key_size_sqsum += MultiplyCheckOverflow(
  1176. static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
  1177. found_stats->second.a_key_size_sum += key.size();
  1178. found_stats->second.a_value_size_sqsum += MultiplyCheckOverflow(
  1179. static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
  1180. found_stats->second.a_value_size_sum += value_size;
  1181. auto found_key = found_stats->second.a_key_stats.find(key);
  1182. if (found_key == found_stats->second.a_key_stats.end()) {
  1183. found_stats->second.a_key_stats[key] = unit;
  1184. } else {
  1185. found_key->second.access_count++;
  1186. if (type != TraceOperationType::kGet || value_size > 0) {
  1187. found_key->second.succ_count++;
  1188. }
  1189. if (!FLAGS_print_correlation.empty()) {
  1190. s = StatsUnitCorrelationUpdate(found_key->second, type, ts, key);
  1191. }
  1192. }
  1193. if (time_in_sec != cur_time_sec_) {
  1194. found_stats->second.uni_key_num[cur_time_sec_] =
  1195. static_cast<uint64_t>(found_stats->second.a_key_stats.size());
  1196. cur_time_sec_ = time_in_sec;
  1197. }
  1198. auto found_value =
  1199. found_stats->second.a_value_size_stats.find(dist_value_size);
  1200. if (found_value == found_stats->second.a_value_size_stats.end()) {
  1201. found_stats->second.a_value_size_stats[dist_value_size] = 1;
  1202. } else {
  1203. found_value->second++;
  1204. }
  1205. auto found_qps = found_stats->second.a_qps_stats.find(time_in_sec);
  1206. if (found_qps == found_stats->second.a_qps_stats.end()) {
  1207. found_stats->second.a_qps_stats[time_in_sec] = 1;
  1208. } else {
  1209. found_qps->second++;
  1210. }
  1211. if (FLAGS_output_prefix_cut > 0) {
  1212. auto found_qps_prefix =
  1213. found_stats->second.a_qps_prefix_stats.find(time_in_sec);
  1214. if (found_qps_prefix == found_stats->second.a_qps_prefix_stats.end()) {
  1215. std::map<std::string, uint32_t> tmp_qps_map;
  1216. found_stats->second.a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
  1217. }
  1218. if (found_stats->second.a_qps_prefix_stats[time_in_sec].find(prefix) ==
  1219. found_stats->second.a_qps_prefix_stats[time_in_sec].end()) {
  1220. found_stats->second.a_qps_prefix_stats[time_in_sec][prefix] = 1;
  1221. } else {
  1222. found_stats->second.a_qps_prefix_stats[time_in_sec][prefix]++;
  1223. }
  1224. }
  1225. }
  1226. if (cfs_.find(cf_id) == cfs_.end()) {
  1227. CfUnit cf_unit;
  1228. cf_unit.cf_id = cf_id;
  1229. cf_unit.w_count = 0;
  1230. cf_unit.a_count = 0;
  1231. cfs_[cf_id] = cf_unit;
  1232. }
  1233. if (FLAGS_output_qps_stats) {
  1234. cfs_[cf_id].cf_qps[time_in_sec]++;
  1235. }
  1236. if (FLAGS_output_time_series) {
  1237. TraceUnit trace_u;
  1238. trace_u.type = type;
  1239. trace_u.key = key;
  1240. trace_u.value_size = value_size;
  1241. trace_u.ts = (ts - time_series_start_) / 1000000;
  1242. trace_u.cf_id = cf_id;
  1243. ta_[type].stats[cf_id].time_series.push_back(trace_u);
  1244. }
  1245. return Status::OK();
  1246. }
  1247. // Update the correlation unit of each key if enabled
  1248. Status TraceAnalyzer::StatsUnitCorrelationUpdate(StatsUnit& unit,
  1249. const uint32_t& type_second,
  1250. const uint64_t& ts,
  1251. const std::string& key) {
  1252. if (type_second >= kTaTypeNum) {
  1253. fprintf(stderr, "Unknown Type Id: %u\n", type_second);
  1254. return Status::NotFound();
  1255. }
  1256. for (int type_first = 0; type_first < kTaTypeNum; type_first++) {
  1257. if (type_first >= static_cast<int>(ta_.size()) ||
  1258. type_first >= static_cast<int>(analyzer_opts_.correlation_map.size())) {
  1259. break;
  1260. }
  1261. if (analyzer_opts_.correlation_map[type_first][type_second] < 0 ||
  1262. ta_[type_first].stats.find(unit.cf_id) == ta_[type_first].stats.end() ||
  1263. ta_[type_first].stats[unit.cf_id].a_key_stats.find(key) ==
  1264. ta_[type_first].stats[unit.cf_id].a_key_stats.end() ||
  1265. ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts == ts) {
  1266. continue;
  1267. }
  1268. int correlation_id =
  1269. analyzer_opts_.correlation_map[type_first][type_second];
  1270. // after get the x-y operation time or x, update;
  1271. if (correlation_id < 0 ||
  1272. correlation_id >= static_cast<int>(unit.v_correlation.size())) {
  1273. continue;
  1274. }
  1275. unit.v_correlation[correlation_id].count++;
  1276. unit.v_correlation[correlation_id].total_ts +=
  1277. (ts - ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts);
  1278. }
  1279. unit.latest_ts = ts;
  1280. return Status::OK();
  1281. }
  1282. // when a new trace statistic is created, the file handler
  1283. // pointers should be initiated if needed according to
  1284. // the trace analyzer options
  1285. Status TraceAnalyzer::OpenStatsOutputFiles(const std::string& type,
  1286. TraceStats& new_stats) {
  1287. Status s;
  1288. if (FLAGS_output_key_stats) {
  1289. s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_stats.txt",
  1290. &new_stats.a_key_f);
  1291. s = CreateOutputFile(type, new_stats.cf_name,
  1292. "accessed_unique_key_num_change.txt",
  1293. &new_stats.a_key_num_f);
  1294. if (!FLAGS_key_space_dir.empty()) {
  1295. s = CreateOutputFile(type, new_stats.cf_name, "whole_key_stats.txt",
  1296. &new_stats.w_key_f);
  1297. }
  1298. }
  1299. if (FLAGS_output_access_count_stats) {
  1300. s = CreateOutputFile(type, new_stats.cf_name,
  1301. "accessed_key_count_distribution.txt",
  1302. &new_stats.a_count_dist_f);
  1303. }
  1304. if (FLAGS_output_prefix_cut > 0) {
  1305. s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_prefix_cut.txt",
  1306. &new_stats.a_prefix_cut_f);
  1307. if (!FLAGS_key_space_dir.empty()) {
  1308. s = CreateOutputFile(type, new_stats.cf_name, "whole_key_prefix_cut.txt",
  1309. &new_stats.w_prefix_cut_f);
  1310. }
  1311. if (FLAGS_output_qps_stats) {
  1312. s = CreateOutputFile(type, new_stats.cf_name,
  1313. "accessed_top_k_qps_prefix_cut.txt",
  1314. &new_stats.a_top_qps_prefix_f);
  1315. }
  1316. }
  1317. if (FLAGS_output_time_series) {
  1318. s = CreateOutputFile(type, new_stats.cf_name, "time_series.txt",
  1319. &new_stats.time_series_f);
  1320. }
  1321. if (FLAGS_output_value_distribution) {
  1322. s = CreateOutputFile(type, new_stats.cf_name,
  1323. "accessed_value_size_distribution.txt",
  1324. &new_stats.a_value_size_f);
  1325. }
  1326. if (FLAGS_output_key_distribution) {
  1327. s = CreateOutputFile(type, new_stats.cf_name,
  1328. "accessed_key_size_distribution.txt",
  1329. &new_stats.a_key_size_f);
  1330. }
  1331. if (FLAGS_output_qps_stats) {
  1332. s = CreateOutputFile(type, new_stats.cf_name, "qps_stats.txt",
  1333. &new_stats.a_qps_f);
  1334. }
  1335. return Status::OK();
  1336. }
  1337. // create the output path of the files to be opened
  1338. Status TraceAnalyzer::CreateOutputFile(
  1339. const std::string& type, const std::string& cf_name,
  1340. const std::string& ending,
  1341. std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr) {
  1342. std::string path;
  1343. path = output_path_ + "/" + FLAGS_output_prefix + "-" + type + "-" + cf_name +
  1344. "-" + ending;
  1345. Status s;
  1346. s = env_->NewWritableFile(path, f_ptr, env_options_);
  1347. if (!s.ok()) {
  1348. fprintf(stderr, "Cannot open file: %s\n", path.c_str());
  1349. exit(1);
  1350. }
  1351. return Status::OK();
  1352. }
  1353. // Close the output files in the TraceStats if they are opened
  1354. void TraceAnalyzer::CloseOutputFiles() {
  1355. for (int type = 0; type < kTaTypeNum; type++) {
  1356. if (!ta_[type].enabled) {
  1357. continue;
  1358. }
  1359. for (auto& stat : ta_[type].stats) {
  1360. if (stat.second.time_series_f) {
  1361. stat.second.time_series_f->Close();
  1362. }
  1363. if (stat.second.a_key_f) {
  1364. stat.second.a_key_f->Close();
  1365. }
  1366. if (stat.second.a_key_num_f) {
  1367. stat.second.a_key_num_f->Close();
  1368. }
  1369. if (stat.second.a_count_dist_f) {
  1370. stat.second.a_count_dist_f->Close();
  1371. }
  1372. if (stat.second.a_prefix_cut_f) {
  1373. stat.second.a_prefix_cut_f->Close();
  1374. }
  1375. if (stat.second.a_value_size_f) {
  1376. stat.second.a_value_size_f->Close();
  1377. }
  1378. if (stat.second.a_key_size_f) {
  1379. stat.second.a_key_size_f->Close();
  1380. }
  1381. if (stat.second.a_qps_f) {
  1382. stat.second.a_qps_f->Close();
  1383. }
  1384. if (stat.second.a_top_qps_prefix_f) {
  1385. stat.second.a_top_qps_prefix_f->Close();
  1386. }
  1387. if (stat.second.w_key_f) {
  1388. stat.second.w_key_f->Close();
  1389. }
  1390. if (stat.second.w_prefix_cut_f) {
  1391. stat.second.w_prefix_cut_f->Close();
  1392. }
  1393. }
  1394. }
  1395. return;
  1396. }
  1397. // Handle the Get request in the trace
  1398. Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
  1399. const std::string& key, const uint64_t& ts,
  1400. const uint32_t& get_ret) {
  1401. Status s;
  1402. size_t value_size = 0;
  1403. if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
  1404. s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key,
  1405. value_size, ts);
  1406. if (!s.ok()) {
  1407. return Status::Corruption("Failed to write the trace sequence to file");
  1408. }
  1409. }
  1410. if (ta_[TraceOperationType::kGet].sample_count >= sample_max_) {
  1411. ta_[TraceOperationType::kGet].sample_count = 0;
  1412. }
  1413. if (ta_[TraceOperationType::kGet].sample_count > 0) {
  1414. ta_[TraceOperationType::kGet].sample_count++;
  1415. return Status::OK();
  1416. }
  1417. ta_[TraceOperationType::kGet].sample_count++;
  1418. if (!ta_[TraceOperationType::kGet].enabled) {
  1419. return Status::OK();
  1420. }
  1421. if (get_ret == 1) {
  1422. value_size = 10;
  1423. }
  1424. s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key,
  1425. value_size, ts);
  1426. if (!s.ok()) {
  1427. return Status::Corruption("Failed to insert key statistics");
  1428. }
  1429. return s;
  1430. }
  1431. // Handle the Put request in the write batch of the trace
  1432. Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key,
  1433. const Slice& value) {
  1434. Status s;
  1435. size_t value_size = value.ToString().size();
  1436. if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
  1437. s = WriteTraceSequence(TraceOperationType::kPut, column_family_id,
  1438. key.ToString(), value_size, c_time_);
  1439. if (!s.ok()) {
  1440. return Status::Corruption("Failed to write the trace sequence to file");
  1441. }
  1442. }
  1443. if (ta_[TraceOperationType::kPut].sample_count >= sample_max_) {
  1444. ta_[TraceOperationType::kPut].sample_count = 0;
  1445. }
  1446. if (ta_[TraceOperationType::kPut].sample_count > 0) {
  1447. ta_[TraceOperationType::kPut].sample_count++;
  1448. return Status::OK();
  1449. }
  1450. ta_[TraceOperationType::kPut].sample_count++;
  1451. if (!ta_[TraceOperationType::kPut].enabled) {
  1452. return Status::OK();
  1453. }
  1454. s = KeyStatsInsertion(TraceOperationType::kPut, column_family_id,
  1455. key.ToString(), value_size, c_time_);
  1456. if (!s.ok()) {
  1457. return Status::Corruption("Failed to insert key statistics");
  1458. }
  1459. return s;
  1460. }
  1461. // Handle the Delete request in the write batch of the trace
  1462. Status TraceAnalyzer::HandleDelete(uint32_t column_family_id,
  1463. const Slice& key) {
  1464. Status s;
  1465. size_t value_size = 0;
  1466. if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
  1467. s = WriteTraceSequence(TraceOperationType::kDelete, column_family_id,
  1468. key.ToString(), value_size, c_time_);
  1469. if (!s.ok()) {
  1470. return Status::Corruption("Failed to write the trace sequence to file");
  1471. }
  1472. }
  1473. if (ta_[TraceOperationType::kDelete].sample_count >= sample_max_) {
  1474. ta_[TraceOperationType::kDelete].sample_count = 0;
  1475. }
  1476. if (ta_[TraceOperationType::kDelete].sample_count > 0) {
  1477. ta_[TraceOperationType::kDelete].sample_count++;
  1478. return Status::OK();
  1479. }
  1480. ta_[TraceOperationType::kDelete].sample_count++;
  1481. if (!ta_[TraceOperationType::kDelete].enabled) {
  1482. return Status::OK();
  1483. }
  1484. s = KeyStatsInsertion(TraceOperationType::kDelete, column_family_id,
  1485. key.ToString(), value_size, c_time_);
  1486. if (!s.ok()) {
  1487. return Status::Corruption("Failed to insert key statistics");
  1488. }
  1489. return s;
  1490. }
  1491. // Handle the SingleDelete request in the write batch of the trace
  1492. Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id,
  1493. const Slice& key) {
  1494. Status s;
  1495. size_t value_size = 0;
  1496. if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
  1497. s = WriteTraceSequence(TraceOperationType::kSingleDelete, column_family_id,
  1498. key.ToString(), value_size, c_time_);
  1499. if (!s.ok()) {
  1500. return Status::Corruption("Failed to write the trace sequence to file");
  1501. }
  1502. }
  1503. if (ta_[TraceOperationType::kSingleDelete].sample_count >= sample_max_) {
  1504. ta_[TraceOperationType::kSingleDelete].sample_count = 0;
  1505. }
  1506. if (ta_[TraceOperationType::kSingleDelete].sample_count > 0) {
  1507. ta_[TraceOperationType::kSingleDelete].sample_count++;
  1508. return Status::OK();
  1509. }
  1510. ta_[TraceOperationType::kSingleDelete].sample_count++;
  1511. if (!ta_[TraceOperationType::kSingleDelete].enabled) {
  1512. return Status::OK();
  1513. }
  1514. s = KeyStatsInsertion(TraceOperationType::kSingleDelete, column_family_id,
  1515. key.ToString(), value_size, c_time_);
  1516. if (!s.ok()) {
  1517. return Status::Corruption("Failed to insert key statistics");
  1518. }
  1519. return s;
  1520. }
  1521. // Handle the DeleteRange request in the write batch of the trace
  1522. Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id,
  1523. const Slice& begin_key,
  1524. const Slice& end_key) {
  1525. Status s;
  1526. size_t value_size = 0;
  1527. if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
  1528. s = WriteTraceSequence(TraceOperationType::kRangeDelete, column_family_id,
  1529. begin_key.ToString(), value_size, c_time_);
  1530. if (!s.ok()) {
  1531. return Status::Corruption("Failed to write the trace sequence to file");
  1532. }
  1533. }
  1534. if (ta_[TraceOperationType::kRangeDelete].sample_count >= sample_max_) {
  1535. ta_[TraceOperationType::kRangeDelete].sample_count = 0;
  1536. }
  1537. if (ta_[TraceOperationType::kRangeDelete].sample_count > 0) {
  1538. ta_[TraceOperationType::kRangeDelete].sample_count++;
  1539. return Status::OK();
  1540. }
  1541. ta_[TraceOperationType::kRangeDelete].sample_count++;
  1542. if (!ta_[TraceOperationType::kRangeDelete].enabled) {
  1543. return Status::OK();
  1544. }
  1545. s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
  1546. begin_key.ToString(), value_size, c_time_);
  1547. s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
  1548. end_key.ToString(), value_size, c_time_);
  1549. if (!s.ok()) {
  1550. return Status::Corruption("Failed to insert key statistics");
  1551. }
  1552. return s;
  1553. }
  1554. // Handle the Merge request in the write batch of the trace
  1555. Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
  1556. const Slice& value) {
  1557. Status s;
  1558. size_t value_size = value.ToString().size();
  1559. if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
  1560. s = WriteTraceSequence(TraceOperationType::kMerge, column_family_id,
  1561. key.ToString(), value_size, c_time_);
  1562. if (!s.ok()) {
  1563. return Status::Corruption("Failed to write the trace sequence to file");
  1564. }
  1565. }
  1566. if (ta_[TraceOperationType::kMerge].sample_count >= sample_max_) {
  1567. ta_[TraceOperationType::kMerge].sample_count = 0;
  1568. }
  1569. if (ta_[TraceOperationType::kMerge].sample_count > 0) {
  1570. ta_[TraceOperationType::kMerge].sample_count++;
  1571. return Status::OK();
  1572. }
  1573. ta_[TraceOperationType::kMerge].sample_count++;
  1574. if (!ta_[TraceOperationType::kMerge].enabled) {
  1575. return Status::OK();
  1576. }
  1577. s = KeyStatsInsertion(TraceOperationType::kMerge, column_family_id,
  1578. key.ToString(), value_size, c_time_);
  1579. if (!s.ok()) {
  1580. return Status::Corruption("Failed to insert key statistics");
  1581. }
  1582. return s;
  1583. }
  1584. // Handle the Iterator request in the trace
  1585. Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
  1586. const std::string& key, const uint64_t& ts,
  1587. TraceType& trace_type) {
  1588. Status s;
  1589. size_t value_size = 0;
  1590. int type = -1;
  1591. if (trace_type == kTraceIteratorSeek) {
  1592. type = TraceOperationType::kIteratorSeek;
  1593. } else if (trace_type == kTraceIteratorSeekForPrev) {
  1594. type = TraceOperationType::kIteratorSeekForPrev;
  1595. } else {
  1596. return s;
  1597. }
  1598. if (type == -1) {
  1599. return s;
  1600. }
  1601. if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
  1602. s = WriteTraceSequence(type, column_family_id, key, value_size, ts);
  1603. if (!s.ok()) {
  1604. return Status::Corruption("Failed to write the trace sequence to file");
  1605. }
  1606. }
  1607. if (ta_[type].sample_count >= sample_max_) {
  1608. ta_[type].sample_count = 0;
  1609. }
  1610. if (ta_[type].sample_count > 0) {
  1611. ta_[type].sample_count++;
  1612. return Status::OK();
  1613. }
  1614. ta_[type].sample_count++;
  1615. if (!ta_[type].enabled) {
  1616. return Status::OK();
  1617. }
  1618. s = KeyStatsInsertion(type, column_family_id, key, value_size, ts);
  1619. if (!s.ok()) {
  1620. return Status::Corruption("Failed to insert key statistics");
  1621. }
  1622. return s;
  1623. }
  1624. // Before the analyzer is closed, the requested general statistic results are
  1625. // printed out here. In current stage, these information are not output to
  1626. // the files.
  1627. // -----type
  1628. // |__cf_id
  1629. // |_statistics
  1630. void TraceAnalyzer::PrintStatistics() {
  1631. for (int type = 0; type < kTaTypeNum; type++) {
  1632. if (!ta_[type].enabled) {
  1633. continue;
  1634. }
  1635. ta_[type].total_keys = 0;
  1636. ta_[type].total_access = 0;
  1637. ta_[type].total_succ_access = 0;
  1638. printf("\n################# Operation Type: %s #####################\n",
  1639. ta_[type].type_name.c_str());
  1640. if (qps_ave_.size() == kTaTypeNum + 1) {
  1641. printf("Peak QPS is: %u Average QPS is: %f\n", qps_peak_[type],
  1642. qps_ave_[type]);
  1643. }
  1644. for (auto& stat_it : ta_[type].stats) {
  1645. if (stat_it.second.a_count == 0) {
  1646. continue;
  1647. }
  1648. TraceStats& stat = stat_it.second;
  1649. uint64_t total_a_keys = static_cast<uint64_t>(stat.a_key_stats.size());
  1650. double key_size_ave = 0.0;
  1651. double value_size_ave = 0.0;
  1652. double key_size_vari = 0.0;
  1653. double value_size_vari = 0.0;
  1654. if (stat.a_count > 0) {
  1655. key_size_ave =
  1656. (static_cast<double>(stat.a_key_size_sum)) / stat.a_count;
  1657. value_size_ave =
  1658. (static_cast<double>(stat.a_value_size_sum)) / stat.a_count;
  1659. key_size_vari = std::sqrt((static_cast<double>(stat.a_key_size_sqsum)) /
  1660. stat.a_count -
  1661. key_size_ave * key_size_ave);
  1662. value_size_vari = std::sqrt(
  1663. (static_cast<double>(stat.a_value_size_sqsum)) / stat.a_count -
  1664. value_size_ave * value_size_ave);
  1665. }
  1666. if (value_size_ave == 0.0) {
  1667. stat.a_value_mid = 0;
  1668. }
  1669. cfs_[stat.cf_id].a_count += total_a_keys;
  1670. ta_[type].total_keys += total_a_keys;
  1671. ta_[type].total_access += stat.a_count;
  1672. ta_[type].total_succ_access += stat.a_succ_count;
  1673. printf("*********************************************************\n");
  1674. printf("colume family id: %u\n", stat.cf_id);
  1675. printf("Total number of queries to this cf by %s: %" PRIu64 "\n",
  1676. ta_[type].type_name.c_str(), stat.a_count);
  1677. printf("Total unique keys in this cf: %" PRIu64 "\n", total_a_keys);
  1678. printf("Average key size: %f key size medium: %" PRIu64
  1679. " Key size Variation: %f\n",
  1680. key_size_ave, stat.a_key_mid, key_size_vari);
  1681. if (type == kPut || type == kMerge) {
  1682. printf("Average value size: %f Value size medium: %" PRIu64
  1683. " Value size variation: %f\n",
  1684. value_size_ave, stat.a_value_mid, value_size_vari);
  1685. }
  1686. printf("Peak QPS is: %u Average QPS is: %f\n", stat.a_peak_qps,
  1687. stat.a_ave_qps);
  1688. // print the top k accessed key and its access count
  1689. if (FLAGS_print_top_k_access > 0) {
  1690. printf("The Top %d keys that are accessed:\n",
  1691. FLAGS_print_top_k_access);
  1692. while (!stat.top_k_queue.empty()) {
  1693. std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(
  1694. stat.top_k_queue.top().second);
  1695. printf("Access_count: %" PRIu64 " %s\n", stat.top_k_queue.top().first,
  1696. hex_key.c_str());
  1697. stat.top_k_queue.pop();
  1698. }
  1699. }
  1700. // print the top k access prefix range and
  1701. // top k prefix range with highest average access per key
  1702. if (FLAGS_output_prefix_cut > 0) {
  1703. printf("The Top %d accessed prefix range:\n", FLAGS_print_top_k_access);
  1704. while (!stat.top_k_prefix_access.empty()) {
  1705. printf("Prefix: %s Access count: %" PRIu64 "\n",
  1706. stat.top_k_prefix_access.top().second.c_str(),
  1707. stat.top_k_prefix_access.top().first);
  1708. stat.top_k_prefix_access.pop();
  1709. }
  1710. printf("The Top %d prefix with highest access per key:\n",
  1711. FLAGS_print_top_k_access);
  1712. while (!stat.top_k_prefix_ave.empty()) {
  1713. printf("Prefix: %s access per key: %f\n",
  1714. stat.top_k_prefix_ave.top().second.c_str(),
  1715. stat.top_k_prefix_ave.top().first);
  1716. stat.top_k_prefix_ave.pop();
  1717. }
  1718. }
  1719. // print the operation correlations
  1720. if (!FLAGS_print_correlation.empty()) {
  1721. for (int correlation = 0;
  1722. correlation <
  1723. static_cast<int>(analyzer_opts_.correlation_list.size());
  1724. correlation++) {
  1725. printf(
  1726. "The correlation statistics of '%s' after '%s' is:",
  1727. taIndexToOpt[analyzer_opts_.correlation_list[correlation].second]
  1728. .c_str(),
  1729. taIndexToOpt[analyzer_opts_.correlation_list[correlation].first]
  1730. .c_str());
  1731. double correlation_ave = 0.0;
  1732. if (stat.correlation_output[correlation].first > 0) {
  1733. correlation_ave =
  1734. (static_cast<double>(
  1735. stat.correlation_output[correlation].second)) /
  1736. (stat.correlation_output[correlation].first * 1000);
  1737. }
  1738. printf(" total numbers: %" PRIu64 " average time: %f(ms)\n",
  1739. stat.correlation_output[correlation].first, correlation_ave);
  1740. }
  1741. }
  1742. }
  1743. printf("*********************************************************\n");
  1744. printf("Total keys of '%s' is: %" PRIu64 "\n", ta_[type].type_name.c_str(),
  1745. ta_[type].total_keys);
  1746. printf("Total access is: %" PRIu64 "\n", ta_[type].total_access);
  1747. total_access_keys_ += ta_[type].total_keys;
  1748. }
  1749. // Print the overall statistic information of the trace
  1750. printf("\n*********************************************************\n");
  1751. printf("*********************************************************\n");
  1752. printf("The column family based statistics\n");
  1753. for (auto& cf : cfs_) {
  1754. printf("The column family id: %u\n", cf.first);
  1755. printf("The whole key space key numbers: %" PRIu64 "\n", cf.second.w_count);
  1756. printf("The accessed key space key numbers: %" PRIu64 "\n",
  1757. cf.second.a_count);
  1758. }
  1759. if (FLAGS_print_overall_stats) {
  1760. printf("\n*********************************************************\n");
  1761. printf("*********************************************************\n");
  1762. if (qps_peak_.size() == kTaTypeNum + 1) {
  1763. printf("Average QPS per second: %f Peak QPS: %u\n", qps_ave_[kTaTypeNum],
  1764. qps_peak_[kTaTypeNum]);
  1765. }
  1766. printf("The statistics related to query number need to times: %u\n",
  1767. sample_max_);
  1768. printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64
  1769. " Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n",
  1770. total_requests_, total_access_keys_, total_gets_, total_writes_);
  1771. for (int type = 0; type < kTaTypeNum; type++) {
  1772. if (!ta_[type].enabled) {
  1773. continue;
  1774. }
  1775. printf("Operation: '%s' has: %" PRIu64 "\n", ta_[type].type_name.c_str(),
  1776. ta_[type].total_access);
  1777. }
  1778. }
  1779. }
  1780. // Write the trace sequence to file
  1781. Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
  1782. const uint32_t& cf_id,
  1783. const std::string& key,
  1784. const size_t value_size,
  1785. const uint64_t ts) {
  1786. std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key);
  1787. int ret;
  1788. ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type,
  1789. cf_id, value_size, ts);
  1790. if (ret < 0) {
  1791. return Status::IOError("failed to format the output");
  1792. }
  1793. std::string printout(buffer_);
  1794. if (!FLAGS_no_key) {
  1795. printout = hex_key + " " + printout;
  1796. }
  1797. return trace_sequence_f_->Append(printout);
  1798. }
  1799. // The entrance function of Trace_Analyzer
  1800. int trace_analyzer_tool(int argc, char** argv) {
  1801. std::string trace_path;
  1802. std::string output_path;
  1803. AnalyzerOptions analyzer_opts;
  1804. ParseCommandLineFlags(&argc, &argv, true);
  1805. if (!FLAGS_print_correlation.empty()) {
  1806. analyzer_opts.SparseCorrelationInput(FLAGS_print_correlation);
  1807. }
  1808. std::unique_ptr<TraceAnalyzer> analyzer(
  1809. new TraceAnalyzer(FLAGS_trace_path, FLAGS_output_dir, analyzer_opts));
  1810. if (!analyzer) {
  1811. fprintf(stderr, "Cannot initiate the trace analyzer\n");
  1812. exit(1);
  1813. }
  1814. ROCKSDB_NAMESPACE::Status s = analyzer->PrepareProcessing();
  1815. if (!s.ok()) {
  1816. fprintf(stderr, "%s\n", s.getState());
  1817. fprintf(stderr, "Cannot initiate the trace reader\n");
  1818. exit(1);
  1819. }
  1820. s = analyzer->StartProcessing();
  1821. if (!s.ok() && !FLAGS_try_process_corrupted_trace) {
  1822. fprintf(stderr, "%s\n", s.getState());
  1823. fprintf(stderr, "Cannot processing the trace\n");
  1824. exit(1);
  1825. }
  1826. s = analyzer->MakeStatistics();
  1827. if (!s.ok()) {
  1828. fprintf(stderr, "%s\n", s.getState());
  1829. analyzer->EndProcessing();
  1830. fprintf(stderr, "Cannot make the statistics\n");
  1831. exit(1);
  1832. }
  1833. s = analyzer->ReProcessing();
  1834. if (!s.ok()) {
  1835. fprintf(stderr, "%s\n", s.getState());
  1836. fprintf(stderr, "Cannot re-process the trace for more statistics\n");
  1837. analyzer->EndProcessing();
  1838. exit(1);
  1839. }
  1840. s = analyzer->EndProcessing();
  1841. if (!s.ok()) {
  1842. fprintf(stderr, "%s\n", s.getState());
  1843. fprintf(stderr, "Cannot close the trace analyzer\n");
  1844. exit(1);
  1845. }
  1846. return 0;
  1847. }
  1848. } // namespace ROCKSDB_NAMESPACE
  1849. #endif // Endif of Gflag
  1850. #endif // RocksDB LITE