transaction_log_impl.cc 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  #include "db/transaction_log_impl.h"

  #include <cinttypes>
  #include <sstream>

  #include "db/write_batch_internal.h"
  #include "file/sequence_file_reader.h"
  #include "util/defer.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. TransactionLogIteratorImpl::TransactionLogIteratorImpl(
  12. const std::string& dir, const ImmutableDBOptions* options,
  13. const TransactionLogIterator::ReadOptions& read_options,
  14. const EnvOptions& soptions, const SequenceNumber seq,
  15. std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
  16. const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
  17. : dir_(dir),
  18. options_(options),
  19. read_options_(read_options),
  20. soptions_(soptions),
  21. starting_sequence_number_(seq),
  22. files_(std::move(files)),
  23. versions_(versions),
  24. seq_per_batch_(seq_per_batch),
  25. io_tracer_(io_tracer),
  26. started_(false),
  27. is_valid_(false),
  28. current_file_index_(0),
  29. current_batch_seq_(0),
  30. current_last_seq_(0) {
  31. assert(files_ != nullptr);
  32. assert(versions_ != nullptr);
  33. assert(!seq_per_batch_);
  34. current_status_.PermitUncheckedError(); // Clear on start
  35. reporter_.env = options_->env;
  36. reporter_.info_log = options_->info_log.get();
  37. SeekToStartSequence(); // Seek till starting sequence
  38. }
  39. Status TransactionLogIteratorImpl::OpenLogFile(
  40. const WalFile* log_file,
  41. std::unique_ptr<SequentialFileReader>* file_reader) {
  42. FileSystemPtr fs(options_->fs, io_tracer_);
  43. std::unique_ptr<FSSequentialFile> file;
  44. std::string fname;
  45. Status s;
  46. EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);
  47. if (log_file->Type() == kArchivedLogFile) {
  48. fname = ArchivedLogFileName(dir_, log_file->LogNumber());
  49. s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
  50. } else {
  51. fname = LogFileName(dir_, log_file->LogNumber());
  52. s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
  53. if (!s.ok()) {
  54. // If cannot open file in DB directory.
  55. // Try the archive dir, as it could have moved in the meanwhile.
  56. fname = ArchivedLogFileName(dir_, log_file->LogNumber());
  57. s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
  58. }
  59. }
  60. if (s.ok()) {
  61. file_reader->reset(new SequentialFileReader(std::move(file), fname,
  62. io_tracer_, options_->listeners,
  63. options_->rate_limiter.get()));
  64. }
  65. return s;
  66. }
  67. BatchResult TransactionLogIteratorImpl::GetBatch() {
  68. assert(is_valid_); // cannot call in a non valid state.
  69. BatchResult result;
  70. result.sequence = current_batch_seq_;
  71. result.writeBatchPtr = std::move(current_batch_);
  72. return result;
  73. }
  74. Status TransactionLogIteratorImpl::status() { return current_status_; }
  75. bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }
  76. bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
  77. // Don't read if no more complete entries to read from logs
  78. if (current_last_seq_ >= versions_->LastSequence()) {
  79. return false;
  80. }
  81. return current_log_reader_->ReadRecord(record, &scratch_);
  82. }
  83. void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
  84. bool strict) {
  85. Slice record;
  86. started_ = false;
  87. is_valid_ = false;
  88. // Check invariant of TransactionLogIterator when SeekToStartSequence()
  89. // succeeds.
  90. const Defer defer([this]() {
  91. if (is_valid_) {
  92. assert(current_status_.ok());
  93. if (starting_sequence_number_ > current_batch_seq_) {
  94. assert(current_batch_seq_ < current_last_seq_);
  95. assert(current_last_seq_ >= starting_sequence_number_);
  96. }
  97. }
  98. });
  99. if (files_->size() <= start_file_index) {
  100. return;
  101. } else if (!current_status_.ok()) {
  102. return;
  103. }
  104. Status s =
  105. OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
  106. if (!s.ok()) {
  107. current_status_ = s;
  108. reporter_.Info(current_status_.ToString().c_str());
  109. return;
  110. }
  111. while (RestrictedRead(&record)) {
  112. if (record.size() < WriteBatchInternal::kHeader) {
  113. reporter_.Corruption(record.size(),
  114. Status::Corruption("very small log record"));
  115. continue;
  116. }
  117. UpdateCurrentWriteBatch(record);
  118. if (current_last_seq_ >= starting_sequence_number_) {
  119. if (strict && current_batch_seq_ != starting_sequence_number_) {
  120. current_status_ = Status::Corruption(
  121. "Gap in sequence number. Could not "
  122. "seek to required sequence number");
  123. reporter_.Info(current_status_.ToString().c_str());
  124. return;
  125. } else if (strict) {
  126. reporter_.Info(
  127. "Could seek required sequence number. Iterator will "
  128. "continue.");
  129. }
  130. is_valid_ = true;
  131. started_ = true; // set started_ as we could seek till starting sequence
  132. return;
  133. } else {
  134. is_valid_ = false;
  135. }
  136. }
  137. // Could not find start sequence in first file. Normally this must be the
  138. // only file. Otherwise log the error and let the iterator return next entry
  139. // If strict is set, we want to seek exactly till the start sequence and it
  140. // should have been present in the file we scanned above
  141. if (strict) {
  142. current_status_ = Status::Corruption(
  143. "Gap in sequence number. Could not "
  144. "seek to required sequence number");
  145. reporter_.Info(current_status_.ToString().c_str());
  146. } else if (files_->size() != 1) {
  147. current_status_ = Status::Corruption(
  148. "Start sequence was not found, "
  149. "skipping to the next available");
  150. reporter_.Info(current_status_.ToString().c_str());
  151. // Let NextImpl find the next available entry. started_ remains false
  152. // because we don't want to check for gaps while moving to start sequence
  153. NextImpl(true);
  154. }
  155. }
  156. void TransactionLogIteratorImpl::Next() {
  157. if (!current_status_.ok()) {
  158. return;
  159. }
  160. return NextImpl(false);
  161. }
  162. void TransactionLogIteratorImpl::NextImpl(bool internal) {
  163. Slice record;
  164. is_valid_ = false;
  165. if (!internal && !started_) {
  166. // Runs every time until we can seek to the start sequence
  167. SeekToStartSequence();
  168. }
  169. while (true) {
  170. assert(current_log_reader_);
  171. if (current_log_reader_->IsEOF()) {
  172. current_log_reader_->UnmarkEOF();
  173. }
  174. while (RestrictedRead(&record)) {
  175. if (record.size() < WriteBatchInternal::kHeader) {
  176. reporter_.Corruption(record.size(),
  177. Status::Corruption("very small log record"));
  178. continue;
  179. } else {
  180. // started_ should be true if called by application
  181. assert(internal || started_);
  182. // started_ should be false if called internally
  183. assert(!internal || !started_);
  184. UpdateCurrentWriteBatch(record);
  185. if (internal && !started_) {
  186. started_ = true;
  187. }
  188. return;
  189. }
  190. }
  191. // Open the next file
  192. if (current_file_index_ < files_->size() - 1) {
  193. ++current_file_index_;
  194. Status s = OpenLogReader(files_->at(current_file_index_).get());
  195. if (!s.ok()) {
  196. is_valid_ = false;
  197. current_status_ = s;
  198. return;
  199. }
  200. } else {
  201. is_valid_ = false;
  202. if (current_last_seq_ == versions_->LastSequence()) {
  203. current_status_ = Status::OK();
  204. } else {
  205. const char* msg = "Create a new iterator to fetch the new tail.";
  206. current_status_ = Status::TryAgain(msg);
  207. }
  208. return;
  209. }
  210. }
  211. }
  212. bool TransactionLogIteratorImpl::IsBatchExpected(
  213. const WriteBatch* batch, const SequenceNumber expected_seq) {
  214. assert(batch);
  215. SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
  216. if (batchSeq != expected_seq) {
  217. std::ostringstream oss;
  218. oss << "Discontinuity in log records. " << "Got seq=" << batchSeq << ", "
  219. << "Expected seq=" << expected_seq << ", "
  220. << "Last flushed seq=" << versions_->LastSequence() << ". "
  221. << "Log iterator will reseek the correct batch.";
  222. reporter_.Info(oss.str().c_str());
  223. return false;
  224. }
  225. return true;
  226. }
  227. void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
  228. std::unique_ptr<WriteBatch> batch(new WriteBatch());
  229. Status s = WriteBatchInternal::SetContents(batch.get(), record);
  230. s.PermitUncheckedError(); // TODO: What should we do with this error?
  231. SequenceNumber expected_seq = current_last_seq_ + 1;
  232. // If the iterator has started, then confirm that we get continuous batches
  233. if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
  234. // Seek to the batch having expected sequence number
  235. if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
  236. // Expected batch must lie in the previous log file
  237. // Avoid underflow.
  238. if (current_file_index_ != 0) {
  239. current_file_index_--;
  240. }
  241. }
  242. starting_sequence_number_ = expected_seq;
  243. // currentStatus_ will be set to Ok if reseek succeeds
  244. // Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
  245. // that allows gaps in the WAL since it will still skip over the gap.
  246. current_status_ = Status::NotFound("Gap in sequence numbers");
  247. // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
  248. // should be disabled
  249. return SeekToStartSequence(current_file_index_, !seq_per_batch_);
  250. }
  251. current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
  252. assert(!seq_per_batch_);
  253. current_last_seq_ =
  254. current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
  255. // currentBatchSeq_ can only change here
  256. assert(current_last_seq_ <= versions_->LastSequence());
  257. current_batch_ = std::move(batch);
  258. is_valid_ = true;
  259. current_status_ = Status::OK();
  260. }
  261. Status TransactionLogIteratorImpl::OpenLogReader(const WalFile* log_file) {
  262. std::unique_ptr<SequentialFileReader> file;
  263. Status s = OpenLogFile(log_file, &file);
  264. if (!s.ok()) {
  265. return s;
  266. }
  267. assert(file);
  268. current_log_reader_.reset(
  269. new log::Reader(options_->info_log, std::move(file), &reporter_,
  270. read_options_.verify_checksums_, log_file->LogNumber()));
  271. return Status::OK();
  272. }
  273. } // namespace ROCKSDB_NAMESPACE