file_trace_reader_writer.cc 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/trace/file_trace_reader_writer.h"
  6. #include "env/composite_env_wrapper.h"
  7. #include "file/random_access_file_reader.h"
  8. #include "file/writable_file_writer.h"
  9. #include "trace_replay/trace_replay.h"
  10. #include "util/coding.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. const unsigned int FileTraceReader::kBufferSize = 1024; // 1KB
  13. FileTraceReader::FileTraceReader(
  14. std::unique_ptr<RandomAccessFileReader>&& reader)
  15. : file_reader_(std::move(reader)),
  16. offset_(0),
  17. buffer_(new char[kBufferSize]) {}
  18. FileTraceReader::~FileTraceReader() {
  19. Close().PermitUncheckedError();
  20. delete[] buffer_;
  21. }
  22. Status FileTraceReader::Close() {
  23. file_reader_.reset();
  24. return Status::OK();
  25. }
  26. Status FileTraceReader::Reset() {
  27. if (file_reader_ == nullptr) {
  28. return Status::IOError("TraceReader is closed.");
  29. }
  30. offset_ = 0;
  31. return Status::OK();
  32. }
  33. Status FileTraceReader::Read(std::string* data) {
  34. assert(file_reader_ != nullptr);
  35. Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize,
  36. &result_, buffer_, nullptr);
  37. if (!s.ok()) {
  38. return s;
  39. }
  40. if (result_.size() == 0) {
  41. // No more data to read
  42. // Todo: Come up with a better way to indicate end of data. May be this
  43. // could be avoided once footer is introduced.
  44. return Status::Incomplete();
  45. }
  46. if (result_.size() < kTraceMetadataSize) {
  47. return Status::Corruption("Corrupted trace file.");
  48. }
  49. *data = result_.ToString();
  50. offset_ += kTraceMetadataSize;
  51. uint32_t payload_len =
  52. DecodeFixed32(&buffer_[kTraceTimestampSize + kTraceTypeSize]);
  53. // Read Payload
  54. unsigned int bytes_to_read = payload_len;
  55. unsigned int to_read =
  56. bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
  57. while (to_read > 0) {
  58. s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, buffer_,
  59. nullptr);
  60. if (!s.ok()) {
  61. return s;
  62. }
  63. if (result_.size() < to_read) {
  64. return Status::Corruption("Corrupted trace file.");
  65. }
  66. data->append(result_.data(), result_.size());
  67. offset_ += to_read;
  68. bytes_to_read -= to_read;
  69. to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
  70. }
  71. return s;
  72. }
  73. FileTraceWriter::FileTraceWriter(
  74. std::unique_ptr<WritableFileWriter>&& file_writer)
  75. : file_writer_(std::move(file_writer)) {}
  76. FileTraceWriter::~FileTraceWriter() { Close().PermitUncheckedError(); }
  77. Status FileTraceWriter::Close() {
  78. file_writer_.reset();
  79. return Status::OK();
  80. }
  81. Status FileTraceWriter::Write(const Slice& data) {
  82. return file_writer_->Append(IOOptions(), data);
  83. }
  84. uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); }
  85. Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
  86. const std::string& trace_filename,
  87. std::unique_ptr<TraceReader>* trace_reader) {
  88. std::unique_ptr<RandomAccessFileReader> file_reader;
  89. Status s = RandomAccessFileReader::Create(
  90. env->GetFileSystem(), trace_filename, FileOptions(env_options),
  91. &file_reader, nullptr);
  92. if (!s.ok()) {
  93. return s;
  94. }
  95. trace_reader->reset(new FileTraceReader(std::move(file_reader)));
  96. return s;
  97. }
  98. Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
  99. const std::string& trace_filename,
  100. std::unique_ptr<TraceWriter>* trace_writer) {
  101. std::unique_ptr<WritableFileWriter> file_writer;
  102. Status s = WritableFileWriter::Create(env->GetFileSystem(), trace_filename,
  103. FileOptions(env_options), &file_writer,
  104. nullptr);
  105. if (!s.ok()) {
  106. return s;
  107. }
  108. trace_writer->reset(new FileTraceWriter(std::move(file_writer)));
  109. return s;
  110. }
  111. } // namespace ROCKSDB_NAMESPACE