replayer_impl.cc 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/trace/replayer_impl.h"
  6. #include <cmath>
  7. #include <thread>
  8. #include "rocksdb/options.h"
  9. #include "rocksdb/slice.h"
  10. #include "rocksdb/system_clock.h"
  11. #include "util/threadpool_imp.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. ReplayerImpl::ReplayerImpl(DB* db,
  14. const std::vector<ColumnFamilyHandle*>& handles,
  15. std::unique_ptr<TraceReader>&& reader)
  16. : Replayer(),
  17. trace_reader_(std::move(reader)),
  18. prepared_(false),
  19. trace_end_(false),
  20. header_ts_(0),
  21. exec_handler_(TraceRecord::NewExecutionHandler(db, handles)),
  22. env_(db->GetEnv()),
  23. trace_file_version_(-1) {}
  24. ReplayerImpl::~ReplayerImpl() {
  25. exec_handler_.reset();
  26. trace_reader_.reset();
  27. }
  28. Status ReplayerImpl::Prepare() {
  29. Trace header;
  30. int db_version;
  31. Status s = ReadHeader(&header);
  32. if (!s.ok()) {
  33. return s;
  34. }
  35. s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
  36. if (!s.ok()) {
  37. return s;
  38. }
  39. header_ts_ = header.ts;
  40. prepared_ = true;
  41. trace_end_ = false;
  42. return Status::OK();
  43. }
  44. Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
  45. if (!prepared_) {
  46. return Status::Incomplete("Not prepared!");
  47. }
  48. if (trace_end_) {
  49. return Status::Incomplete("Trace end.");
  50. }
  51. Trace trace;
  52. Status s = ReadTrace(&trace); // ReadTrace is atomic
  53. // Reached the trace end.
  54. if (s.ok() && trace.type == kTraceEnd) {
  55. trace_end_ = true;
  56. return Status::Incomplete("Trace end.");
  57. }
  58. if (!s.ok() || record == nullptr) {
  59. return s;
  60. }
  61. return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record);
  62. }
  63. Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
  64. std::unique_ptr<TraceRecordResult>* result) {
  65. return record->Accept(exec_handler_.get(), result);
  66. }
  67. Status ReplayerImpl::Replay(
  68. const ReplayOptions& options,
  69. const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
  70. result_callback) {
  71. if (options.fast_forward <= 0.0) {
  72. return Status::InvalidArgument("Wrong fast forward speed!");
  73. }
  74. if (!prepared_) {
  75. return Status::Incomplete("Not prepared!");
  76. }
  77. if (trace_end_) {
  78. return Status::Incomplete("Trace end.");
  79. }
  80. Status s = Status::OK();
  81. if (options.num_threads <= 1) {
  82. // num_threads == 0 or num_threads == 1 uses single thread.
  83. std::chrono::system_clock::time_point replay_epoch =
  84. std::chrono::system_clock::now();
  85. while (s.ok()) {
  86. Trace trace;
  87. s = ReadTrace(&trace);
  88. // If already at trace end, ReadTrace should return Status::Incomplete().
  89. if (!s.ok()) {
  90. break;
  91. }
  92. // No need to sleep before breaking the loop if at the trace end.
  93. if (trace.type == kTraceEnd) {
  94. trace_end_ = true;
  95. s = Status::Incomplete("Trace end.");
  96. break;
  97. }
  98. // In single-threaded replay, decode first then sleep.
  99. std::unique_ptr<TraceRecord> record;
  100. s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record);
  101. if (!s.ok() && !s.IsNotSupported()) {
  102. break;
  103. }
  104. std::chrono::system_clock::time_point sleep_to =
  105. replay_epoch +
  106. std::chrono::microseconds(static_cast<uint64_t>(std::llround(
  107. 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
  108. if (sleep_to > std::chrono::system_clock::now()) {
  109. std::this_thread::sleep_until(sleep_to);
  110. }
  111. // Skip unsupported traces, stop for other errors.
  112. if (s.IsNotSupported()) {
  113. if (result_callback != nullptr) {
  114. result_callback(s, nullptr);
  115. }
  116. s = Status::OK();
  117. continue;
  118. }
  119. if (result_callback == nullptr) {
  120. s = Execute(record, nullptr);
  121. } else {
  122. std::unique_ptr<TraceRecordResult> res;
  123. s = Execute(record, &res);
  124. result_callback(s, std::move(res));
  125. }
  126. }
  127. } else {
  128. // Multi-threaded replay.
  129. ThreadPoolImpl thread_pool;
  130. thread_pool.SetHostEnv(env_);
  131. thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads));
  132. std::mutex mtx;
  133. // Background decoding and execution status.
  134. Status bg_s = Status::OK();
  135. uint64_t last_err_ts = static_cast<uint64_t>(-1);
  136. // Callback function used in background work to update bg_s for the ealiest
  137. // TraceRecord which has execution error. This is different from the
  138. // timestamp of the first execution error (either start or end timestamp).
  139. //
  140. // Suppose TraceRecord R1, R2, with timestamps T1 < T2. Their execution
  141. // timestamps are T1_start, T1_end, T2_start, T2_end.
  142. // Single-thread: there must be T1_start < T1_end < T2_start < T2_end.
  143. // Multi-thread: T1_start < T2_start may not be enforced. Orders of them are
  144. // totally unknown.
  145. // In order to report the same `first` error in both single-thread and
  146. // multi-thread replay, we can only rely on the TraceRecords' timestamps,
  147. // rather than their executin timestamps. Although in single-thread replay,
  148. // the first error is also the last error, while in multi-thread replay, the
  149. // first error may not be the first error in execution, and it may not be
  150. // the last error in exeution as well.
  151. auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) {
  152. std::lock_guard<std::mutex> gd(mtx);
  153. // Only record the first error.
  154. if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) {
  155. bg_s = err;
  156. last_err_ts = err_ts;
  157. }
  158. };
  159. std::chrono::system_clock::time_point replay_epoch =
  160. std::chrono::system_clock::now();
  161. while (bg_s.ok() && s.ok()) {
  162. Trace trace;
  163. s = ReadTrace(&trace);
  164. // If already at trace end, ReadTrace should return Status::Incomplete().
  165. if (!s.ok()) {
  166. break;
  167. }
  168. TraceType trace_type = trace.type;
  169. // No need to sleep before breaking the loop if at the trace end.
  170. if (trace_type == kTraceEnd) {
  171. trace_end_ = true;
  172. s = Status::Incomplete("Trace end.");
  173. break;
  174. }
  175. // In multi-threaded replay, sleep first then start decoding and
  176. // execution in a thread.
  177. std::chrono::system_clock::time_point sleep_to =
  178. replay_epoch +
  179. std::chrono::microseconds(static_cast<uint64_t>(std::llround(
  180. 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
  181. if (sleep_to > std::chrono::system_clock::now()) {
  182. std::this_thread::sleep_until(sleep_to);
  183. }
  184. if (trace_type == kTraceWrite || trace_type == kTraceGet ||
  185. trace_type == kTraceIteratorSeek ||
  186. trace_type == kTraceIteratorSeekForPrev ||
  187. trace_type == kTraceMultiGet) {
  188. std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
  189. ra->trace_entry = std::move(trace);
  190. ra->handler = exec_handler_.get();
  191. ra->trace_file_version = trace_file_version_;
  192. ra->error_cb = error_cb;
  193. ra->result_cb = result_callback;
  194. thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
  195. nullptr, nullptr);
  196. } else {
  197. // Skip unsupported traces.
  198. if (result_callback != nullptr) {
  199. result_callback(Status::NotSupported("Unsupported trace type."),
  200. nullptr);
  201. }
  202. }
  203. }
  204. thread_pool.WaitForJobsAndJoinAllThreads();
  205. if (!bg_s.ok()) {
  206. s = bg_s;
  207. }
  208. }
  209. if (s.IsIncomplete()) {
  210. // Reaching eof returns Incomplete status at the moment.
  211. // Could happen when killing a process without calling EndTrace() API.
  212. // TODO: Add better error handling.
  213. trace_end_ = true;
  214. return Status::OK();
  215. }
  216. return s;
  217. }
  218. uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; }
  219. Status ReplayerImpl::ReadHeader(Trace* header) {
  220. assert(header != nullptr);
  221. Status s = trace_reader_->Reset();
  222. if (!s.ok()) {
  223. return s;
  224. }
  225. std::string encoded_trace;
  226. // Read the trace head
  227. s = trace_reader_->Read(&encoded_trace);
  228. if (!s.ok()) {
  229. return s;
  230. }
  231. return TracerHelper::DecodeHeader(encoded_trace, header);
  232. }
  233. Status ReplayerImpl::ReadTrace(Trace* trace) {
  234. assert(trace != nullptr);
  235. std::string encoded_trace;
  236. // We don't know if TraceReader is implemented thread-safe, so we protect the
  237. // reading trace part with a mutex. The decoding part does not need to be
  238. // protected since it's local.
  239. {
  240. std::lock_guard<std::mutex> guard(mutex_);
  241. Status s = trace_reader_->Read(&encoded_trace);
  242. if (!s.ok()) {
  243. return s;
  244. }
  245. }
  246. return TracerHelper::DecodeTrace(encoded_trace, trace);
  247. }
  248. void ReplayerImpl::BackgroundWork(void* arg) {
  249. std::unique_ptr<ReplayerWorkerArg> ra(static_cast<ReplayerWorkerArg*>(arg));
  250. assert(ra != nullptr);
  251. std::unique_ptr<TraceRecord> record;
  252. Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry),
  253. ra->trace_file_version, &record);
  254. if (!s.ok()) {
  255. // Stop the replay
  256. if (ra->error_cb != nullptr) {
  257. ra->error_cb(s, ra->trace_entry.ts);
  258. }
  259. // Report the result
  260. if (ra->result_cb != nullptr) {
  261. ra->result_cb(s, nullptr);
  262. }
  263. return;
  264. }
  265. if (ra->result_cb == nullptr) {
  266. s = record->Accept(ra->handler, nullptr);
  267. } else {
  268. std::unique_ptr<TraceRecordResult> res;
  269. s = record->Accept(ra->handler, &res);
  270. ra->result_cb(s, std::move(res));
  271. }
  272. record.reset();
  273. }
  274. } // namespace ROCKSDB_NAMESPACE