- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #ifndef ROCKSDB_LITE
- #include "db/transaction_log_impl.h"
- #include <cinttypes>
- #include "db/write_batch_internal.h"
- #include "file/sequence_file_reader.h"
- namespace ROCKSDB_NAMESPACE {
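- // TransactionLogIteratorImpl iterates over the write batches stored in the
- // live and archived WAL files, starting at a caller-supplied sequence
- // number. Typical usage is sketched below; applications obtain the iterator
- // through DB::GetUpdatesSince rather than constructing this class directly:
- //
- //   std::unique_ptr<TransactionLogIterator> iter;
- //   Status s = db->GetUpdatesSince(start_seq, &iter);
- //   for (; s.ok() && iter->Valid(); iter->Next()) {
- //     BatchResult batch = iter->GetBatch();
- //     // batch.sequence and batch.writeBatchPtr hold the recovered updates.
- //   }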
- TransactionLogIteratorImpl::TransactionLogIteratorImpl(
- const std::string& dir, const ImmutableDBOptions* options,
- const TransactionLogIterator::ReadOptions& read_options,
- const EnvOptions& soptions, const SequenceNumber seq,
- std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
- const bool seq_per_batch)
- : dir_(dir),
- options_(options),
- read_options_(read_options),
- soptions_(soptions),
- starting_sequence_number_(seq),
- files_(std::move(files)),
- started_(false),
- is_valid_(false),
- current_file_index_(0),
- current_batch_seq_(0),
- current_last_seq_(0),
- versions_(versions),
- seq_per_batch_(seq_per_batch) {
- assert(files_ != nullptr);
- assert(versions_ != nullptr);
- reporter_.env = options_->env;
- reporter_.info_log = options_->info_log.get();
- SeekToStartSequence(); // Seek to the starting sequence
- }
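- // Opens the WAL file described by `log_file` for sequential reads. Archived
- // files are opened from the archive directory; live files are opened from
- // the DB directory, falling back to the archive directory in case the file
- // was moved there in the meantime.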
- Status TransactionLogIteratorImpl::OpenLogFile(
- const LogFile* log_file,
- std::unique_ptr<SequentialFileReader>* file_reader) {
- FileSystem* fs = options_->fs.get();
- std::unique_ptr<FSSequentialFile> file;
- std::string fname;
- Status s;
- EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);
- if (log_file->Type() == kArchivedLogFile) {
- fname = ArchivedLogFileName(dir_, log_file->LogNumber());
- s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
- } else {
- fname = LogFileName(dir_, log_file->LogNumber());
- s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
- if (!s.ok()) {
- // If we cannot open the file in the DB directory, try the archive
- // directory, as it could have been moved there in the meantime.
- fname = ArchivedLogFileName(dir_, log_file->LogNumber());
- s = fs->NewSequentialFile(fname, optimized_env_options,
- &file, nullptr);
- }
- }
- if (s.ok()) {
- file_reader->reset(new SequentialFileReader(std::move(file), fname));
- }
- return s;
- }
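- // Returns the write batch at the current position and transfers its
- // ownership to the caller. Must only be called while the iterator is valid.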
- BatchResult TransactionLogIteratorImpl::GetBatch() {
- assert(is_valid_); // cannot be called in an invalid state.
- BatchResult result;
- result.sequence = current_batch_seq_;
- result.writeBatchPtr = std::move(current_batch_);
- return result;
- }
- Status TransactionLogIteratorImpl::status() { return current_status_; }
- bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }
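- // Reads the next record from the current log reader, but refuses to read
- // once current_last_seq_ has caught up with the last sequence number
- // published by the VersionSet, so the iterator never returns entries past
- // the fully written portion of the WAL.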
- bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
- // Don't read if no more complete entries to read from logs
- if (current_last_seq_ >= versions_->LastSequence()) {
- return false;
- }
- return current_log_reader_->ReadRecord(record, &scratch_);
- }
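- // Positions the iterator on the first batch whose last sequence number
- // reaches starting_sequence_number_, scanning from files_[start_file_index].
- // With strict == true the batch must begin exactly at the requested
- // sequence number; any gap is reported as corruption.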
- void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
- bool strict) {
- Slice record;
- started_ = false;
- is_valid_ = false;
- if (files_->size() <= start_file_index) {
- return;
- }
- Status s =
- OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
- if (!s.ok()) {
- current_status_ = s;
- reporter_.Info(current_status_.ToString().c_str());
- return;
- }
- while (RestrictedRead(&record)) {
- if (record.size() < WriteBatchInternal::kHeader) {
- reporter_.Corruption(
- record.size(), Status::Corruption("very small log record"));
- continue;
- }
- UpdateCurrentWriteBatch(record);
- if (current_last_seq_ >= starting_sequence_number_) {
- if (strict && current_batch_seq_ != starting_sequence_number_) {
- current_status_ = Status::Corruption(
- "Gap in sequence number. Could not "
- "seek to required sequence number");
- reporter_.Info(current_status_.ToString().c_str());
- return;
- } else if (strict) {
- reporter_.Info("Seeked to the required sequence number. Iterator will "
- "continue.");
- }
- is_valid_ = true;
- started_ = true; // set started_ as we were able to seek to the starting sequence
- return;
- } else {
- is_valid_ = false;
- }
- }
- // Could not find the start sequence in the first file. Normally this must
- // be the only file; otherwise, log the error and let the iterator return
- // the next available entry. If strict is set, we want to seek exactly to
- // the start sequence, and it should have been present in the file we
- // scanned above.
- if (strict) {
- current_status_ = Status::Corruption(
- "Gap in sequence number. Could not "
- "seek to required sequence number");
- reporter_.Info(current_status_.ToString().c_str());
- } else if (files_->size() != 1) {
- current_status_ = Status::Corruption(
- "Start sequence was not found, "
- "skipping to the next available");
- reporter_.Info(current_status_.ToString().c_str());
- // Let NextImpl find the next available entry. started_ remains false
- // because we don't want to check for gaps while moving to start sequence
- NextImpl(true);
- }
- }
- void TransactionLogIteratorImpl::Next() {
- return NextImpl(false);
- }
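- // Advances to the next write batch. `internal` is true when called from
- // SeekToStartSequence; in that case started_ is not yet set and the first
- // batch read successfully marks the iterator as started.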
- void TransactionLogIteratorImpl::NextImpl(bool internal) {
- Slice record;
- is_valid_ = false;
- if (!internal && !started_) {
- // Runs every time until we can seek to the start sequence
- return SeekToStartSequence();
- }
- while (true) {
- assert(current_log_reader_);
- if (current_log_reader_->IsEOF()) {
- current_log_reader_->UnmarkEOF();
- }
- while (RestrictedRead(&record)) {
- if (record.size() < WriteBatchInternal::kHeader) {
- reporter_.Corruption(
- record.size(), Status::Corruption("very small log record"));
- continue;
- } else {
- // started_ should be true if called by application
- assert(internal || started_);
- // started_ should be false if called internally
- assert(!internal || !started_);
- UpdateCurrentWriteBatch(record);
- if (internal && !started_) {
- started_ = true;
- }
- return;
- }
- }
- // Open the next file
- if (current_file_index_ < files_->size() - 1) {
- ++current_file_index_;
- Status s = OpenLogReader(files_->at(current_file_index_).get());
- if (!s.ok()) {
- is_valid_ = false;
- current_status_ = s;
- return;
- }
- } else {
- is_valid_ = false;
- if (current_last_seq_ == versions_->LastSequence()) {
- current_status_ = Status::OK();
- } else {
- const char* msg = "Create a new iterator to fetch the new tail.";
- current_status_ = Status::TryAgain(msg);
- }
- return;
- }
- }
- }
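- // Returns true if `batch` starts at `expected_seq`. On a mismatch the
- // discontinuity is logged and false is returned so the caller can reseek.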
- bool TransactionLogIteratorImpl::IsBatchExpected(
- const WriteBatch* batch, const SequenceNumber expected_seq) {
- assert(batch);
- SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
- if (batchSeq != expected_seq) {
- char buf[200];
- snprintf(buf, sizeof(buf),
- "Discontinuity in log records. Got seq=%" PRIu64
- ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
- ". Log iterator will reseek to the correct batch.",
- batchSeq, expected_seq, versions_->LastSequence());
- reporter_.Info(buf);
- return false;
- }
- return true;
- }
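- // Decodes `record` into a WriteBatch and publishes it as the current
- // position. If the batch does not continue the expected sequence, the
- // iterator reseeks to the expected sequence number instead.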
- void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
- std::unique_ptr<WriteBatch> batch(new WriteBatch());
- WriteBatchInternal::SetContents(batch.get(), record);
- SequenceNumber expected_seq = current_last_seq_ + 1;
- // If the iterator has started, then confirm that we get continuous batches
- if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
- // Seek to the batch having expected sequence number
- if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
- // Expected batch must lie in the previous log file
- // Avoid underflow.
- if (current_file_index_ != 0) {
- current_file_index_--;
- }
- }
- starting_sequence_number_ = expected_seq;
- // current_status_ will be set to OK if the reseek succeeds.
- // Note: this is still ok in seq_per_batch_ && two_write_queues_ mode, which
- // allows gaps in the WAL, since it will still skip over the gap.
- current_status_ = Status::NotFound("Gap in sequence numbers");
- // In seq_per_batch_ mode, gaps in the sequence numbers are possible, so
- // strict mode should be disabled.
- return SeekToStartSequence(current_file_index_, !seq_per_batch_);
- }
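- // In seq_per_batch_ mode a batch may consume additional sequence numbers
- // for its markers (Noop, EndPrepare, Commit), so the consumed range is
- // counted by replaying the batch through a handler rather than relying on
- // WriteBatchInternal::Count.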
- struct BatchCounter : public WriteBatch::Handler {
- SequenceNumber sequence_;
- explicit BatchCounter(SequenceNumber sequence) : sequence_(sequence) {}
- Status MarkNoop(bool empty_batch) override {
- if (!empty_batch) {
- sequence_++;
- }
- return Status::OK();
- }
- Status MarkEndPrepare(const Slice&) override {
- sequence_++;
- return Status::OK();
- }
- Status MarkCommit(const Slice&) override {
- sequence_++;
- return Status::OK();
- }
- Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
- const Slice& /*val*/) override {
- return Status::OK();
- }
- Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
- return Status::OK();
- }
- Status SingleDeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
- return Status::OK();
- }
- Status MergeCF(uint32_t /*cf*/, const Slice& /*key*/,
- const Slice& /*val*/) override {
- return Status::OK();
- }
- Status MarkBeginPrepare(bool) override { return Status::OK(); }
- Status MarkRollback(const Slice&) override { return Status::OK(); }
- };
- current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
- if (seq_per_batch_) {
- BatchCounter counter(current_batch_seq_);
- batch->Iterate(&counter);
- current_last_seq_ = counter.sequence_;
- } else {
- current_last_seq_ =
- current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
- }
- // current_batch_seq_ can only change here
- assert(current_last_seq_ <= versions_->LastSequence());
- current_batch_ = std::move(batch);
- is_valid_ = true;
- current_status_ = Status::OK();
- }
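- // Opens `log_file` and wraps it in a log::Reader that reports corruption
- // through reporter_ and verifies checksums when requested.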
- Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
- std::unique_ptr<SequentialFileReader> file;
- Status s = OpenLogFile(log_file, &file);
- if (!s.ok()) {
- return s;
- }
- assert(file);
- current_log_reader_.reset(
- new log::Reader(options_->info_log, std::move(file), &reporter_,
- read_options_.verify_checksums_, log_file->LogNumber()));
- return Status::OK();
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif // ROCKSDB_LITE