block_cache_tracer.cc 18 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "trace_replay/block_cache_tracer.h"
  6. #include <cinttypes>
  7. #include <cstdio>
  8. #include <cstdlib>
  9. #include <string>
  10. #include "db/db_impl/db_impl.h"
  11. #include "db/dbformat.h"
  12. #include "rocksdb/slice.h"
  13. #include "rocksdb/trace_record.h"
  14. #include "util/coding.h"
  15. #include "util/hash.h"
  16. #include "util/string_util.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. namespace {
  19. bool ShouldTrace(const Slice& block_key,
  20. const BlockCacheTraceOptions& trace_options) {
  21. if (trace_options.sampling_frequency == 0 ||
  22. trace_options.sampling_frequency == 1) {
  23. return true;
  24. }
  25. // We use spatial downsampling so that we have a complete access history for a
  26. // block.
  27. return 0 == GetSliceRangedNPHash(block_key, trace_options.sampling_frequency);
  28. }
  29. } // namespace
  30. const uint64_t kMicrosInSecond = 1000 * 1000;
  31. const uint64_t kSecondInMinute = 60;
  32. const uint64_t kSecondInHour = 3600;
  33. const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName =
  34. "UnknownColumnFamily";
  35. const uint64_t BlockCacheTraceRecord::kReservedGetId = 0;
  36. const uint64_t BlockCacheTraceHelper::kReservedGetId = 0;
  37. bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
  38. TraceType block_type, TableReaderCaller caller) {
  39. return (block_type == TraceType::kBlockTraceDataBlock) &&
  40. IsGetOrMultiGet(caller);
  41. }
  42. bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) {
  43. return caller == TableReaderCaller::kUserGet ||
  44. caller == TableReaderCaller::kUserMultiGet;
  45. }
  46. bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) {
  47. return caller == TableReaderCaller::kUserGet ||
  48. caller == TableReaderCaller::kUserMultiGet ||
  49. caller == TableReaderCaller::kUserIterator ||
  50. caller == TableReaderCaller::kUserApproximateSize ||
  51. caller == TableReaderCaller::kUserVerifyChecksum;
  52. }
  53. std::string BlockCacheTraceHelper::ComputeRowKey(
  54. const BlockCacheTraceRecord& access) {
  55. if (!IsGetOrMultiGet(access.caller)) {
  56. return "";
  57. }
  58. Slice key = ExtractUserKey(access.referenced_key);
  59. return std::to_string(access.sst_fd_number) + "_" + key.ToString();
  60. }
  61. uint64_t BlockCacheTraceHelper::GetTableId(
  62. const BlockCacheTraceRecord& access) {
  63. if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) {
  64. return 0;
  65. }
  66. return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1;
  67. }
  68. uint64_t BlockCacheTraceHelper::GetSequenceNumber(
  69. const BlockCacheTraceRecord& access) {
  70. if (!IsGetOrMultiGet(access.caller)) {
  71. return 0;
  72. }
  73. if (access.caller == TableReaderCaller::kUserMultiGet &&
  74. access.referenced_key.size() < 4) {
  75. return 0;
  76. }
  77. return access.get_from_user_specified_snapshot
  78. ? 1 + GetInternalKeySeqno(access.referenced_key)
  79. : 0;
  80. }
  81. uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
  82. const BlockCacheTraceRecord& access) {
  83. Slice input(access.block_key);
  84. uint64_t offset = 0;
  85. while (true) {
  86. uint64_t tmp = 0;
  87. if (GetVarint64(&input, &tmp)) {
  88. offset = tmp;
  89. } else {
  90. break;
  91. }
  92. }
  93. return offset;
  94. }
  95. BlockCacheTraceWriterImpl::BlockCacheTraceWriterImpl(
  96. SystemClock* clock, const BlockCacheTraceWriterOptions& trace_options,
  97. std::unique_ptr<TraceWriter>&& trace_writer)
  98. : clock_(clock),
  99. trace_options_(trace_options),
  100. trace_writer_(std::move(trace_writer)) {}
  101. Status BlockCacheTraceWriterImpl::WriteBlockAccess(
  102. const BlockCacheTraceRecord& record, const Slice& block_key,
  103. const Slice& cf_name, const Slice& referenced_key) {
  104. uint64_t trace_file_size = trace_writer_->GetFileSize();
  105. if (trace_file_size > trace_options_.max_trace_file_size) {
  106. return Status::OK();
  107. }
  108. Trace trace;
  109. trace.ts = record.access_timestamp;
  110. trace.type = record.block_type;
  111. PutLengthPrefixedSlice(&trace.payload, block_key);
  112. PutFixed64(&trace.payload, record.block_size);
  113. PutFixed64(&trace.payload, record.cf_id);
  114. PutLengthPrefixedSlice(&trace.payload, cf_name);
  115. PutFixed32(&trace.payload, record.level);
  116. PutFixed64(&trace.payload, record.sst_fd_number);
  117. trace.payload.push_back(record.caller);
  118. trace.payload.push_back(record.is_cache_hit);
  119. trace.payload.push_back(record.no_insert);
  120. if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) {
  121. PutFixed64(&trace.payload, record.get_id);
  122. trace.payload.push_back(record.get_from_user_specified_snapshot);
  123. PutLengthPrefixedSlice(&trace.payload, referenced_key);
  124. }
  125. if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type,
  126. record.caller)) {
  127. PutFixed64(&trace.payload, record.referenced_data_size);
  128. PutFixed64(&trace.payload, record.num_keys_in_block);
  129. trace.payload.push_back(record.referenced_key_exist_in_block);
  130. }
  131. std::string encoded_trace;
  132. TracerHelper::EncodeTrace(trace, &encoded_trace);
  133. return trace_writer_->Write(encoded_trace);
  134. }
  135. Status BlockCacheTraceWriterImpl::WriteHeader() {
  136. Trace trace;
  137. trace.ts = clock_->NowMicros();
  138. trace.type = TraceType::kTraceBegin;
  139. PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
  140. PutFixed32(&trace.payload, kMajorVersion);
  141. PutFixed32(&trace.payload, kMinorVersion);
  142. std::string encoded_trace;
  143. TracerHelper::EncodeTrace(trace, &encoded_trace);
  144. return trace_writer_->Write(encoded_trace);
  145. }
  146. BlockCacheTraceReader::BlockCacheTraceReader(
  147. std::unique_ptr<TraceReader>&& reader)
  148. : trace_reader_(std::move(reader)) {}
  149. Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
  150. assert(header != nullptr);
  151. std::string encoded_trace;
  152. Status s = trace_reader_->Read(&encoded_trace);
  153. if (!s.ok()) {
  154. return s;
  155. }
  156. Trace trace;
  157. s = TracerHelper::DecodeTrace(encoded_trace, &trace);
  158. if (!s.ok()) {
  159. return s;
  160. }
  161. header->start_time = trace.ts;
  162. Slice enc_slice = Slice(trace.payload);
  163. Slice magnic_number;
  164. if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) {
  165. return Status::Corruption(
  166. "Corrupted header in the trace file: Failed to read the magic number.");
  167. }
  168. if (magnic_number.ToString() != kTraceMagic) {
  169. return Status::Corruption(
  170. "Corrupted header in the trace file: Magic number does not match.");
  171. }
  172. if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
  173. return Status::Corruption(
  174. "Corrupted header in the trace file: Failed to read rocksdb major "
  175. "version number.");
  176. }
  177. if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
  178. return Status::Corruption(
  179. "Corrupted header in the trace file: Failed to read rocksdb minor "
  180. "version number.");
  181. }
  182. // We should have retrieved all information in the header.
  183. if (!enc_slice.empty()) {
  184. return Status::Corruption(
  185. "Corrupted header in the trace file: The length of header is too "
  186. "long.");
  187. }
  188. return Status::OK();
  189. }
  190. Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
  191. assert(record);
  192. std::string encoded_trace;
  193. Status s = trace_reader_->Read(&encoded_trace);
  194. if (!s.ok()) {
  195. return s;
  196. }
  197. Trace trace;
  198. s = TracerHelper::DecodeTrace(encoded_trace, &trace);
  199. if (!s.ok()) {
  200. return s;
  201. }
  202. record->access_timestamp = trace.ts;
  203. record->block_type = trace.type;
  204. Slice enc_slice = Slice(trace.payload);
  205. const unsigned int kCharSize = 1;
  206. Slice block_key;
  207. if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
  208. return Status::Incomplete(
  209. "Incomplete access record: Failed to read block key.");
  210. }
  211. record->block_key = block_key.ToString();
  212. if (!GetFixed64(&enc_slice, &record->block_size)) {
  213. return Status::Incomplete(
  214. "Incomplete access record: Failed to read block size.");
  215. }
  216. if (!GetFixed64(&enc_slice, &record->cf_id)) {
  217. return Status::Incomplete(
  218. "Incomplete access record: Failed to read column family ID.");
  219. }
  220. Slice cf_name;
  221. if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
  222. return Status::Incomplete(
  223. "Incomplete access record: Failed to read column family name.");
  224. }
  225. record->cf_name = cf_name.ToString();
  226. if (!GetFixed32(&enc_slice, &record->level)) {
  227. return Status::Incomplete(
  228. "Incomplete access record: Failed to read level.");
  229. }
  230. if (!GetFixed64(&enc_slice, &record->sst_fd_number)) {
  231. return Status::Incomplete(
  232. "Incomplete access record: Failed to read SST file number.");
  233. }
  234. if (enc_slice.empty()) {
  235. return Status::Incomplete(
  236. "Incomplete access record: Failed to read caller.");
  237. }
  238. record->caller = static_cast<TableReaderCaller>(enc_slice[0]);
  239. enc_slice.remove_prefix(kCharSize);
  240. if (enc_slice.empty()) {
  241. return Status::Incomplete(
  242. "Incomplete access record: Failed to read is_cache_hit.");
  243. }
  244. record->is_cache_hit = static_cast<char>(enc_slice[0]);
  245. enc_slice.remove_prefix(kCharSize);
  246. if (enc_slice.empty()) {
  247. return Status::Incomplete(
  248. "Incomplete access record: Failed to read no_insert.");
  249. }
  250. record->no_insert = static_cast<char>(enc_slice[0]);
  251. enc_slice.remove_prefix(kCharSize);
  252. if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) {
  253. if (!GetFixed64(&enc_slice, &record->get_id)) {
  254. return Status::Incomplete(
  255. "Incomplete access record: Failed to read the get id.");
  256. }
  257. if (enc_slice.empty()) {
  258. return Status::Incomplete(
  259. "Incomplete access record: Failed to read "
  260. "get_from_user_specified_snapshot.");
  261. }
  262. record->get_from_user_specified_snapshot = static_cast<char>(enc_slice[0]);
  263. enc_slice.remove_prefix(kCharSize);
  264. Slice referenced_key;
  265. if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
  266. return Status::Incomplete(
  267. "Incomplete access record: Failed to read the referenced key.");
  268. }
  269. record->referenced_key = referenced_key.ToString();
  270. }
  271. if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type,
  272. record->caller)) {
  273. if (!GetFixed64(&enc_slice, &record->referenced_data_size)) {
  274. return Status::Incomplete(
  275. "Incomplete access record: Failed to read the referenced data size.");
  276. }
  277. if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
  278. return Status::Incomplete(
  279. "Incomplete access record: Failed to read the number of keys in the "
  280. "block.");
  281. }
  282. if (enc_slice.empty()) {
  283. return Status::Incomplete(
  284. "Incomplete access record: Failed to read "
  285. "referenced_key_exist_in_block.");
  286. }
  287. record->referenced_key_exist_in_block = static_cast<char>(enc_slice[0]);
  288. }
  289. return Status::OK();
  290. }
  291. BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() {
  292. if (human_readable_trace_file_writer_) {
  293. human_readable_trace_file_writer_->Flush().PermitUncheckedError();
  294. human_readable_trace_file_writer_->Close().PermitUncheckedError();
  295. }
  296. }
  297. Status BlockCacheHumanReadableTraceWriter::NewWritableFile(
  298. const std::string& human_readable_trace_file_path,
  299. ROCKSDB_NAMESPACE::Env* env) {
  300. if (human_readable_trace_file_path.empty()) {
  301. return Status::InvalidArgument(
  302. "The provided human_readable_trace_file_path is null.");
  303. }
  304. return env->NewWritableFile(human_readable_trace_file_path,
  305. &human_readable_trace_file_writer_, EnvOptions());
  306. }
  307. Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord(
  308. const BlockCacheTraceRecord& access, uint64_t block_id,
  309. uint64_t get_key_id) {
  310. if (!human_readable_trace_file_writer_) {
  311. return Status::OK();
  312. }
  313. int ret = snprintf(
  314. trace_record_buffer_, sizeof(trace_record_buffer_),
  315. "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
  316. ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
  317. ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
  318. access.access_timestamp, block_id, access.block_type, access.block_size,
  319. access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
  320. access.caller, access.no_insert, access.get_id, get_key_id,
  321. access.referenced_data_size, access.is_cache_hit,
  322. access.referenced_key_exist_in_block, access.num_keys_in_block,
  323. BlockCacheTraceHelper::GetTableId(access),
  324. BlockCacheTraceHelper::GetSequenceNumber(access),
  325. static_cast<uint64_t>(access.block_key.size()),
  326. static_cast<uint64_t>(access.referenced_key.size()),
  327. BlockCacheTraceHelper::GetBlockOffsetInFile(access));
  328. if (ret < 0) {
  329. return Status::IOError("failed to format the output");
  330. }
  331. std::string printout(trace_record_buffer_);
  332. return human_readable_trace_file_writer_->Append(printout);
  333. }
  334. BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader(
  335. const std::string& trace_file_path)
  336. : BlockCacheTraceReader(/*trace_reader=*/nullptr) {
  337. human_readable_trace_reader_.open(trace_file_path, std::ifstream::in);
  338. }
  339. BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() {
  340. human_readable_trace_reader_.close();
  341. }
  342. Status BlockCacheHumanReadableTraceReader::ReadHeader(
  343. BlockCacheTraceHeader* /*header*/) {
  344. return Status::OK();
  345. }
  346. Status BlockCacheHumanReadableTraceReader::ReadAccess(
  347. BlockCacheTraceRecord* record) {
  348. std::string line;
  349. if (!std::getline(human_readable_trace_reader_, line)) {
  350. return Status::Incomplete("No more records to read.");
  351. }
  352. std::stringstream ss(line);
  353. std::vector<std::string> record_strs;
  354. while (ss.good()) {
  355. std::string substr;
  356. getline(ss, substr, ',');
  357. record_strs.push_back(substr);
  358. }
  359. if (record_strs.size() != 21) {
  360. return Status::Incomplete("Records format is wrong.");
  361. }
  362. record->access_timestamp = ParseUint64(record_strs[0]);
  363. uint64_t block_key = ParseUint64(record_strs[1]);
  364. record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2]));
  365. record->block_size = ParseUint64(record_strs[3]);
  366. record->cf_id = ParseUint64(record_strs[4]);
  367. record->cf_name = record_strs[5];
  368. record->level = static_cast<uint32_t>(ParseUint64(record_strs[6]));
  369. record->sst_fd_number = ParseUint64(record_strs[7]);
  370. record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8]));
  371. record->no_insert = static_cast<char>(ParseUint64(record_strs[9]));
  372. record->get_id = ParseUint64(record_strs[10]);
  373. uint64_t get_key_id = ParseUint64(record_strs[11]);
  374. record->referenced_data_size = ParseUint64(record_strs[12]);
  375. record->is_cache_hit = static_cast<char>(ParseUint64(record_strs[13]));
  376. record->referenced_key_exist_in_block =
  377. static_cast<char>(ParseUint64(record_strs[14]));
  378. record->num_keys_in_block = ParseUint64(record_strs[15]);
  379. uint64_t table_id = ParseUint64(record_strs[16]);
  380. if (table_id > 0) {
  381. // Decrement since valid table id in the trace file equals traced table id
  382. // + 1.
  383. table_id -= 1;
  384. }
  385. uint64_t get_sequence_number = ParseUint64(record_strs[17]);
  386. if (get_sequence_number > 0) {
  387. record->get_from_user_specified_snapshot = true;
  388. // Decrement since valid seq number in the trace file equals traced seq
  389. // number + 1.
  390. get_sequence_number -= 1;
  391. }
  392. uint64_t block_key_size = ParseUint64(record_strs[18]);
  393. uint64_t get_key_size = ParseUint64(record_strs[19]);
  394. uint64_t block_offset = ParseUint64(record_strs[20]);
  395. std::string tmp_block_key;
  396. PutVarint64(&tmp_block_key, block_key);
  397. PutVarint64(&tmp_block_key, block_offset);
  398. // Append 1 until the size is the same as traced block key size.
  399. while (record->block_key.size() < block_key_size - tmp_block_key.size()) {
  400. record->block_key += "1";
  401. }
  402. record->block_key += tmp_block_key;
  403. if (get_key_id != 0) {
  404. std::string tmp_get_key;
  405. PutFixed64(&tmp_get_key, get_key_id);
  406. PutFixed64(&tmp_get_key, get_sequence_number << 8);
  407. PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id));
  408. // Append 1 until the size is the same as traced key size.
  409. while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) {
  410. record->referenced_key += "1";
  411. }
  412. record->referenced_key += tmp_get_key;
  413. }
  414. return Status::OK();
  415. }
  416. BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
  417. BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
  418. Status BlockCacheTracer::StartTrace(
  419. const BlockCacheTraceOptions& trace_options,
  420. std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) {
  421. InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  422. if (writer_.load()) {
  423. return Status::Busy();
  424. }
  425. get_id_counter_.store(1);
  426. trace_options_ = trace_options;
  427. writer_.store(trace_writer.release());
  428. return writer_.load()->WriteHeader();
  429. }
  430. void BlockCacheTracer::EndTrace() {
  431. InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  432. if (!writer_.load()) {
  433. return;
  434. }
  435. delete writer_.load();
  436. writer_.store(nullptr);
  437. }
  438. Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record,
  439. const Slice& block_key,
  440. const Slice& cf_name,
  441. const Slice& referenced_key) {
  442. if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) {
  443. return Status::OK();
  444. }
  445. InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  446. if (!writer_.load()) {
  447. return Status::OK();
  448. }
  449. return writer_.load()->WriteBlockAccess(record, block_key, cf_name,
  450. referenced_key);
  451. }
  452. uint64_t BlockCacheTracer::NextGetId() {
  453. if (!writer_.load(std::memory_order_relaxed)) {
  454. return BlockCacheTraceHelper::kReservedGetId;
  455. }
  456. uint64_t prev_value = get_id_counter_.fetch_add(1);
  457. if (prev_value == BlockCacheTraceHelper::kReservedGetId) {
  458. // fetch and add again.
  459. return get_id_counter_.fetch_add(1);
  460. }
  461. return prev_value;
  462. }
  463. std::unique_ptr<BlockCacheTraceWriter> NewBlockCacheTraceWriter(
  464. SystemClock* clock, const BlockCacheTraceWriterOptions& trace_options,
  465. std::unique_ptr<TraceWriter>&& trace_writer) {
  466. return std::unique_ptr<BlockCacheTraceWriter>(new BlockCacheTraceWriterImpl(
  467. clock, trace_options, std::move(trace_writer)));
  468. }
  469. } // namespace ROCKSDB_NAMESPACE