| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.#ifndef ROCKSDB_LITE#include "db/db_test_util.h"#include "port/stack_trace.h"#include "rocksdb/perf_context.h"#include "rocksdb/sst_file_manager.h"#include "test_util/fault_injection_test_env.h"#if !defined(ROCKSDB_LITE)#include "test_util/sync_point.h"#endifnamespace ROCKSDB_NAMESPACE {class DBErrorHandlingTest : public DBTestBase { public:  DBErrorHandlingTest() : DBTestBase("/db_error_handling_test") {}  std::string GetManifestNameFromLiveFiles() {    std::vector<std::string> live_files;    uint64_t manifest_size;    dbfull()->GetLiveFiles(live_files, &manifest_size, false);    for (auto& file : live_files) {      uint64_t num = 0;      FileType type;      if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {        return file;      }    }    return "";  }};class DBErrorHandlingEnv : public EnvWrapper {  public:    DBErrorHandlingEnv() : EnvWrapper(Env::Default()),      trig_no_space(false), trig_io_error(false) {}    void SetTrigNoSpace() {trig_no_space = true;}    void SetTrigIoError() {trig_io_error = true;}  private:    bool trig_no_space;    bool trig_io_error;};class ErrorHandlerListener : public EventListener { public:  ErrorHandlerListener()      : mutex_(),        cv_(&mutex_),        no_auto_recovery_(false),        recovery_complete_(false),        file_creation_started_(false),        override_bg_error_(false),        file_count_(0),        fault_env_(nullptr) {}  void OnTableFileCreationStarted(      const TableFileCreationBriefInfo& /*ti*/) override {    InstrumentedMutexLock l(&mutex_);    file_creation_started_ = true;    if (file_count_ > 0) {      if (--file_count_ == 0) {        fault_env_->SetFilesystemActive(false, file_creation_error_);        file_creation_error_ = Status::OK();      }    }    cv_.SignalAll();  }  void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,                            Status /*bg_error*/,                            bool* auto_recovery) override {    if (*auto_recovery && no_auto_recovery_) {      *auto_recovery = false;    }  }  void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {    InstrumentedMutexLock l(&mutex_);    recovery_complete_ = true;    cv_.SignalAll();  }  bool WaitForRecovery(uint64_t /*abs_time_us*/) {    InstrumentedMutexLock l(&mutex_);    while (!recovery_complete_) {      cv_.Wait(/*abs_time_us*/);    }    if (recovery_complete_) {      recovery_complete_ = false;      return true;    }    return false;  }  void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {    InstrumentedMutexLock l(&mutex_);    while (!file_creation_started_) {      cv_.Wait(/*abs_time_us*/);    }    file_creation_started_ = false;  }  void OnBackgroundError(BackgroundErrorReason /*reason*/,                         Status* bg_error) override {    if (override_bg_error_) {      *bg_error = bg_error_;      override_bg_error_ = false;    }  }  void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }  void OverrideBGError(Status bg_err) {    bg_error_ = bg_err;    override_bg_error_ = true;  }  void InjectFileCreationError(FaultInjectionTestEnv* env, int file_count,                               Status s) {    fault_env_ = env;    file_count_ = file_count;    file_creation_error_ = s;  } private:  InstrumentedMutex mutex_;  InstrumentedCondVar cv_;  bool no_auto_recovery_;  bool recovery_complete_;  bool file_creation_started_;  bool override_bg_error_;  int file_count_;  Status file_creation_error_;  Status bg_error_;  FaultInjectionTestEnv* fault_env_;};TEST_F(DBErrorHandlingTest, FLushWriteError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.env = fault_env.get();  options.listeners.emplace_back(listener);  Status s;  listener->EnableAutoRecovery(false);  DestroyAndReopen(options);  Put(Key(0), "val");  SyncPoint::GetInstance()->SetCallBack(      "FlushJob::Start", [&](void *) {    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));  });  SyncPoint::GetInstance()->EnableProcessing();  s = Flush();  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);  SyncPoint::GetInstance()->DisableProcessing();  fault_env->SetFilesystemActive(true);  s = dbfull()->Resume();  ASSERT_EQ(s, Status::OK());  Reopen(options);  ASSERT_EQ("val", Get(Key(0)));  Destroy(options);}TEST_F(DBErrorHandlingTest, ManifestWriteError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.env = fault_env.get();  options.listeners.emplace_back(listener);  Status s;  std::string old_manifest;  std::string new_manifest;  listener->EnableAutoRecovery(false);  DestroyAndReopen(options);  old_manifest = GetManifestNameFromLiveFiles();  Put(Key(0), "val");  Flush();  Put(Key(1), "val");  SyncPoint::GetInstance()->SetCallBack(      "VersionSet::LogAndApply:WriteManifest", [&](void *) {    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));  });  SyncPoint::GetInstance()->EnableProcessing();  s = Flush();  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);  SyncPoint::GetInstance()->ClearAllCallBacks();  SyncPoint::GetInstance()->DisableProcessing();  fault_env->SetFilesystemActive(true);  s = dbfull()->Resume();  ASSERT_EQ(s, Status::OK());  new_manifest = GetManifestNameFromLiveFiles();  ASSERT_NE(new_manifest, old_manifest);  Reopen(options);  ASSERT_EQ("val", Get(Key(0)));  ASSERT_EQ("val", Get(Key(1)));  Close();}TEST_F(DBErrorHandlingTest, DoubleManifestWriteError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.env = fault_env.get();  options.listeners.emplace_back(listener);  Status s;  std::string old_manifest;  std::string new_manifest;  listener->EnableAutoRecovery(false);  DestroyAndReopen(options);  old_manifest = GetManifestNameFromLiveFiles();  Put(Key(0), "val");  Flush();  Put(Key(1), "val");  SyncPoint::GetInstance()->SetCallBack(      "VersionSet::LogAndApply:WriteManifest", [&](void *) {    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));  });  SyncPoint::GetInstance()->EnableProcessing();  s = Flush();  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);  fault_env->SetFilesystemActive(true);  // This Resume() will attempt to create a new manifest file and fail again  s = dbfull()->Resume();  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);  fault_env->SetFilesystemActive(true);  SyncPoint::GetInstance()->ClearAllCallBacks();  SyncPoint::GetInstance()->DisableProcessing();  // A successful Resume() will create a new manifest file  s = dbfull()->Resume();  ASSERT_EQ(s, Status::OK());  new_manifest = GetManifestNameFromLiveFiles();  ASSERT_NE(new_manifest, old_manifest);  Reopen(options);  ASSERT_EQ("val", Get(Key(0)));  ASSERT_EQ("val", Get(Key(1)));  Close();}TEST_F(DBErrorHandlingTest, CompactionManifestWriteError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.level0_file_num_compaction_trigger = 2;  options.listeners.emplace_back(listener);  options.env = fault_env.get();  Status s;  std::string old_manifest;  std::string new_manifest;  std::atomic<bool> fail_manifest(false);  DestroyAndReopen(options);  old_manifest = GetManifestNameFromLiveFiles();  Put(Key(0), "val");  Put(Key(2), "val");  s = Flush();  ASSERT_EQ(s, Status::OK());  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(      // Wait for flush of 2nd L0 file before starting compaction      {{"DBImpl::FlushMemTable:FlushMemTableFinished",        "BackgroundCallCompaction:0"},       // Wait for compaction to detect manifest write error       {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},       // Make compaction thread wait for error to be cleared       {"CompactionManifestWriteError:1",        "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},       // Wait for DB instance to clear bg_error before calling       // TEST_WaitForCompact       {"SstFileManagerImpl::ErrorCleared", "CompactionManifestWriteError:2"}});  // trigger manifest write failure in compaction thread  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(      "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(      "VersionSet::LogAndApply:WriteManifest", [&](void*) {        if (fail_manifest.load()) {          fault_env->SetFilesystemActive(false,                                         Status::NoSpace("Out of space"));        }      });  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();  Put(Key(1), "val");  // This Flush will trigger a compaction, which will fail when appending to  // the manifest  s = Flush();  ASSERT_EQ(s, Status::OK());  TEST_SYNC_POINT("CompactionManifestWriteError:0");  // Clear all errors so when the compaction is retried, it will succeed  fault_env->SetFilesystemActive(true);  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();  TEST_SYNC_POINT("CompactionManifestWriteError:1");  TEST_SYNC_POINT("CompactionManifestWriteError:2");  s = dbfull()->TEST_WaitForCompact();  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();  ASSERT_EQ(s, Status::OK());  new_manifest = GetManifestNameFromLiveFiles();  ASSERT_NE(new_manifest, old_manifest);  Reopen(options);  ASSERT_EQ("val", Get(Key(0)));  ASSERT_EQ("val", Get(Key(1)));  ASSERT_EQ("val", Get(Key(2)));  Close();}TEST_F(DBErrorHandlingTest, CompactionWriteError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.level0_file_num_compaction_trigger = 2;  options.listeners.emplace_back(listener);  options.env = fault_env.get();  Status s;  DestroyAndReopen(options);  Put(Key(0), "va;");  Put(Key(2), "va;");  s = Flush();  ASSERT_EQ(s, Status::OK());  listener->OverrideBGError(      Status(Status::NoSpace(), Status::Severity::kHardError)      );  listener->EnableAutoRecovery(false);  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::FlushMemTable:FlushMemTableFinished",        "BackgroundCallCompaction:0"}});  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(      "BackgroundCallCompaction:0", [&](void*) {        fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));      });  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();  Put(Key(1), "val");  s = Flush();  ASSERT_EQ(s, Status::OK());  s = dbfull()->TEST_WaitForCompact();  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);  fault_env->SetFilesystemActive(true);  s = dbfull()->Resume();  ASSERT_EQ(s, Status::OK());  Destroy(options);}TEST_F(DBErrorHandlingTest, CorruptionError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.level0_file_num_compaction_trigger = 2;  options.env = fault_env.get();  Status s;  DestroyAndReopen(options);  Put(Key(0), "va;");  Put(Key(2), "va;");  s = Flush();  ASSERT_EQ(s, Status::OK());  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(      {{"DBImpl::FlushMemTable:FlushMemTableFinished",        "BackgroundCallCompaction:0"}});  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(      "BackgroundCallCompaction:0", [&](void*) {        fault_env->SetFilesystemActive(false, Status::Corruption("Corruption"));      });  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();  Put(Key(1), "val");  s = Flush();  ASSERT_EQ(s, Status::OK());  s = dbfull()->TEST_WaitForCompact();  ASSERT_EQ(s.severity(),            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);  fault_env->SetFilesystemActive(true);  s = dbfull()->Resume();  ASSERT_NE(s, Status::OK());  Destroy(options);}TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.env = fault_env.get();  options.listeners.emplace_back(listener);  Status s;  listener->EnableAutoRecovery();  DestroyAndReopen(options);  Put(Key(0), "val");  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));  });  SyncPoint::GetInstance()->EnableProcessing();  s = Flush();  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);  SyncPoint::GetInstance()->DisableProcessing();  fault_env->SetFilesystemActive(true);  ASSERT_EQ(listener->WaitForRecovery(5000000), true);  s = Put(Key(1), "val");  ASSERT_EQ(s, Status::OK());  Reopen(options);  ASSERT_EQ("val", Get(Key(0)));  ASSERT_EQ("val", Get(Key(1)));  Destroy(options);}TEST_F(DBErrorHandlingTest, FailRecoverFlushError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.env = fault_env.get();  options.listeners.emplace_back(listener);  Status s;  listener->EnableAutoRecovery();  DestroyAndReopen(options);  Put(Key(0), "val");  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {    fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));  });  SyncPoint::GetInstance()->EnableProcessing();  s = Flush();  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);  // We should be able to shutdown the database while auto recovery is going  // on in the background  Close();  DestroyDB(dbname_, options);}TEST_F(DBErrorHandlingTest, WALWriteError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.writable_file_max_buffer_size = 32768;  options.env = fault_env.get();  options.listeners.emplace_back(listener);  Status s;  Random rnd(301);  listener->EnableAutoRecovery();  DestroyAndReopen(options);  {    WriteBatch batch;    for (auto i = 0; i<100; ++i) {      batch.Put(Key(i), RandomString(&rnd, 1024));    }    WriteOptions wopts;    wopts.sync = true;    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());  };  {    WriteBatch batch;    int write_error = 0;    for (auto i = 100; i<199; ++i) {      batch.Put(Key(i), RandomString(&rnd, 1024));    }    SyncPoint::GetInstance()->SetCallBack("WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {      write_error++;      if (write_error > 2) {        fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));      }    });    SyncPoint::GetInstance()->EnableProcessing();    WriteOptions wopts;    wopts.sync = true;    s = dbfull()->Write(wopts, &batch);    ASSERT_EQ(s, s.NoSpace());  }  SyncPoint::GetInstance()->DisableProcessing();  fault_env->SetFilesystemActive(true);  ASSERT_EQ(listener->WaitForRecovery(5000000), true);  for (auto i=0; i<199; ++i) {    if (i < 100) {      ASSERT_NE(Get(Key(i)), "NOT_FOUND");    } else {      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");    }  }  Reopen(options);  for (auto i=0; i<199; ++i) {    if (i < 100) {      ASSERT_NE(Get(Key(i)), "NOT_FOUND");    } else {      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");    }  }  Close();}TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) {  std::unique_ptr<FaultInjectionTestEnv> fault_env(      new FaultInjectionTestEnv(Env::Default()));  std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());  Options options = GetDefaultOptions();  options.create_if_missing = true;  options.writable_file_max_buffer_size = 32768;  options.env = fault_env.get();  options.listeners.emplace_back(listener);  Status s;  Random rnd(301);  listener->EnableAutoRecovery();  CreateAndReopenWithCF({"one", "two", "three"}, options);  {    WriteBatch batch;    for (auto i = 1; i < 4; ++i) {      for (auto j = 0; j < 100; ++j) {        batch.Put(handles_[i], Key(j), RandomString(&rnd, 1024));      }    }    WriteOptions wopts;    wopts.sync = true;    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());  };  {    WriteBatch batch;    int write_error = 0;    // Write to one CF    for (auto i = 100; i < 199; ++i) {      batch.Put(handles_[2], Key(i), RandomString(&rnd, 1024));    }    SyncPoint::GetInstance()->SetCallBack(        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {          write_error++;          if (write_error > 2) {            fault_env->SetFilesystemActive(false,                                           Status::NoSpace("Out of space"));          }        });    SyncPoint::GetInstance()->EnableProcessing();    WriteOptions wopts;    wopts.sync = true;    s = dbfull()->Write(wopts, &batch);    ASSERT_EQ(s, s.NoSpace());  }  SyncPoint::GetInstance()->DisableProcessing();  fault_env->SetFilesystemActive(true);  ASSERT_EQ(listener->WaitForRecovery(5000000), true);  for (auto i = 1; i < 4; ++i) {    // Every CF should have been flushed    ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);  }  for (auto i = 1; i < 4; ++i) {    for (auto j = 0; j < 199; ++j) {      if (j < 100) {        ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");      } else {        ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");      }    }  }  ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);  for (auto i = 1; i < 4; ++i) {    for (auto j = 0; j < 199; ++j) {      if (j < 100) {        ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");      } else {        ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");      }    }  }  Close();}TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {  FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());  std::vector<std::unique_ptr<FaultInjectionTestEnv>> fault_env;  std::vector<Options> options;  std::vector<std::shared_ptr<ErrorHandlerListener>> listener;  std::vector<DB*> db;  std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));  int kNumDbInstances = 3;  Random rnd(301);  for (auto i = 0; i < kNumDbInstances; ++i) {    listener.emplace_back(new ErrorHandlerListener());    options.emplace_back(GetDefaultOptions());    fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default()));    options[i].create_if_missing = true;    options[i].level0_file_num_compaction_trigger = 2;    options[i].writable_file_max_buffer_size = 32768;    options[i].env = fault_env[i].get();    options[i].listeners.emplace_back(listener[i]);    options[i].sst_file_manager = sfm;    DB* dbptr;    char buf[16];    listener[i]->EnableAutoRecovery();    // Setup for returning error for the 3rd SST, which would be level 1    listener[i]->InjectFileCreationError(fault_env[i].get(), 3,                                         Status::NoSpace("Out of space"));    snprintf(buf, sizeof(buf), "_%d", i);    DestroyDB(dbname_ + std::string(buf), options[i]);    ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),              Status::OK());    db.emplace_back(dbptr);  }  for (auto i = 0; i < kNumDbInstances; ++i) {    WriteBatch batch;    for (auto j = 0; j <= 100; ++j) {      batch.Put(Key(j), RandomString(&rnd, 1024));    }    WriteOptions wopts;    wopts.sync = true;    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());  }  def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));  for (auto i = 0; i < kNumDbInstances; ++i) {    WriteBatch batch;    // Write to one CF    for (auto j = 100; j < 199; ++j) {      batch.Put(Key(j), RandomString(&rnd, 1024));    }    WriteOptions wopts;    wopts.sync = true;    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());  }  for (auto i = 0; i < kNumDbInstances; ++i) {    Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);    ASSERT_EQ(s.severity(), Status::Severity::kSoftError);    fault_env[i]->SetFilesystemActive(true);  }  def_env->SetFilesystemActive(true);  for (auto i = 0; i < kNumDbInstances; ++i) {    std::string prop;    ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);    ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),              Status::OK());    EXPECT_TRUE(db[i]->GetProperty(        "rocksdb.num-files-at-level" + NumberToString(0), &prop));    EXPECT_EQ(atoi(prop.c_str()), 0);    EXPECT_TRUE(db[i]->GetProperty(        "rocksdb.num-files-at-level" + NumberToString(1), &prop));    EXPECT_EQ(atoi(prop.c_str()), 1);  }  for (auto i = 0; i < kNumDbInstances; ++i) {    char buf[16];    snprintf(buf, sizeof(buf), "_%d", i);    delete db[i];    fault_env[i]->SetFilesystemActive(true);    if (getenv("KEEP_DB")) {      printf("DB is still at %s%s\n", dbname_.c_str(), buf);    } else {      Status s = DestroyDB(dbname_ + std::string(buf), options[i]);    }  }  options.clear();  sfm.reset();  delete def_env;}TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {  FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());  std::vector<std::unique_ptr<FaultInjectionTestEnv>> fault_env;  std::vector<Options> options;  std::vector<std::shared_ptr<ErrorHandlerListener>> listener;  std::vector<DB*> db;  std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));  int kNumDbInstances = 3;  Random rnd(301);  for (auto i = 0; i < kNumDbInstances; ++i) {    listener.emplace_back(new ErrorHandlerListener());    options.emplace_back(GetDefaultOptions());    fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default()));    options[i].create_if_missing = true;    options[i].level0_file_num_compaction_trigger = 2;    options[i].writable_file_max_buffer_size = 32768;    options[i].env = fault_env[i].get();    options[i].listeners.emplace_back(listener[i]);    options[i].sst_file_manager = sfm;    DB* dbptr;    char buf[16];    listener[i]->EnableAutoRecovery();    switch (i) {      case 0:        // Setup for returning error for the 3rd SST, which would be level 1        listener[i]->InjectFileCreationError(fault_env[i].get(), 3,                                             Status::NoSpace("Out of space"));        break;      case 1:        // Setup for returning error after the 1st SST, which would result        // in a hard error        listener[i]->InjectFileCreationError(fault_env[i].get(), 2,                                             Status::NoSpace("Out of space"));        break;      default:        break;    }    snprintf(buf, sizeof(buf), "_%d", i);    DestroyDB(dbname_ + std::string(buf), options[i]);    ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),              Status::OK());    db.emplace_back(dbptr);  }  for (auto i = 0; i < kNumDbInstances; ++i) {    WriteBatch batch;    for (auto j = 0; j <= 100; ++j) {      batch.Put(Key(j), RandomString(&rnd, 1024));    }    WriteOptions wopts;    wopts.sync = true;    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());  }  def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));  for (auto i = 0; i < kNumDbInstances; ++i) {    WriteBatch batch;    // Write to one CF    for (auto j = 100; j < 199; ++j) {      batch.Put(Key(j), RandomString(&rnd, 1024));    }    WriteOptions wopts;    wopts.sync = true;    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());    if (i != 1) {      ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());    } else {      ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace());    }  }  for (auto i = 0; i < kNumDbInstances; ++i) {    Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);    switch (i) {      case 0:        ASSERT_EQ(s.severity(), Status::Severity::kSoftError);        break;      case 1:        ASSERT_EQ(s.severity(), Status::Severity::kHardError);        break;      case 2:        ASSERT_EQ(s, Status::OK());        break;    }    fault_env[i]->SetFilesystemActive(true);  }  def_env->SetFilesystemActive(true);  for (auto i = 0; i < kNumDbInstances; ++i) {    std::string prop;    if (i < 2) {      ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);    }    if (i == 1) {      ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),                Status::OK());    }    EXPECT_TRUE(db[i]->GetProperty(        "rocksdb.num-files-at-level" + NumberToString(0), &prop));    EXPECT_EQ(atoi(prop.c_str()), 0);    EXPECT_TRUE(db[i]->GetProperty(        "rocksdb.num-files-at-level" + NumberToString(1), &prop));    EXPECT_EQ(atoi(prop.c_str()), 1);  }  for (auto i = 0; i < kNumDbInstances; ++i) {    char buf[16];    snprintf(buf, sizeof(buf), "_%d", i);    fault_env[i]->SetFilesystemActive(true);    delete db[i];    if (getenv("KEEP_DB")) {      printf("DB is still at %s%s\n", dbname_.c_str(), buf);    } else {      DestroyDB(dbname_ + std::string(buf), options[i]);    }  }  options.clear();  delete def_env;}}  // namespace ROCKSDB_NAMESPACEint main(int argc, char** argv) {  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();  ::testing::InitGoogleTest(&argc, argv);  return RUN_ALL_TESTS();}#else#include <stdio.h>int main(int /*argc*/, char** /*argv*/) {  fprintf(stderr, "SKIPPED as Cuckoo table is not supported in ROCKSDB_LITE\n");  return 0;}#endif  // ROCKSDB_LITE
 |