// transaction_log_impl.cc
  // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  // This source code is licensed under both the GPLv2 (found in the
  // COPYING file in the root directory) and Apache 2.0 License
  // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "db/transaction_log_impl.h"
  7. #include <cinttypes>
  8. #include "db/write_batch_internal.h"
  9. #include "file/sequence_file_reader.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. TransactionLogIteratorImpl::TransactionLogIteratorImpl(
  12. const std::string& dir, const ImmutableDBOptions* options,
  13. const TransactionLogIterator::ReadOptions& read_options,
  14. const EnvOptions& soptions, const SequenceNumber seq,
  15. std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
  16. const bool seq_per_batch)
  17. : dir_(dir),
  18. options_(options),
  19. read_options_(read_options),
  20. soptions_(soptions),
  21. starting_sequence_number_(seq),
  22. files_(std::move(files)),
  23. started_(false),
  24. is_valid_(false),
  25. current_file_index_(0),
  26. current_batch_seq_(0),
  27. current_last_seq_(0),
  28. versions_(versions),
  29. seq_per_batch_(seq_per_batch) {
  30. assert(files_ != nullptr);
  31. assert(versions_ != nullptr);
  32. reporter_.env = options_->env;
  33. reporter_.info_log = options_->info_log.get();
  34. SeekToStartSequence(); // Seek till starting sequence
  35. }
  36. Status TransactionLogIteratorImpl::OpenLogFile(
  37. const LogFile* log_file,
  38. std::unique_ptr<SequentialFileReader>* file_reader) {
  39. FileSystem* fs = options_->fs.get();
  40. std::unique_ptr<FSSequentialFile> file;
  41. std::string fname;
  42. Status s;
  43. EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);
  44. if (log_file->Type() == kArchivedLogFile) {
  45. fname = ArchivedLogFileName(dir_, log_file->LogNumber());
  46. s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
  47. } else {
  48. fname = LogFileName(dir_, log_file->LogNumber());
  49. s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
  50. if (!s.ok()) {
  51. // If cannot open file in DB directory.
  52. // Try the archive dir, as it could have moved in the meanwhile.
  53. fname = ArchivedLogFileName(dir_, log_file->LogNumber());
  54. s = fs->NewSequentialFile(fname, optimized_env_options,
  55. &file, nullptr);
  56. }
  57. }
  58. if (s.ok()) {
  59. file_reader->reset(new SequentialFileReader(std::move(file), fname));
  60. }
  61. return s;
  62. }
  63. BatchResult TransactionLogIteratorImpl::GetBatch() {
  64. assert(is_valid_); // cannot call in a non valid state.
  65. BatchResult result;
  66. result.sequence = current_batch_seq_;
  67. result.writeBatchPtr = std::move(current_batch_);
  68. return result;
  69. }
  70. Status TransactionLogIteratorImpl::status() { return current_status_; }
  71. bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }
  72. bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
  73. // Don't read if no more complete entries to read from logs
  74. if (current_last_seq_ >= versions_->LastSequence()) {
  75. return false;
  76. }
  77. return current_log_reader_->ReadRecord(record, &scratch_);
  78. }
  79. void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
  80. bool strict) {
  81. Slice record;
  82. started_ = false;
  83. is_valid_ = false;
  84. if (files_->size() <= start_file_index) {
  85. return;
  86. }
  87. Status s =
  88. OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
  89. if (!s.ok()) {
  90. current_status_ = s;
  91. reporter_.Info(current_status_.ToString().c_str());
  92. return;
  93. }
  94. while (RestrictedRead(&record)) {
  95. if (record.size() < WriteBatchInternal::kHeader) {
  96. reporter_.Corruption(
  97. record.size(), Status::Corruption("very small log record"));
  98. continue;
  99. }
  100. UpdateCurrentWriteBatch(record);
  101. if (current_last_seq_ >= starting_sequence_number_) {
  102. if (strict && current_batch_seq_ != starting_sequence_number_) {
  103. current_status_ = Status::Corruption(
  104. "Gap in sequence number. Could not "
  105. "seek to required sequence number");
  106. reporter_.Info(current_status_.ToString().c_str());
  107. return;
  108. } else if (strict) {
  109. reporter_.Info("Could seek required sequence number. Iterator will "
  110. "continue.");
  111. }
  112. is_valid_ = true;
  113. started_ = true; // set started_ as we could seek till starting sequence
  114. return;
  115. } else {
  116. is_valid_ = false;
  117. }
  118. }
  119. // Could not find start sequence in first file. Normally this must be the
  120. // only file. Otherwise log the error and let the iterator return next entry
  121. // If strict is set, we want to seek exactly till the start sequence and it
  122. // should have been present in the file we scanned above
  123. if (strict) {
  124. current_status_ = Status::Corruption(
  125. "Gap in sequence number. Could not "
  126. "seek to required sequence number");
  127. reporter_.Info(current_status_.ToString().c_str());
  128. } else if (files_->size() != 1) {
  129. current_status_ = Status::Corruption(
  130. "Start sequence was not found, "
  131. "skipping to the next available");
  132. reporter_.Info(current_status_.ToString().c_str());
  133. // Let NextImpl find the next available entry. started_ remains false
  134. // because we don't want to check for gaps while moving to start sequence
  135. NextImpl(true);
  136. }
  137. }
  138. void TransactionLogIteratorImpl::Next() {
  139. return NextImpl(false);
  140. }
  141. void TransactionLogIteratorImpl::NextImpl(bool internal) {
  142. Slice record;
  143. is_valid_ = false;
  144. if (!internal && !started_) {
  145. // Runs every time until we can seek to the start sequence
  146. return SeekToStartSequence();
  147. }
  148. while(true) {
  149. assert(current_log_reader_);
  150. if (current_log_reader_->IsEOF()) {
  151. current_log_reader_->UnmarkEOF();
  152. }
  153. while (RestrictedRead(&record)) {
  154. if (record.size() < WriteBatchInternal::kHeader) {
  155. reporter_.Corruption(
  156. record.size(), Status::Corruption("very small log record"));
  157. continue;
  158. } else {
  159. // started_ should be true if called by application
  160. assert(internal || started_);
  161. // started_ should be false if called internally
  162. assert(!internal || !started_);
  163. UpdateCurrentWriteBatch(record);
  164. if (internal && !started_) {
  165. started_ = true;
  166. }
  167. return;
  168. }
  169. }
  170. // Open the next file
  171. if (current_file_index_ < files_->size() - 1) {
  172. ++current_file_index_;
  173. Status s = OpenLogReader(files_->at(current_file_index_).get());
  174. if (!s.ok()) {
  175. is_valid_ = false;
  176. current_status_ = s;
  177. return;
  178. }
  179. } else {
  180. is_valid_ = false;
  181. if (current_last_seq_ == versions_->LastSequence()) {
  182. current_status_ = Status::OK();
  183. } else {
  184. const char* msg = "Create a new iterator to fetch the new tail.";
  185. current_status_ = Status::TryAgain(msg);
  186. }
  187. return;
  188. }
  189. }
  190. }
  191. bool TransactionLogIteratorImpl::IsBatchExpected(
  192. const WriteBatch* batch, const SequenceNumber expected_seq) {
  193. assert(batch);
  194. SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
  195. if (batchSeq != expected_seq) {
  196. char buf[200];
  197. snprintf(buf, sizeof(buf),
  198. "Discontinuity in log records. Got seq=%" PRIu64
  199. ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
  200. ".Log iterator will reseek the correct batch.",
  201. batchSeq, expected_seq, versions_->LastSequence());
  202. reporter_.Info(buf);
  203. return false;
  204. }
  205. return true;
  206. }
  207. void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
  208. std::unique_ptr<WriteBatch> batch(new WriteBatch());
  209. WriteBatchInternal::SetContents(batch.get(), record);
  210. SequenceNumber expected_seq = current_last_seq_ + 1;
  211. // If the iterator has started, then confirm that we get continuous batches
  212. if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
  213. // Seek to the batch having expected sequence number
  214. if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
  215. // Expected batch must lie in the previous log file
  216. // Avoid underflow.
  217. if (current_file_index_ != 0) {
  218. current_file_index_--;
  219. }
  220. }
  221. starting_sequence_number_ = expected_seq;
  222. // currentStatus_ will be set to Ok if reseek succeeds
  223. // Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
  224. // that allows gaps in the WAL since it will still skip over the gap.
  225. current_status_ = Status::NotFound("Gap in sequence numbers");
  226. // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
  227. // should be disabled
  228. return SeekToStartSequence(current_file_index_, !seq_per_batch_);
  229. }
  230. struct BatchCounter : public WriteBatch::Handler {
  231. SequenceNumber sequence_;
  232. BatchCounter(SequenceNumber sequence) : sequence_(sequence) {}
  233. Status MarkNoop(bool empty_batch) override {
  234. if (!empty_batch) {
  235. sequence_++;
  236. }
  237. return Status::OK();
  238. }
  239. Status MarkEndPrepare(const Slice&) override {
  240. sequence_++;
  241. return Status::OK();
  242. }
  243. Status MarkCommit(const Slice&) override {
  244. sequence_++;
  245. return Status::OK();
  246. }
  247. Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
  248. const Slice& /*val*/) override {
  249. return Status::OK();
  250. }
  251. Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
  252. return Status::OK();
  253. }
  254. Status SingleDeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
  255. return Status::OK();
  256. }
  257. Status MergeCF(uint32_t /*cf*/, const Slice& /*key*/,
  258. const Slice& /*val*/) override {
  259. return Status::OK();
  260. }
  261. Status MarkBeginPrepare(bool) override { return Status::OK(); }
  262. Status MarkRollback(const Slice&) override { return Status::OK(); }
  263. };
  264. current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
  265. if (seq_per_batch_) {
  266. BatchCounter counter(current_batch_seq_);
  267. batch->Iterate(&counter);
  268. current_last_seq_ = counter.sequence_;
  269. } else {
  270. current_last_seq_ =
  271. current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
  272. }
  273. // currentBatchSeq_ can only change here
  274. assert(current_last_seq_ <= versions_->LastSequence());
  275. current_batch_ = std::move(batch);
  276. is_valid_ = true;
  277. current_status_ = Status::OK();
  278. }
  279. Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
  280. std::unique_ptr<SequentialFileReader> file;
  281. Status s = OpenLogFile(log_file, &file);
  282. if (!s.ok()) {
  283. return s;
  284. }
  285. assert(file);
  286. current_log_reader_.reset(
  287. new log::Reader(options_->info_log, std::move(file), &reporter_,
  288. read_options_.verify_checksums_, log_file->LogNumber()));
  289. return Status::OK();
  290. }
  291. } // namespace ROCKSDB_NAMESPACE
  292. #endif // ROCKSDB_LITE