// file_trace_reader_writer.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/trace/file_trace_reader_writer.h"
  6. #include "env/composite_env_wrapper.h"
  7. #include "file/random_access_file_reader.h"
  8. #include "file/writable_file_writer.h"
  9. #include "trace_replay/trace_replay.h"
  10. #include "util/coding.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. const unsigned int FileTraceReader::kBufferSize = 1024; // 1KB
  13. FileTraceReader::FileTraceReader(
  14. std::unique_ptr<RandomAccessFileReader>&& reader)
  15. : file_reader_(std::move(reader)),
  16. offset_(0),
  17. buffer_(new char[kBufferSize]) {}
  18. FileTraceReader::~FileTraceReader() {
  19. Close();
  20. delete[] buffer_;
  21. }
  22. Status FileTraceReader::Close() {
  23. file_reader_.reset();
  24. return Status::OK();
  25. }
  26. Status FileTraceReader::Read(std::string* data) {
  27. assert(file_reader_ != nullptr);
  28. Status s = file_reader_->Read(offset_, kTraceMetadataSize, &result_, buffer_);
  29. if (!s.ok()) {
  30. return s;
  31. }
  32. if (result_.size() == 0) {
  33. // No more data to read
  34. // Todo: Come up with a better way to indicate end of data. May be this
  35. // could be avoided once footer is introduced.
  36. return Status::Incomplete();
  37. }
  38. if (result_.size() < kTraceMetadataSize) {
  39. return Status::Corruption("Corrupted trace file.");
  40. }
  41. *data = result_.ToString();
  42. offset_ += kTraceMetadataSize;
  43. uint32_t payload_len =
  44. DecodeFixed32(&buffer_[kTraceTimestampSize + kTraceTypeSize]);
  45. // Read Payload
  46. unsigned int bytes_to_read = payload_len;
  47. unsigned int to_read =
  48. bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
  49. while (to_read > 0) {
  50. s = file_reader_->Read(offset_, to_read, &result_, buffer_);
  51. if (!s.ok()) {
  52. return s;
  53. }
  54. if (result_.size() < to_read) {
  55. return Status::Corruption("Corrupted trace file.");
  56. }
  57. data->append(result_.data(), result_.size());
  58. offset_ += to_read;
  59. bytes_to_read -= to_read;
  60. to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
  61. }
  62. return s;
  63. }
  64. FileTraceWriter::~FileTraceWriter() { Close(); }
  65. Status FileTraceWriter::Close() {
  66. file_writer_.reset();
  67. return Status::OK();
  68. }
  69. Status FileTraceWriter::Write(const Slice& data) {
  70. return file_writer_->Append(data);
  71. }
  72. uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); }
  73. Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
  74. const std::string& trace_filename,
  75. std::unique_ptr<TraceReader>* trace_reader) {
  76. std::unique_ptr<RandomAccessFile> trace_file;
  77. Status s = env->NewRandomAccessFile(trace_filename, &trace_file, env_options);
  78. if (!s.ok()) {
  79. return s;
  80. }
  81. std::unique_ptr<RandomAccessFileReader> file_reader;
  82. file_reader.reset(new RandomAccessFileReader(
  83. NewLegacyRandomAccessFileWrapper(trace_file), trace_filename));
  84. trace_reader->reset(new FileTraceReader(std::move(file_reader)));
  85. return s;
  86. }
  87. Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
  88. const std::string& trace_filename,
  89. std::unique_ptr<TraceWriter>* trace_writer) {
  90. std::unique_ptr<WritableFile> trace_file;
  91. Status s = env->NewWritableFile(trace_filename, &trace_file, env_options);
  92. if (!s.ok()) {
  93. return s;
  94. }
  95. std::unique_ptr<WritableFileWriter> file_writer;
  96. file_writer.reset(new WritableFileWriter(
  97. NewLegacyWritableFileWrapper(std::move(trace_file)), trace_filename,
  98. env_options));
  99. trace_writer->reset(new FileTraceWriter(std::move(file_writer)));
  100. return s;
  101. }
  102. } // namespace ROCKSDB_NAMESPACE