// replayer_impl.h
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <atomic>
  7. #include <functional>
  8. #include <memory>
  9. #include <mutex>
  10. #include <unordered_map>
  11. #include "rocksdb/db.h"
  12. #include "rocksdb/env.h"
  13. #include "rocksdb/status.h"
  14. #include "rocksdb/trace_reader_writer.h"
  15. #include "rocksdb/trace_record.h"
  16. #include "rocksdb/trace_record_result.h"
  17. #include "rocksdb/utilities/replayer.h"
  18. #include "trace_replay/trace_replay.h"
  19. namespace ROCKSDB_NAMESPACE {
  20. class ReplayerImpl : public Replayer {
  21. public:
  22. ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
  23. std::unique_ptr<TraceReader>&& reader);
  24. ~ReplayerImpl() override;
  25. using Replayer::Prepare;
  26. Status Prepare() override;
  27. using Replayer::Next;
  28. Status Next(std::unique_ptr<TraceRecord>* record) override;
  29. using Replayer::Execute;
  30. Status Execute(const std::unique_ptr<TraceRecord>& record,
  31. std::unique_ptr<TraceRecordResult>* result) override;
  32. using Replayer::Replay;
  33. Status Replay(
  34. const ReplayOptions& options,
  35. const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
  36. result_callback) override;
  37. using Replayer::GetHeaderTimestamp;
  38. uint64_t GetHeaderTimestamp() const override;
  39. private:
  40. Status ReadHeader(Trace* header);
  41. Status ReadTrace(Trace* trace);
  42. // Generic function to execute a Trace in a thread pool.
  43. static void BackgroundWork(void* arg);
  44. std::unique_ptr<TraceReader> trace_reader_;
  45. std::mutex mutex_;
  46. std::atomic<bool> prepared_;
  47. std::atomic<bool> trace_end_;
  48. uint64_t header_ts_;
  49. std::unique_ptr<TraceRecord::Handler> exec_handler_;
  50. Env* env_;
  51. // When reading the trace header, the trace file version can be parsed.
  52. // Replayer will use different decode method to get the trace content based
  53. // on different trace file version.
  54. int trace_file_version_;
  55. };
  56. // Arguments passed to BackgroundWork() for replaying in a thread pool.
  57. struct ReplayerWorkerArg {
  58. Trace trace_entry;
  59. int trace_file_version;
  60. // Handler to execute TraceRecord.
  61. TraceRecord::Handler* handler;
  62. // Callback function to report the error status and the timestamp of the
  63. // TraceRecord (not the start/end timestamp of executing the TraceRecord).
  64. std::function<void(Status, uint64_t)> error_cb;
  65. // Callback function to report the trace execution status and operation
  66. // execution status/result(s).
  67. std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> result_cb;
  68. };
  69. } // namespace ROCKSDB_NAMESPACE