// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "trace_replay/trace_replay.h"

#include <chrono>
#include <sstream>
#include <thread>

#include "db/db_impl/db_impl.h"
#include "rocksdb/slice.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
#include "util/threadpool_imp.h"

namespace ROCKSDB_NAMESPACE {

const std::string kTraceMagic = "feedcafedeadbeef";

namespace {
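// Helpers for the Get and Iterator trace payloads: a fixed 32-bit column
// family ID followed by a length-prefixed key.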
void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
  PutFixed32(dst, cf_id);
  PutLengthPrefixedSlice(dst, key);
}

void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
  Slice buf(buffer);
  GetFixed32(&buf, cf_id);
  GetLengthPrefixedSlice(&buf, key);
}
}  // namespace
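
// On-disk layout of one trace record: an 8-byte timestamp, a 1-byte trace
// type, a 4-byte payload length, and then the payload bytes.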
void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
  assert(encoded_trace);
  PutFixed64(encoded_trace, trace.ts);
  encoded_trace->push_back(trace.type);
  PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
  encoded_trace->append(trace.payload);
}

Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
                                 Trace* trace) {
  assert(trace != nullptr);
  Slice enc_slice = Slice(encoded_trace);
  if (!GetFixed64(&enc_slice, &trace->ts)) {
    return Status::Incomplete("Decode trace string failed");
  }
  if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
    return Status::Incomplete("Decode trace string failed");
  }
  trace->type = static_cast<TraceType>(enc_slice[0]);
  enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
  trace->payload = enc_slice.ToString();
  return Status::OK();
}

Tracer::Tracer(Env* env, const TraceOptions& trace_options,
               std::unique_ptr<TraceWriter>&& trace_writer)
    : env_(env),
      trace_options_(trace_options),
      trace_writer_(std::move(trace_writer)),
      trace_request_count_(0) {
  WriteHeader();
}

Tracer::~Tracer() { trace_writer_.reset(); }
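
// Each tracing method below records the current timestamp, the operation
// type, and an operation-specific payload, unless ShouldSkipTrace() filters
// the request out.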
Status Tracer::Write(WriteBatch* write_batch) {
  TraceType trace_type = kTraceWrite;
  if (ShouldSkipTrace(trace_type)) {
    return Status::OK();
  }
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = trace_type;
  trace.payload = write_batch->Data();
  return WriteTrace(trace);
}

Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
  TraceType trace_type = kTraceGet;
  if (ShouldSkipTrace(trace_type)) {
    return Status::OK();
  }
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = trace_type;
  EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
  return WriteTrace(trace);
}

Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
  TraceType trace_type = kTraceIteratorSeek;
  if (ShouldSkipTrace(trace_type)) {
    return Status::OK();
  }
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = trace_type;
  EncodeCFAndKey(&trace.payload, cf_id, key);
  return WriteTrace(trace);
}

Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
  TraceType trace_type = kTraceIteratorSeekForPrev;
  if (ShouldSkipTrace(trace_type)) {
    return Status::OK();
  }
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = trace_type;
  EncodeCFAndKey(&trace.payload, cf_id, key);
  return WriteTrace(trace);
}
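
// A request is skipped if the trace file has grown beyond its maximum size,
// if its type is excluded by the trace filter, or if it is not yet the
// sampling_frequency-th request since the last traced one.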
bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
  if (IsTraceFileOverMax()) {
    return true;
  }
  if ((trace_options_.filter & kTraceFilterGet && trace_type == kTraceGet) ||
      (trace_options_.filter & kTraceFilterWrite &&
       trace_type == kTraceWrite)) {
    return true;
  }
  ++trace_request_count_;
  if (trace_request_count_ < trace_options_.sampling_frequency) {
    return true;
  }
  trace_request_count_ = 0;
  return false;
}

bool Tracer::IsTraceFileOverMax() {
  uint64_t trace_file_size = trace_writer_->GetFileSize();
  return (trace_file_size > trace_options_.max_trace_file_size);
}
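
// The header is written as a kTraceBegin record whose payload is a single
// human-readable line containing the trace magic, the trace format version,
// the RocksDB version, and the record format.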
Status Tracer::WriteHeader() {
  std::ostringstream s;
  s << kTraceMagic << "\t"
    << "Trace Version: 0.1\t"
    << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
    << "Format: Timestamp OpType Payload\n";
  std::string header(s.str());

  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = kTraceBegin;
  trace.payload = header;
  return WriteTrace(trace);
}

Status Tracer::WriteFooter() {
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = kTraceEnd;
  trace.payload = "";
  return WriteTrace(trace);
}

Status Tracer::WriteTrace(const Trace& trace) {
  std::string encoded_trace;
  TracerHelper::EncodeTrace(trace, &encoded_trace);
  return trace_writer_->Write(Slice(encoded_trace));
}

Status Tracer::Close() { return WriteFooter(); }
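
// Replayer maps the column family IDs recorded in the trace back to the
// handles supplied by the caller and replays the traced operations against
// the root DBImpl.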
Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
                   std::unique_ptr<TraceReader>&& reader)
    : trace_reader_(std::move(reader)) {
  assert(db != nullptr);
  db_ = static_cast<DBImpl*>(db->GetRootDB());
  env_ = Env::Default();
  for (ColumnFamilyHandle* cfh : handles) {
    cf_map_[cfh->GetID()] = cfh;
  }
  fast_forward_ = 1;
}

Replayer::~Replayer() { trace_reader_.reset(); }

Status Replayer::SetFastForward(uint32_t fast_forward) {
  Status s;
  if (fast_forward < 1) {
    s = Status::InvalidArgument("Wrong fast forward speed!");
  } else {
    fast_forward_ = fast_forward;
    s = Status::OK();
  }
  return s;
}
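
// Single-threaded replay: trace records are read sequentially, and each query
// is issued after sleeping until its original offset from the trace start
// (scaled down by the fast-forward factor) has elapsed since replay began.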
Status Replayer::Replay() {
  Status s;
  Trace header;
  s = ReadHeader(&header);
  if (!s.ok()) {
    return s;
  }

  std::chrono::system_clock::time_point replay_epoch =
      std::chrono::system_clock::now();
  WriteOptions woptions;
  ReadOptions roptions;
  Trace trace;
  uint64_t ops = 0;
  Iterator* single_iter = nullptr;
  while (s.ok()) {
    trace.reset();
    s = ReadTrace(&trace);
    if (!s.ok()) {
      break;
    }

    std::this_thread::sleep_until(
        replay_epoch +
        std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
    if (trace.type == kTraceWrite) {
      WriteBatch batch(trace.payload);
      db_->Write(woptions, &batch);
      ops++;
    } else if (trace.type == kTraceGet) {
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }
      std::string value;
      if (cf_id == 0) {
        db_->Get(roptions, key, &value);
      } else {
        db_->Get(roptions, cf_map_[cf_id], key, &value);
      }
      ops++;
    } else if (trace.type == kTraceIteratorSeek) {
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }
      if (cf_id == 0) {
        single_iter = db_->NewIterator(roptions);
      } else {
        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
      }
      single_iter->Seek(key);
      ops++;
      delete single_iter;
    } else if (trace.type == kTraceIteratorSeekForPrev) {
      // Currently, Seek() and SeekForPrev() are the only iterator operations
      // that are traced and replayed.
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }
      if (cf_id == 0) {
        single_iter = db_->NewIterator(roptions);
      } else {
        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
      }
      single_iter->SeekForPrev(key);
      ops++;
      delete single_iter;
    } else if (trace.type == kTraceEnd) {
      // Do nothing for now.
      // TODO: Add some validations later.
      break;
    }
  }

  if (s.IsIncomplete()) {
    // Reaching EOF currently returns an Incomplete status.
    // This can happen when a process is killed without calling the
    // EndTrace() API.
    // TODO: Add better error handling.
    return Status::OK();
  }
  return s;
}

// The trace can be replayed with multiple threads by configuring the number
// of threads in the thread pool. Trace records are read from the trace file
// sequentially, and the corresponding queries are scheduled in the task
// queue based on their timestamps. Currently, WriteBatch (Put, Delete,
// SingleDelete, DeleteRange), Get, and Iterator (Seek and SeekForPrev) are
// supported.
Status Replayer::MultiThreadReplay(uint32_t threads_num) {
  Status s;
  Trace header;
  s = ReadHeader(&header);
  if (!s.ok()) {
    return s;
  }

  ThreadPoolImpl thread_pool;
  thread_pool.SetHostEnv(env_);
  if (threads_num > 1) {
    thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
  } else {
    thread_pool.SetBackgroundThreads(1);
  }

  std::chrono::system_clock::time_point replay_epoch =
      std::chrono::system_clock::now();
  WriteOptions woptions;
  ReadOptions roptions;
  uint64_t ops = 0;
  while (s.ok()) {
    std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
    ra->db = db_;
    s = ReadTrace(&(ra->trace_entry));
    if (!s.ok()) {
      break;
    }
    ra->woptions = woptions;
    ra->roptions = roptions;
    // The background workers resolve column family IDs through this map.
    ra->cf_map = &cf_map_;
    std::this_thread::sleep_until(
        replay_epoch + std::chrono::microseconds(
                           (ra->trace_entry.ts - header.ts) / fast_forward_));
    if (ra->trace_entry.type == kTraceWrite) {
      thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceGet) {
      thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceIteratorSeek) {
      thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
      thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
                           nullptr, nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceEnd) {
      // Do nothing for now.
      // TODO: Add some validations later.
      break;
    } else {
      // Other trace entry types are not implemented for replay; skip them and
      // continue with the next record.
      continue;
    }
  }

  if (s.IsIncomplete()) {
    // Reaching EOF currently returns an Incomplete status.
    // This can happen when a process is killed without calling the
    // EndTrace() API.
    // TODO: Add better error handling.
    s = Status::OK();
  }
  thread_pool.JoinAllThreads();
  return s;
}
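
// ReadHeader(), ReadFooter(), and ReadTrace() each read one encoded record
// from the trace reader and decode it. The header must be a kTraceBegin
// record starting with the trace magic; the footer must be a kTraceEnd
// record.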
Status Replayer::ReadHeader(Trace* header) {
  assert(header != nullptr);
  Status s = ReadTrace(header);
  if (!s.ok()) {
    return s;
  }
  if (header->type != kTraceBegin) {
    return Status::Corruption("Corrupted trace file. Incorrect header.");
  }
  if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
    return Status::Corruption("Corrupted trace file. Incorrect magic.");
  }
  return s;
}

Status Replayer::ReadFooter(Trace* footer) {
  assert(footer != nullptr);
  Status s = ReadTrace(footer);
  if (!s.ok()) {
    return s;
  }
  if (footer->type != kTraceEnd) {
    return Status::Corruption("Corrupted trace file. Incorrect footer.");
  }
  // TODO: Add more validations later
  return s;
}

Status Replayer::ReadTrace(Trace* trace) {
  assert(trace != nullptr);
  std::string encoded_trace;
  Status s = trace_reader_->Read(&encoded_trace);
  if (!s.ok()) {
    return s;
  }
  return TracerHelper::DecodeTrace(encoded_trace, trace);
}
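
// The BGWork* callbacks run on the thread pool. Each one takes ownership of
// the heap-allocated ReplayerWorkerArg released by MultiThreadReplay() and
// deletes it when the local unique_ptr goes out of scope.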
void Replayer::BGWorkGet(void* arg) {
  std::unique_ptr<ReplayerWorkerArg> ra(
      reinterpret_cast<ReplayerWorkerArg*>(arg));
  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
      ra->cf_map);
  uint32_t cf_id = 0;
  Slice key;
  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
    return;
  }
  std::string value;
  if (cf_id == 0) {
    ra->db->Get(ra->roptions, key, &value);
  } else {
    ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
  }
  return;
}

void Replayer::BGWorkWriteBatch(void* arg) {
  std::unique_ptr<ReplayerWorkerArg> ra(
      reinterpret_cast<ReplayerWorkerArg*>(arg));
  WriteBatch batch(ra->trace_entry.payload);
  ra->db->Write(ra->woptions, &batch);
  return;
}

void Replayer::BGWorkIterSeek(void* arg) {
  std::unique_ptr<ReplayerWorkerArg> ra(
      reinterpret_cast<ReplayerWorkerArg*>(arg));
  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
      ra->cf_map);
  uint32_t cf_id = 0;
  Slice key;
  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
    return;
  }
  std::string value;
  Iterator* single_iter = nullptr;
  if (cf_id == 0) {
    single_iter = ra->db->NewIterator(ra->roptions);
  } else {
    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
  }
  single_iter->Seek(key);
  delete single_iter;
  return;
}

void Replayer::BGWorkIterSeekForPrev(void* arg) {
  std::unique_ptr<ReplayerWorkerArg> ra(
      reinterpret_cast<ReplayerWorkerArg*>(arg));
  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
      ra->cf_map);
  uint32_t cf_id = 0;
  Slice key;
  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
    return;
  }
  std::string value;
  Iterator* single_iter = nullptr;
  if (cf_id == 0) {
    single_iter = ra->db->NewIterator(ra->roptions);
  } else {
    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
  }
  single_iter->SeekForPrev(key);
  delete single_iter;
  return;
}

}  // namespace ROCKSDB_NAMESPACE