trace_replay.cc 20 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  #include "trace_replay/trace_replay.h"

  #include <cctype>
  #include <chrono>
  #include <limits>
  #include <sstream>
  #include <thread>

  #include "db/db_impl/db_impl.h"
  #include "rocksdb/env.h"
  #include "rocksdb/iterator.h"
  #include "rocksdb/options.h"
  #include "rocksdb/slice.h"
  #include "rocksdb/system_clock.h"
  #include "rocksdb/trace_reader_writer.h"
  #include "rocksdb/write_batch.h"
  #include "util/coding.h"
  #include "util/string_util.h"
  19. namespace ROCKSDB_NAMESPACE {
  // Magic token that opens the payload of every trace header record.
  // Tracer::WriteHeader() emits it first; TracerHelper::DecodeHeader()
  // rejects any header whose payload does not start with it.
  const std::string kTraceMagic("feedcafedeadbeef");
  21. namespace {
  22. void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
  23. Slice buf(buffer);
  24. GetFixed32(&buf, cf_id);
  25. GetLengthPrefixedSlice(&buf, key);
  26. }
  27. } // namespace
  28. Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) {
  29. if (v_string.find_first_of('.') == std::string::npos ||
  30. v_string.find_first_of('.') != v_string.find_last_of('.')) {
  31. return Status::Corruption(
  32. "Corrupted trace file. Incorrect version format.");
  33. }
  34. int tmp_num = 0;
  35. for (int i = 0; i < static_cast<int>(v_string.size()); i++) {
  36. if (v_string[i] == '.') {
  37. continue;
  38. } else if (isdigit(v_string[i])) {
  39. tmp_num = tmp_num * 10 + (v_string[i] - '0');
  40. } else {
  41. return Status::Corruption(
  42. "Corrupted trace file. Incorrect version format");
  43. }
  44. }
  45. *v_num = tmp_num;
  46. return Status::OK();
  47. }
  48. Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version,
  49. int* db_version) {
  50. std::vector<std::string> s_vec;
  51. int begin = 0, end;
  52. for (int i = 0; i < 3; i++) {
  53. assert(header.payload.find('\t', begin) != std::string::npos);
  54. end = static_cast<int>(header.payload.find('\t', begin));
  55. s_vec.push_back(header.payload.substr(begin, end - begin));
  56. begin = end + 1;
  57. }
  58. std::string t_v_str, db_v_str;
  59. assert(s_vec.size() == 3);
  60. assert(s_vec[1].find("Trace Version: ") != std::string::npos);
  61. t_v_str = s_vec[1].substr(15);
  62. assert(s_vec[2].find("RocksDB Version: ") != std::string::npos);
  63. db_v_str = s_vec[2].substr(17);
  64. Status s;
  65. s = ParseVersionStr(t_v_str, trace_version);
  66. if (s != Status::OK()) {
  67. return s;
  68. }
  69. s = ParseVersionStr(db_v_str, db_version);
  70. return s;
  71. }
  72. void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
  73. assert(encoded_trace);
  74. PutFixed64(encoded_trace, trace.ts);
  75. encoded_trace->push_back(trace.type);
  76. PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
  77. encoded_trace->append(trace.payload);
  78. }
  79. Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
  80. Trace* trace) {
  81. assert(trace != nullptr);
  82. Slice enc_slice = Slice(encoded_trace);
  83. if (!GetFixed64(&enc_slice, &trace->ts)) {
  84. return Status::Incomplete("Decode trace string failed");
  85. }
  86. if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
  87. return Status::Incomplete("Decode trace string failed");
  88. }
  89. trace->type = static_cast<TraceType>(enc_slice[0]);
  90. enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
  91. trace->payload = enc_slice.ToString();
  92. return Status::OK();
  93. }
  94. Status TracerHelper::DecodeHeader(const std::string& encoded_trace,
  95. Trace* header) {
  96. Status s = TracerHelper::DecodeTrace(encoded_trace, header);
  97. if (header->type != kTraceBegin) {
  98. return Status::Corruption("Corrupted trace file. Incorrect header.");
  99. }
  100. if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
  101. return Status::Corruption("Corrupted trace file. Incorrect magic.");
  102. }
  103. return s;
  104. }
  105. bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
  106. const TracePayloadType payload_type) {
  107. uint64_t old_state = payload_map;
  108. uint64_t tmp = 1;
  109. payload_map |= (tmp << payload_type);
  110. return old_state != payload_map;
  111. }
  112. Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
  113. std::unique_ptr<TraceRecord>* record) {
  114. assert(trace != nullptr);
  115. if (record != nullptr) {
  116. record->reset(nullptr);
  117. }
  118. switch (trace->type) {
  119. // Write
  120. case kTraceWrite: {
  121. PinnableSlice rep;
  122. if (trace_file_version < 2) {
  123. rep.PinSelf(trace->payload);
  124. } else {
  125. Slice buf(trace->payload);
  126. GetFixed64(&buf, &trace->payload_map);
  127. int64_t payload_map = static_cast<int64_t>(trace->payload_map);
  128. Slice write_batch_data;
  129. while (payload_map) {
  130. // Find the rightmost set bit.
  131. uint32_t set_pos =
  132. static_cast<uint32_t>(log2(payload_map & -payload_map));
  133. switch (set_pos) {
  134. case TracePayloadType::kWriteBatchData: {
  135. GetLengthPrefixedSlice(&buf, &write_batch_data);
  136. break;
  137. }
  138. default: {
  139. assert(false);
  140. }
  141. }
  142. // unset the rightmost bit.
  143. payload_map &= (payload_map - 1);
  144. }
  145. rep.PinSelf(write_batch_data);
  146. }
  147. if (record != nullptr) {
  148. record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
  149. }
  150. return Status::OK();
  151. }
  152. // Get
  153. case kTraceGet: {
  154. uint32_t cf_id = 0;
  155. Slice get_key;
  156. if (trace_file_version < 2) {
  157. DecodeCFAndKey(trace->payload, &cf_id, &get_key);
  158. } else {
  159. Slice buf(trace->payload);
  160. GetFixed64(&buf, &trace->payload_map);
  161. int64_t payload_map = static_cast<int64_t>(trace->payload_map);
  162. while (payload_map) {
  163. // Find the rightmost set bit.
  164. uint32_t set_pos =
  165. static_cast<uint32_t>(log2(payload_map & -payload_map));
  166. switch (set_pos) {
  167. case TracePayloadType::kGetCFID: {
  168. GetFixed32(&buf, &cf_id);
  169. break;
  170. }
  171. case TracePayloadType::kGetKey: {
  172. GetLengthPrefixedSlice(&buf, &get_key);
  173. break;
  174. }
  175. default: {
  176. assert(false);
  177. }
  178. }
  179. // unset the rightmost bit.
  180. payload_map &= (payload_map - 1);
  181. }
  182. }
  183. if (record != nullptr) {
  184. PinnableSlice ps;
  185. ps.PinSelf(get_key);
  186. record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
  187. }
  188. return Status::OK();
  189. }
  190. // Iterator Seek and SeekForPrev
  191. case kTraceIteratorSeek:
  192. case kTraceIteratorSeekForPrev: {
  193. uint32_t cf_id = 0;
  194. Slice iter_key;
  195. Slice lower_bound;
  196. Slice upper_bound;
  197. if (trace_file_version < 2) {
  198. DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
  199. } else {
  200. Slice buf(trace->payload);
  201. GetFixed64(&buf, &trace->payload_map);
  202. int64_t payload_map = static_cast<int64_t>(trace->payload_map);
  203. while (payload_map) {
  204. // Find the rightmost set bit.
  205. uint32_t set_pos =
  206. static_cast<uint32_t>(log2(payload_map & -payload_map));
  207. switch (set_pos) {
  208. case TracePayloadType::kIterCFID: {
  209. GetFixed32(&buf, &cf_id);
  210. break;
  211. }
  212. case TracePayloadType::kIterKey: {
  213. GetLengthPrefixedSlice(&buf, &iter_key);
  214. break;
  215. }
  216. case TracePayloadType::kIterLowerBound: {
  217. GetLengthPrefixedSlice(&buf, &lower_bound);
  218. break;
  219. }
  220. case TracePayloadType::kIterUpperBound: {
  221. GetLengthPrefixedSlice(&buf, &upper_bound);
  222. break;
  223. }
  224. default: {
  225. assert(false);
  226. }
  227. }
  228. // unset the rightmost bit.
  229. payload_map &= (payload_map - 1);
  230. }
  231. }
  232. if (record != nullptr) {
  233. PinnableSlice ps_key;
  234. ps_key.PinSelf(iter_key);
  235. PinnableSlice ps_lower;
  236. ps_lower.PinSelf(lower_bound);
  237. PinnableSlice ps_upper;
  238. ps_upper.PinSelf(upper_bound);
  239. record->reset(new IteratorSeekQueryTraceRecord(
  240. static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
  241. cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
  242. trace->ts));
  243. }
  244. return Status::OK();
  245. }
  246. // MultiGet
  247. case kTraceMultiGet: {
  248. if (trace_file_version < 2) {
  249. return Status::Corruption("MultiGet is not supported.");
  250. }
  251. uint32_t multiget_size = 0;
  252. std::vector<uint32_t> cf_ids;
  253. std::vector<PinnableSlice> multiget_keys;
  254. Slice cfids_payload;
  255. Slice keys_payload;
  256. Slice buf(trace->payload);
  257. GetFixed64(&buf, &trace->payload_map);
  258. int64_t payload_map = static_cast<int64_t>(trace->payload_map);
  259. while (payload_map) {
  260. // Find the rightmost set bit.
  261. uint32_t set_pos =
  262. static_cast<uint32_t>(log2(payload_map & -payload_map));
  263. switch (set_pos) {
  264. case TracePayloadType::kMultiGetSize: {
  265. GetFixed32(&buf, &multiget_size);
  266. break;
  267. }
  268. case TracePayloadType::kMultiGetCFIDs: {
  269. GetLengthPrefixedSlice(&buf, &cfids_payload);
  270. break;
  271. }
  272. case TracePayloadType::kMultiGetKeys: {
  273. GetLengthPrefixedSlice(&buf, &keys_payload);
  274. break;
  275. }
  276. default: {
  277. assert(false);
  278. }
  279. }
  280. // unset the rightmost bit.
  281. payload_map &= (payload_map - 1);
  282. }
  283. if (multiget_size == 0) {
  284. return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
  285. }
  286. // Decode the cfids_payload and keys_payload
  287. cf_ids.reserve(multiget_size);
  288. multiget_keys.reserve(multiget_size);
  289. for (uint32_t i = 0; i < multiget_size; i++) {
  290. uint32_t tmp_cfid = 0;
  291. Slice tmp_key;
  292. GetFixed32(&cfids_payload, &tmp_cfid);
  293. GetLengthPrefixedSlice(&keys_payload, &tmp_key);
  294. cf_ids.push_back(tmp_cfid);
  295. Slice s(tmp_key);
  296. PinnableSlice ps;
  297. ps.PinSelf(s);
  298. multiget_keys.push_back(std::move(ps));
  299. }
  300. if (record != nullptr) {
  301. record->reset(new MultiGetQueryTraceRecord(
  302. std::move(cf_ids), std::move(multiget_keys), trace->ts));
  303. }
  304. return Status::OK();
  305. }
  306. default:
  307. return Status::NotSupported("Unsupported trace type.");
  308. }
  309. }
  310. Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
  311. std::unique_ptr<TraceWriter>&& trace_writer)
  312. : clock_(clock),
  313. trace_options_(trace_options),
  314. trace_writer_(std::move(trace_writer)),
  315. trace_request_count_(0),
  316. trace_write_status_(Status::OK()) {
  317. // TODO: What if this fails?
  318. WriteHeader().PermitUncheckedError();
  319. }
  320. Tracer::~Tracer() { trace_writer_.reset(); }
  321. Status Tracer::Write(WriteBatch* write_batch) {
  322. TraceType trace_type = kTraceWrite;
  323. if (ShouldSkipTrace(trace_type)) {
  324. return Status::OK();
  325. }
  326. Trace trace;
  327. trace.ts = clock_->NowMicros();
  328. trace.type = trace_type;
  329. TracerHelper::SetPayloadMap(trace.payload_map,
  330. TracePayloadType::kWriteBatchData);
  331. PutFixed64(&trace.payload, trace.payload_map);
  332. PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data()));
  333. return WriteTrace(trace);
  334. }
  335. Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
  336. TraceType trace_type = kTraceGet;
  337. if (ShouldSkipTrace(trace_type)) {
  338. return Status::OK();
  339. }
  340. Trace trace;
  341. trace.ts = clock_->NowMicros();
  342. trace.type = trace_type;
  343. // Set the payloadmap of the struct member that will be encoded in the
  344. // payload.
  345. TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID);
  346. TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey);
  347. // Encode the Get struct members into payload. Make sure add them in order.
  348. PutFixed64(&trace.payload, trace.payload_map);
  349. PutFixed32(&trace.payload, column_family->GetID());
  350. PutLengthPrefixedSlice(&trace.payload, key);
  351. return WriteTrace(trace);
  352. }
  353. Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key,
  354. const Slice& lower_bound, const Slice upper_bound) {
  355. TraceType trace_type = kTraceIteratorSeek;
  356. if (ShouldSkipTrace(trace_type)) {
  357. return Status::OK();
  358. }
  359. Trace trace;
  360. trace.ts = clock_->NowMicros();
  361. trace.type = trace_type;
  362. // Set the payloadmap of the struct member that will be encoded in the
  363. // payload.
  364. TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
  365. TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
  366. if (lower_bound.size() > 0) {
  367. TracerHelper::SetPayloadMap(trace.payload_map,
  368. TracePayloadType::kIterLowerBound);
  369. }
  370. if (upper_bound.size() > 0) {
  371. TracerHelper::SetPayloadMap(trace.payload_map,
  372. TracePayloadType::kIterUpperBound);
  373. }
  374. // Encode the Iterator struct members into payload. Make sure add them in
  375. // order.
  376. PutFixed64(&trace.payload, trace.payload_map);
  377. PutFixed32(&trace.payload, cf_id);
  378. PutLengthPrefixedSlice(&trace.payload, key);
  379. if (lower_bound.size() > 0) {
  380. PutLengthPrefixedSlice(&trace.payload, lower_bound);
  381. }
  382. if (upper_bound.size() > 0) {
  383. PutLengthPrefixedSlice(&trace.payload, upper_bound);
  384. }
  385. return WriteTrace(trace);
  386. }
  387. Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
  388. const Slice& lower_bound,
  389. const Slice upper_bound) {
  390. TraceType trace_type = kTraceIteratorSeekForPrev;
  391. if (ShouldSkipTrace(trace_type)) {
  392. return Status::OK();
  393. }
  394. Trace trace;
  395. trace.ts = clock_->NowMicros();
  396. trace.type = trace_type;
  397. // Set the payloadmap of the struct member that will be encoded in the
  398. // payload.
  399. TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
  400. TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
  401. if (lower_bound.size() > 0) {
  402. TracerHelper::SetPayloadMap(trace.payload_map,
  403. TracePayloadType::kIterLowerBound);
  404. }
  405. if (upper_bound.size() > 0) {
  406. TracerHelper::SetPayloadMap(trace.payload_map,
  407. TracePayloadType::kIterUpperBound);
  408. }
  409. // Encode the Iterator struct members into payload. Make sure add them in
  410. // order.
  411. PutFixed64(&trace.payload, trace.payload_map);
  412. PutFixed32(&trace.payload, cf_id);
  413. PutLengthPrefixedSlice(&trace.payload, key);
  414. if (lower_bound.size() > 0) {
  415. PutLengthPrefixedSlice(&trace.payload, lower_bound);
  416. }
  417. if (upper_bound.size() > 0) {
  418. PutLengthPrefixedSlice(&trace.payload, upper_bound);
  419. }
  420. return WriteTrace(trace);
  421. }
  422. Status Tracer::MultiGet(const size_t num_keys,
  423. ColumnFamilyHandle** column_families,
  424. const Slice* keys) {
  425. if (num_keys == 0) {
  426. return Status::OK();
  427. }
  428. std::vector<ColumnFamilyHandle*> v_column_families;
  429. std::vector<Slice> v_keys;
  430. v_column_families.resize(num_keys);
  431. v_keys.resize(num_keys);
  432. for (size_t i = 0; i < num_keys; i++) {
  433. v_column_families[i] = column_families[i];
  434. v_keys[i] = keys[i];
  435. }
  436. return MultiGet(v_column_families, v_keys);
  437. }
  438. Status Tracer::MultiGet(const size_t num_keys,
  439. ColumnFamilyHandle* column_family, const Slice* keys) {
  440. if (num_keys == 0) {
  441. return Status::OK();
  442. }
  443. std::vector<ColumnFamilyHandle*> column_families;
  444. std::vector<Slice> v_keys;
  445. column_families.resize(num_keys);
  446. v_keys.resize(num_keys);
  447. for (size_t i = 0; i < num_keys; i++) {
  448. column_families[i] = column_family;
  449. v_keys[i] = keys[i];
  450. }
  451. return MultiGet(column_families, v_keys);
  452. }
  453. Status Tracer::MultiGet(const std::vector<ColumnFamilyHandle*>& column_families,
  454. const std::vector<Slice>& keys) {
  455. if (column_families.size() != keys.size()) {
  456. return Status::Corruption("the CFs size and keys size does not match!");
  457. }
  458. TraceType trace_type = kTraceMultiGet;
  459. if (ShouldSkipTrace(trace_type)) {
  460. return Status::OK();
  461. }
  462. uint32_t multiget_size = static_cast<uint32_t>(keys.size());
  463. Trace trace;
  464. trace.ts = clock_->NowMicros();
  465. trace.type = trace_type;
  466. // Set the payloadmap of the struct member that will be encoded in the
  467. // payload.
  468. TracerHelper::SetPayloadMap(trace.payload_map,
  469. TracePayloadType::kMultiGetSize);
  470. TracerHelper::SetPayloadMap(trace.payload_map,
  471. TracePayloadType::kMultiGetCFIDs);
  472. TracerHelper::SetPayloadMap(trace.payload_map,
  473. TracePayloadType::kMultiGetKeys);
  474. // Encode the CFIDs inorder
  475. std::string cfids_payload;
  476. std::string keys_payload;
  477. for (uint32_t i = 0; i < multiget_size; i++) {
  478. assert(i < column_families.size());
  479. assert(i < keys.size());
  480. PutFixed32(&cfids_payload, column_families[i]->GetID());
  481. PutLengthPrefixedSlice(&keys_payload, keys[i]);
  482. }
  483. // Encode the Get struct members into payload. Make sure add them in order.
  484. PutFixed64(&trace.payload, trace.payload_map);
  485. PutFixed32(&trace.payload, multiget_size);
  486. PutLengthPrefixedSlice(&trace.payload, cfids_payload);
  487. PutLengthPrefixedSlice(&trace.payload, keys_payload);
  488. return WriteTrace(trace);
  489. }
  490. bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
  491. if (IsTraceFileOverMax()) {
  492. return true;
  493. }
  494. TraceFilterType filter_mask = kTraceFilterNone;
  495. switch (trace_type) {
  496. case kTraceNone:
  497. case kTraceBegin:
  498. case kTraceEnd:
  499. filter_mask = kTraceFilterNone;
  500. break;
  501. case kTraceWrite:
  502. filter_mask = kTraceFilterWrite;
  503. break;
  504. case kTraceGet:
  505. filter_mask = kTraceFilterGet;
  506. break;
  507. case kTraceIteratorSeek:
  508. filter_mask = kTraceFilterIteratorSeek;
  509. break;
  510. case kTraceIteratorSeekForPrev:
  511. filter_mask = kTraceFilterIteratorSeekForPrev;
  512. break;
  513. case kBlockTraceIndexBlock:
  514. case kBlockTraceFilterBlock:
  515. case kBlockTraceDataBlock:
  516. case kBlockTraceUncompressionDictBlock:
  517. case kBlockTraceRangeDeletionBlock:
  518. case kIOTracer:
  519. filter_mask = kTraceFilterNone;
  520. break;
  521. case kTraceMultiGet:
  522. filter_mask = kTraceFilterMultiGet;
  523. break;
  524. case kTraceMax:
  525. assert(false);
  526. filter_mask = kTraceFilterNone;
  527. break;
  528. }
  529. if (filter_mask != kTraceFilterNone && trace_options_.filter & filter_mask) {
  530. return true;
  531. }
  532. ++trace_request_count_;
  533. if (trace_request_count_ < trace_options_.sampling_frequency) {
  534. return true;
  535. }
  536. trace_request_count_ = 0;
  537. return false;
  538. }
  539. bool Tracer::IsTraceFileOverMax() {
  540. uint64_t trace_file_size = trace_writer_->GetFileSize();
  541. return (trace_file_size > trace_options_.max_trace_file_size);
  542. }
  543. Status Tracer::WriteHeader() {
  544. std::ostringstream s;
  545. s << kTraceMagic << "\t" << "Trace Version: " << kTraceFileMajorVersion << "."
  546. << kTraceFileMinorVersion << "\t" << "RocksDB Version: " << kMajorVersion
  547. << "." << kMinorVersion << "\t" << "Format: Timestamp OpType Payload\n";
  548. std::string header(s.str());
  549. Trace trace;
  550. trace.ts = clock_->NowMicros();
  551. trace.type = kTraceBegin;
  552. trace.payload = header;
  553. return WriteTrace(trace);
  554. }
  555. Status Tracer::WriteFooter() {
  556. Trace trace;
  557. trace.ts = clock_->NowMicros();
  558. trace.type = kTraceEnd;
  559. TracerHelper::SetPayloadMap(trace.payload_map,
  560. TracePayloadType::kEmptyPayload);
  561. trace.payload = "";
  562. return WriteTrace(trace);
  563. }
  564. Status Tracer::WriteTrace(const Trace& trace) {
  565. if (!trace_write_status_.ok()) {
  566. return Status::Incomplete("Tracing has seen error: %s",
  567. trace_write_status_.ToString());
  568. }
  569. assert(trace_write_status_.ok());
  570. std::string encoded_trace;
  571. TracerHelper::EncodeTrace(trace, &encoded_trace);
  572. Status s = trace_writer_->Write(Slice(encoded_trace));
  573. if (!s.ok()) {
  574. trace_write_status_ = s;
  575. }
  576. return s;
  577. }
  578. Status Tracer::Close() { return WriteFooter(); }
  579. } // namespace ROCKSDB_NAMESPACE