block_cache_tracer.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "trace_replay/block_cache_tracer.h"
  6. #include <cinttypes>
  7. #include <cstdio>
  8. #include <cstdlib>
  9. #include "db/db_impl/db_impl.h"
  10. #include "db/dbformat.h"
  11. #include "rocksdb/slice.h"
  12. #include "util/coding.h"
  13. #include "util/hash.h"
  14. #include "util/string_util.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. namespace {
  17. const unsigned int kCharSize = 1;
  18. bool ShouldTrace(const Slice& block_key, const TraceOptions& trace_options) {
  19. if (trace_options.sampling_frequency == 0 ||
  20. trace_options.sampling_frequency == 1) {
  21. return true;
  22. }
  23. // We use spatial downsampling so that we have a complete access history for a
  24. // block.
  25. return 0 == fastrange64(GetSliceNPHash64(block_key),
  26. trace_options.sampling_frequency);
  27. }
  28. } // namespace
  // Number of microseconds in one second.
  const uint64_t kMicrosInSecond = 1000000;
  // Number of seconds in one minute.
  const uint64_t kSecondInMinute = 60;
  // Number of seconds in one hour.
  const uint64_t kSecondInHour = 60 * 60;
  32. const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName =
  33. "UnknownColumnFamily";
  34. const uint64_t BlockCacheTraceHelper::kReservedGetId = 0;
  35. bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
  36. TraceType block_type, TableReaderCaller caller) {
  37. return (block_type == TraceType::kBlockTraceDataBlock) &&
  38. IsGetOrMultiGet(caller);
  39. }
  40. bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) {
  41. return caller == TableReaderCaller::kUserGet ||
  42. caller == TableReaderCaller::kUserMultiGet;
  43. }
  44. bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) {
  45. return caller == TableReaderCaller::kUserGet ||
  46. caller == TableReaderCaller::kUserMultiGet ||
  47. caller == TableReaderCaller::kUserIterator ||
  48. caller == TableReaderCaller::kUserApproximateSize ||
  49. caller == TableReaderCaller::kUserVerifyChecksum;
  50. }
  51. std::string BlockCacheTraceHelper::ComputeRowKey(
  52. const BlockCacheTraceRecord& access) {
  53. if (!IsGetOrMultiGet(access.caller)) {
  54. return "";
  55. }
  56. Slice key = ExtractUserKey(access.referenced_key);
  57. return std::to_string(access.sst_fd_number) + "_" + key.ToString();
  58. }
  59. uint64_t BlockCacheTraceHelper::GetTableId(
  60. const BlockCacheTraceRecord& access) {
  61. if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) {
  62. return 0;
  63. }
  64. return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1;
  65. }
  66. uint64_t BlockCacheTraceHelper::GetSequenceNumber(
  67. const BlockCacheTraceRecord& access) {
  68. if (!IsGetOrMultiGet(access.caller)) {
  69. return 0;
  70. }
  71. return access.get_from_user_specified_snapshot == Boolean::kFalse
  72. ? 0
  73. : 1 + GetInternalKeySeqno(access.referenced_key);
  74. }
  75. uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
  76. const BlockCacheTraceRecord& access) {
  77. Slice input(access.block_key);
  78. uint64_t offset = 0;
  79. while (true) {
  80. uint64_t tmp = 0;
  81. if (GetVarint64(&input, &tmp)) {
  82. offset = tmp;
  83. } else {
  84. break;
  85. }
  86. }
  87. return offset;
  88. }
  89. BlockCacheTraceWriter::BlockCacheTraceWriter(
  90. Env* env, const TraceOptions& trace_options,
  91. std::unique_ptr<TraceWriter>&& trace_writer)
  92. : env_(env),
  93. trace_options_(trace_options),
  94. trace_writer_(std::move(trace_writer)) {}
  95. Status BlockCacheTraceWriter::WriteBlockAccess(
  96. const BlockCacheTraceRecord& record, const Slice& block_key,
  97. const Slice& cf_name, const Slice& referenced_key) {
  98. uint64_t trace_file_size = trace_writer_->GetFileSize();
  99. if (trace_file_size > trace_options_.max_trace_file_size) {
  100. return Status::OK();
  101. }
  102. Trace trace;
  103. trace.ts = record.access_timestamp;
  104. trace.type = record.block_type;
  105. PutLengthPrefixedSlice(&trace.payload, block_key);
  106. PutFixed64(&trace.payload, record.block_size);
  107. PutFixed64(&trace.payload, record.cf_id);
  108. PutLengthPrefixedSlice(&trace.payload, cf_name);
  109. PutFixed32(&trace.payload, record.level);
  110. PutFixed64(&trace.payload, record.sst_fd_number);
  111. trace.payload.push_back(record.caller);
  112. trace.payload.push_back(record.is_cache_hit);
  113. trace.payload.push_back(record.no_insert);
  114. if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) {
  115. PutFixed64(&trace.payload, record.get_id);
  116. trace.payload.push_back(record.get_from_user_specified_snapshot);
  117. PutLengthPrefixedSlice(&trace.payload, referenced_key);
  118. }
  119. if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type,
  120. record.caller)) {
  121. PutFixed64(&trace.payload, record.referenced_data_size);
  122. PutFixed64(&trace.payload, record.num_keys_in_block);
  123. trace.payload.push_back(record.referenced_key_exist_in_block);
  124. }
  125. std::string encoded_trace;
  126. TracerHelper::EncodeTrace(trace, &encoded_trace);
  127. return trace_writer_->Write(encoded_trace);
  128. }
  129. Status BlockCacheTraceWriter::WriteHeader() {
  130. Trace trace;
  131. trace.ts = env_->NowMicros();
  132. trace.type = TraceType::kTraceBegin;
  133. PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
  134. PutFixed32(&trace.payload, kMajorVersion);
  135. PutFixed32(&trace.payload, kMinorVersion);
  136. std::string encoded_trace;
  137. TracerHelper::EncodeTrace(trace, &encoded_trace);
  138. return trace_writer_->Write(encoded_trace);
  139. }
  140. BlockCacheTraceReader::BlockCacheTraceReader(
  141. std::unique_ptr<TraceReader>&& reader)
  142. : trace_reader_(std::move(reader)) {}
  143. Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
  144. assert(header != nullptr);
  145. std::string encoded_trace;
  146. Status s = trace_reader_->Read(&encoded_trace);
  147. if (!s.ok()) {
  148. return s;
  149. }
  150. Trace trace;
  151. s = TracerHelper::DecodeTrace(encoded_trace, &trace);
  152. if (!s.ok()) {
  153. return s;
  154. }
  155. header->start_time = trace.ts;
  156. Slice enc_slice = Slice(trace.payload);
  157. Slice magnic_number;
  158. if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) {
  159. return Status::Corruption(
  160. "Corrupted header in the trace file: Failed to read the magic number.");
  161. }
  162. if (magnic_number.ToString() != kTraceMagic) {
  163. return Status::Corruption(
  164. "Corrupted header in the trace file: Magic number does not match.");
  165. }
  166. if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
  167. return Status::Corruption(
  168. "Corrupted header in the trace file: Failed to read rocksdb major "
  169. "version number.");
  170. }
  171. if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
  172. return Status::Corruption(
  173. "Corrupted header in the trace file: Failed to read rocksdb minor "
  174. "version number.");
  175. }
  176. // We should have retrieved all information in the header.
  177. if (!enc_slice.empty()) {
  178. return Status::Corruption(
  179. "Corrupted header in the trace file: The length of header is too "
  180. "long.");
  181. }
  182. return Status::OK();
  183. }
  184. Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
  185. assert(record);
  186. std::string encoded_trace;
  187. Status s = trace_reader_->Read(&encoded_trace);
  188. if (!s.ok()) {
  189. return s;
  190. }
  191. Trace trace;
  192. s = TracerHelper::DecodeTrace(encoded_trace, &trace);
  193. if (!s.ok()) {
  194. return s;
  195. }
  196. record->access_timestamp = trace.ts;
  197. record->block_type = trace.type;
  198. Slice enc_slice = Slice(trace.payload);
  199. Slice block_key;
  200. if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
  201. return Status::Incomplete(
  202. "Incomplete access record: Failed to read block key.");
  203. }
  204. record->block_key = block_key.ToString();
  205. if (!GetFixed64(&enc_slice, &record->block_size)) {
  206. return Status::Incomplete(
  207. "Incomplete access record: Failed to read block size.");
  208. }
  209. if (!GetFixed64(&enc_slice, &record->cf_id)) {
  210. return Status::Incomplete(
  211. "Incomplete access record: Failed to read column family ID.");
  212. }
  213. Slice cf_name;
  214. if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
  215. return Status::Incomplete(
  216. "Incomplete access record: Failed to read column family name.");
  217. }
  218. record->cf_name = cf_name.ToString();
  219. if (!GetFixed32(&enc_slice, &record->level)) {
  220. return Status::Incomplete(
  221. "Incomplete access record: Failed to read level.");
  222. }
  223. if (!GetFixed64(&enc_slice, &record->sst_fd_number)) {
  224. return Status::Incomplete(
  225. "Incomplete access record: Failed to read SST file number.");
  226. }
  227. if (enc_slice.empty()) {
  228. return Status::Incomplete(
  229. "Incomplete access record: Failed to read caller.");
  230. }
  231. record->caller = static_cast<TableReaderCaller>(enc_slice[0]);
  232. enc_slice.remove_prefix(kCharSize);
  233. if (enc_slice.empty()) {
  234. return Status::Incomplete(
  235. "Incomplete access record: Failed to read is_cache_hit.");
  236. }
  237. record->is_cache_hit = static_cast<Boolean>(enc_slice[0]);
  238. enc_slice.remove_prefix(kCharSize);
  239. if (enc_slice.empty()) {
  240. return Status::Incomplete(
  241. "Incomplete access record: Failed to read no_insert.");
  242. }
  243. record->no_insert = static_cast<Boolean>(enc_slice[0]);
  244. enc_slice.remove_prefix(kCharSize);
  245. if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) {
  246. if (!GetFixed64(&enc_slice, &record->get_id)) {
  247. return Status::Incomplete(
  248. "Incomplete access record: Failed to read the get id.");
  249. }
  250. if (enc_slice.empty()) {
  251. return Status::Incomplete(
  252. "Incomplete access record: Failed to read "
  253. "get_from_user_specified_snapshot.");
  254. }
  255. record->get_from_user_specified_snapshot =
  256. static_cast<Boolean>(enc_slice[0]);
  257. enc_slice.remove_prefix(kCharSize);
  258. Slice referenced_key;
  259. if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
  260. return Status::Incomplete(
  261. "Incomplete access record: Failed to read the referenced key.");
  262. }
  263. record->referenced_key = referenced_key.ToString();
  264. }
  265. if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type,
  266. record->caller)) {
  267. if (!GetFixed64(&enc_slice, &record->referenced_data_size)) {
  268. return Status::Incomplete(
  269. "Incomplete access record: Failed to read the referenced data size.");
  270. }
  271. if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
  272. return Status::Incomplete(
  273. "Incomplete access record: Failed to read the number of keys in the "
  274. "block.");
  275. }
  276. if (enc_slice.empty()) {
  277. return Status::Incomplete(
  278. "Incomplete access record: Failed to read "
  279. "referenced_key_exist_in_block.");
  280. }
  281. record->referenced_key_exist_in_block = static_cast<Boolean>(enc_slice[0]);
  282. }
  283. return Status::OK();
  284. }
  285. BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() {
  286. if (human_readable_trace_file_writer_) {
  287. human_readable_trace_file_writer_->Flush();
  288. human_readable_trace_file_writer_->Close();
  289. }
  290. }
  291. Status BlockCacheHumanReadableTraceWriter::NewWritableFile(
  292. const std::string& human_readable_trace_file_path,
  293. ROCKSDB_NAMESPACE::Env* env) {
  294. if (human_readable_trace_file_path.empty()) {
  295. return Status::InvalidArgument(
  296. "The provided human_readable_trace_file_path is null.");
  297. }
  298. return env->NewWritableFile(human_readable_trace_file_path,
  299. &human_readable_trace_file_writer_, EnvOptions());
  300. }
  301. Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord(
  302. const BlockCacheTraceRecord& access, uint64_t block_id,
  303. uint64_t get_key_id) {
  304. if (!human_readable_trace_file_writer_) {
  305. return Status::OK();
  306. }
  307. int ret = snprintf(
  308. trace_record_buffer_, sizeof(trace_record_buffer_),
  309. "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
  310. ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
  311. ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
  312. access.access_timestamp, block_id, access.block_type, access.block_size,
  313. access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
  314. access.caller, access.no_insert, access.get_id, get_key_id,
  315. access.referenced_data_size, access.is_cache_hit,
  316. access.referenced_key_exist_in_block, access.num_keys_in_block,
  317. BlockCacheTraceHelper::GetTableId(access),
  318. BlockCacheTraceHelper::GetSequenceNumber(access),
  319. static_cast<uint64_t>(access.block_key.size()),
  320. static_cast<uint64_t>(access.referenced_key.size()),
  321. BlockCacheTraceHelper::GetBlockOffsetInFile(access));
  322. if (ret < 0) {
  323. return Status::IOError("failed to format the output");
  324. }
  325. std::string printout(trace_record_buffer_);
  326. return human_readable_trace_file_writer_->Append(printout);
  327. }
  328. BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader(
  329. const std::string& trace_file_path)
  330. : BlockCacheTraceReader(/*trace_reader=*/nullptr) {
  331. human_readable_trace_reader_.open(trace_file_path, std::ifstream::in);
  332. }
  333. BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() {
  334. human_readable_trace_reader_.close();
  335. }
  336. Status BlockCacheHumanReadableTraceReader::ReadHeader(
  337. BlockCacheTraceHeader* /*header*/) {
  338. return Status::OK();
  339. }
  340. Status BlockCacheHumanReadableTraceReader::ReadAccess(
  341. BlockCacheTraceRecord* record) {
  342. std::string line;
  343. if (!std::getline(human_readable_trace_reader_, line)) {
  344. return Status::Incomplete("No more records to read.");
  345. }
  346. std::stringstream ss(line);
  347. std::vector<std::string> record_strs;
  348. while (ss.good()) {
  349. std::string substr;
  350. getline(ss, substr, ',');
  351. record_strs.push_back(substr);
  352. }
  353. if (record_strs.size() != 21) {
  354. return Status::Incomplete("Records format is wrong.");
  355. }
  356. record->access_timestamp = ParseUint64(record_strs[0]);
  357. uint64_t block_key = ParseUint64(record_strs[1]);
  358. record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2]));
  359. record->block_size = ParseUint64(record_strs[3]);
  360. record->cf_id = ParseUint64(record_strs[4]);
  361. record->cf_name = record_strs[5];
  362. record->level = static_cast<uint32_t>(ParseUint64(record_strs[6]));
  363. record->sst_fd_number = ParseUint64(record_strs[7]);
  364. record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8]));
  365. record->no_insert = static_cast<Boolean>(ParseUint64(record_strs[9]));
  366. record->get_id = ParseUint64(record_strs[10]);
  367. uint64_t get_key_id = ParseUint64(record_strs[11]);
  368. record->referenced_data_size = ParseUint64(record_strs[12]);
  369. record->is_cache_hit = static_cast<Boolean>(ParseUint64(record_strs[13]));
  370. record->referenced_key_exist_in_block =
  371. static_cast<Boolean>(ParseUint64(record_strs[14]));
  372. record->num_keys_in_block = ParseUint64(record_strs[15]);
  373. uint64_t table_id = ParseUint64(record_strs[16]);
  374. if (table_id > 0) {
  375. // Decrement since valid table id in the trace file equals traced table id
  376. // + 1.
  377. table_id -= 1;
  378. }
  379. uint64_t get_sequence_number = ParseUint64(record_strs[17]);
  380. if (get_sequence_number > 0) {
  381. record->get_from_user_specified_snapshot = Boolean::kTrue;
  382. // Decrement since valid seq number in the trace file equals traced seq
  383. // number + 1.
  384. get_sequence_number -= 1;
  385. }
  386. uint64_t block_key_size = ParseUint64(record_strs[18]);
  387. uint64_t get_key_size = ParseUint64(record_strs[19]);
  388. uint64_t block_offset = ParseUint64(record_strs[20]);
  389. std::string tmp_block_key;
  390. PutVarint64(&tmp_block_key, block_key);
  391. PutVarint64(&tmp_block_key, block_offset);
  392. // Append 1 until the size is the same as traced block key size.
  393. while (record->block_key.size() < block_key_size - tmp_block_key.size()) {
  394. record->block_key += "1";
  395. }
  396. record->block_key += tmp_block_key;
  397. if (get_key_id != 0) {
  398. std::string tmp_get_key;
  399. PutFixed64(&tmp_get_key, get_key_id);
  400. PutFixed64(&tmp_get_key, get_sequence_number << 8);
  401. PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id));
  402. // Append 1 until the size is the same as traced key size.
  403. while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) {
  404. record->referenced_key += "1";
  405. }
  406. record->referenced_key += tmp_get_key;
  407. }
  408. return Status::OK();
  409. }
  410. BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
  411. BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
  412. Status BlockCacheTracer::StartTrace(
  413. Env* env, const TraceOptions& trace_options,
  414. std::unique_ptr<TraceWriter>&& trace_writer) {
  415. InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  416. if (writer_.load()) {
  417. return Status::Busy();
  418. }
  419. get_id_counter_.store(1);
  420. trace_options_ = trace_options;
  421. writer_.store(
  422. new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
  423. return writer_.load()->WriteHeader();
  424. }
  425. void BlockCacheTracer::EndTrace() {
  426. InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  427. if (!writer_.load()) {
  428. return;
  429. }
  430. delete writer_.load();
  431. writer_.store(nullptr);
  432. }
  433. Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record,
  434. const Slice& block_key,
  435. const Slice& cf_name,
  436. const Slice& referenced_key) {
  437. if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) {
  438. return Status::OK();
  439. }
  440. InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  441. if (!writer_.load()) {
  442. return Status::OK();
  443. }
  444. return writer_.load()->WriteBlockAccess(record, block_key, cf_name,
  445. referenced_key);
  446. }
  447. uint64_t BlockCacheTracer::NextGetId() {
  448. if (!writer_.load(std::memory_order_relaxed)) {
  449. return BlockCacheTraceHelper::kReservedGetId;
  450. }
  451. uint64_t prev_value = get_id_counter_.fetch_add(1);
  452. if (prev_value == BlockCacheTraceHelper::kReservedGetId) {
  453. // fetch and add again.
  454. return get_id_counter_.fetch_add(1);
  455. }
  456. return prev_value;
  457. }
  458. } // namespace ROCKSDB_NAMESPACE