transaction_log_impl.h 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <vector>
  7. #include "db/log_reader.h"
  8. #include "db/version_set.h"
  9. #include "file/filename.h"
  10. #include "logging/logging.h"
  11. #include "options/db_options.h"
  12. #include "port/port.h"
  13. #include "rocksdb/env.h"
  14. #include "rocksdb/options.h"
  15. #include "rocksdb/transaction_log.h"
  16. #include "rocksdb/types.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. class WalFileImpl : public WalFile {
  19. public:
  20. WalFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
  21. uint64_t sizeBytes)
  22. : logNumber_(logNum),
  23. type_(logType),
  24. startSequence_(startSeq),
  25. sizeFileBytes_(sizeBytes) {}
  26. std::string PathName() const override {
  27. if (type_ == kArchivedLogFile) {
  28. return ArchivedLogFileName("", logNumber_);
  29. }
  30. return LogFileName("", logNumber_);
  31. }
  32. uint64_t LogNumber() const override { return logNumber_; }
  33. WalFileType Type() const override { return type_; }
  34. SequenceNumber StartSequence() const override { return startSequence_; }
  35. uint64_t SizeFileBytes() const override { return sizeFileBytes_; }
  36. bool operator<(const WalFile& that) const {
  37. return LogNumber() < that.LogNumber();
  38. }
  39. private:
  40. uint64_t logNumber_;
  41. WalFileType type_;
  42. SequenceNumber startSequence_;
  43. uint64_t sizeFileBytes_;
  44. };
  45. class TransactionLogIteratorImpl : public TransactionLogIterator {
  46. public:
  47. TransactionLogIteratorImpl(
  48. const std::string& dir, const ImmutableDBOptions* options,
  49. const TransactionLogIterator::ReadOptions& read_options,
  50. const EnvOptions& soptions, const SequenceNumber seqNum,
  51. std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
  52. const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer);
  53. bool Valid() override;
  54. void Next() override;
  55. Status status() override;
  56. BatchResult GetBatch() override;
  57. private:
  58. const std::string& dir_;
  59. const ImmutableDBOptions* options_;
  60. const TransactionLogIterator::ReadOptions read_options_;
  61. const EnvOptions& soptions_;
  62. SequenceNumber starting_sequence_number_;
  63. std::unique_ptr<VectorWalPtr> files_;
  64. // Used only to get latest seq. num
  65. // TODO(icanadi) can this be just a callback?
  66. VersionSet const* const versions_;
  67. const bool seq_per_batch_;
  68. std::shared_ptr<IOTracer> io_tracer_;
  69. // State variables
  70. bool started_;
  71. bool is_valid_; // not valid when it starts of.
  72. Status current_status_;
  73. size_t current_file_index_;
  74. std::unique_ptr<WriteBatch> current_batch_;
  75. std::unique_ptr<log::Reader> current_log_reader_;
  76. std::string scratch_;
  77. Status OpenLogFile(const WalFile* log_file,
  78. std::unique_ptr<SequentialFileReader>* file);
  79. struct LogReporter : public log::Reader::Reporter {
  80. Env* env;
  81. Logger* info_log;
  82. void Corruption(size_t bytes, const Status& s,
  83. uint64_t /*log_number*/ = kMaxSequenceNumber) override {
  84. ROCKS_LOG_ERROR(info_log, "dropping %" ROCKSDB_PRIszt " bytes; %s", bytes,
  85. s.ToString().c_str());
  86. }
  87. virtual void Info(const char* s) { ROCKS_LOG_INFO(info_log, "%s", s); }
  88. } reporter_;
  89. SequenceNumber
  90. current_batch_seq_; // sequence number at start of current batch
  91. SequenceNumber current_last_seq_; // last sequence in the current batch
  92. // Reads from transaction log only if the writebatch record has been written
  93. bool RestrictedRead(Slice* record);
  94. // Seeks to starting_sequence_number_ reading from start_file_index in files_.
  95. // If strict is set, then must get a batch starting with
  96. // starting_sequence_number_.
  97. void SeekToStartSequence(uint64_t start_file_index = 0, bool strict = false);
  98. // Implementation of Next. SeekToStartSequence calls it internally with
  99. // internal=true to let it find next entry even if it has to jump gaps because
  100. // the iterator may start off from the first available entry but promises to
  101. // be continuous after that
  102. void NextImpl(bool internal = false);
  103. // Check if batch is expected, else return false
  104. bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
  105. // Update current batch if a continuous batch is found.
  106. void UpdateCurrentWriteBatch(const Slice& record);
  107. Status OpenLogReader(const WalFile* file);
  108. };
  109. } // namespace ROCKSDB_NAMESPACE