- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/db_test_util.h"
- #include "env/composite_env_wrapper.h"
- #include "options/options_helper.h"
- #include "port/port.h"
- #include "port/stack_trace.h"
- #include "test_util/fault_injection_test_env.h"
- #include "test_util/sync_point.h"
- namespace ROCKSDB_NAMESPACE {
- class DBWALTest : public DBTestBase {
- public:
- DBWALTest() : DBTestBase("/db_wal_test") {}
- #if defined(ROCKSDB_PLATFORM_POSIX)
- uint64_t GetAllocatedFileSize(std::string file_name) {
- struct stat sbuf;
- int err = stat(file_name.c_str(), &sbuf);
- assert(err == 0);
- return sbuf.st_blocks * 512;
- }
- #endif
- };
- // A SpecialEnv enriched to give more insight about deleted files
- class EnrichedSpecialEnv : public SpecialEnv {
- public:
- explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
- Status NewSequentialFile(const std::string& f,
- std::unique_ptr<SequentialFile>* r,
- const EnvOptions& soptions) override {
- InstrumentedMutexLock l(&env_mutex_);
- if (f == skipped_wal) {
- deleted_wal_reopened = true;
- if (IsWAL(f) && largetest_deleted_wal.size() != 0 &&
- f.compare(largetest_deleted_wal) <= 0) {
- gap_in_wals = true;
- }
- }
- return SpecialEnv::NewSequentialFile(f, r, soptions);
- }
- Status DeleteFile(const std::string& fname) override {
- if (IsWAL(fname)) {
- deleted_wal_cnt++;
- InstrumentedMutexLock l(&env_mutex_);
- // If this is the first WAL, remember its name and skip deleting it. We
- // remember its name partly because the application might attempt to
- // delete the file again.
- if (skipped_wal.size() != 0 && skipped_wal != fname) {
- if (largetest_deleted_wal.size() == 0 ||
- largetest_deleted_wal.compare(fname) < 0) {
- largetest_deleted_wal = fname;
- }
- } else {
- skipped_wal = fname;
- return Status::OK();
- }
- }
- return SpecialEnv::DeleteFile(fname);
- }
- bool IsWAL(const std::string& fname) {
- // printf("iswal %s\n", fname.c_str());
- return fname.compare(fname.size() - 3, 3, "log") == 0;
- }
- InstrumentedMutex env_mutex_;
- // the WAL whose actual deletion was skipped by the env
- std::string skipped_wal = "";
- // the largest WAL that was requested to be deleted
- std::string largetest_deleted_wal = "";
- // number of WALs that were successfully deleted
- std::atomic<size_t> deleted_wal_cnt = {0};
- // whether the WAL whose deletion from the fs was skipped was reopened during recovery
- std::atomic<bool> deleted_wal_reopened = {false};
- // whether a gap in the WALs was detected during recovery
- std::atomic<bool> gap_in_wals = {false};
- };
- class DBWALTestWithEnrichedEnv : public DBTestBase {
- public:
- DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
- enriched_env_ = new EnrichedSpecialEnv(env_->target());
- auto options = CurrentOptions();
- options.env = enriched_env_;
- options.allow_2pc = true;
- Reopen(options);
- delete env_;
- // to be deleted by the parent class
- env_ = enriched_env_;
- }
- protected:
- EnrichedSpecialEnv* enriched_env_;
- };
- // Test that recovery successfully avoids gaps between the logs. One known
- // scenario that could cause such a gap is the application issuing WAL
- // deletions out of order. For the sake of simplicity in the test, here we
- // create the gap by manipulating the env to skip deletion of the first WAL
- // but not of the ones after it.
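- // For example (a sketch with made-up file names): given WALs 000010.log,
- // 000011.log and 000012.log, the env skips the requested deletion of
- // 000010.log while 000011.log is deleted for real; if recovery later re-read
- // 000010.log, it would be reading across a hole where 000011.log used to be,
- // which is exactly the gap this test asserts never appears.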
- TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
- auto options = last_options_;
- // To cause frequent WAL deletion
- options.write_buffer_size = 128;
- Reopen(options);
- WriteOptions writeOpt = WriteOptions();
- for (int i = 0; i < 128 * 5; i++) {
- ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
- }
- FlushOptions fo;
- fo.wait = true;
- ASSERT_OK(db_->Flush(fo));
- // some wals are deleted
- ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
- // but not the first one
- ASSERT_NE(0, enriched_env_->skipped_wal.size());
- // Test that the WAL that was not deleted will be skipped during recovery
- options = last_options_;
- Reopen(options);
- ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
- ASSERT_FALSE(enriched_env_->gap_in_wals);
- }
- TEST_F(DBWALTest, WAL) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- WriteOptions writeOpt = WriteOptions();
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("v1", Get(1, "bar"));
- writeOpt.disableWAL = false;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- // Both values should be present.
- ASSERT_EQ("v2", Get(1, "bar"));
- ASSERT_EQ("v2", Get(1, "foo"));
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
- writeOpt.disableWAL = false;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- // again both values should be present.
- ASSERT_EQ("v3", Get(1, "foo"));
- ASSERT_EQ("v3", Get(1, "bar"));
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RollLog) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "baz", "v5"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- for (int i = 0; i < 10; i++) {
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- }
- ASSERT_OK(Put(1, "foo", "v4"));
- for (int i = 0; i < 10; i++) {
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- }
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, SyncWALNotBlockWrite) {
- Options options = CurrentOptions();
- options.max_write_buffer_number = 4;
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo1", "bar1"));
- ASSERT_OK(Put("foo5", "bar5"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"WritableFileWriter::SyncWithoutFlush:1",
- "DBWALTest::SyncWALNotBlockWrite:1"},
- {"DBWALTest::SyncWALNotBlockWrite:2",
- "WritableFileWriter::SyncWithoutFlush:2"},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
- TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
- ASSERT_OK(Put("foo2", "bar2"));
- ASSERT_OK(Put("foo3", "bar3"));
- FlushOptions fo;
- fo.wait = false;
- ASSERT_OK(db_->Flush(fo));
- ASSERT_OK(Put("foo4", "bar4"));
- TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
- thread.join();
- ASSERT_EQ(Get("foo1"), "bar1");
- ASSERT_EQ(Get("foo2"), "bar2");
- ASSERT_EQ(Get("foo3"), "bar3");
- ASSERT_EQ(Get("foo4"), "bar4");
- ASSERT_EQ(Get("foo5"), "bar5");
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(DBWALTest, SyncWALNotWaitWrite) {
- ASSERT_OK(Put("foo1", "bar1"));
- ASSERT_OK(Put("foo3", "bar3"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"},
- {"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread thread(
- [&]() { ASSERT_OK(Put("foo2", "bar2")); });
- // Moving this to SyncWAL before the actual fsync
- // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
- ASSERT_OK(db_->SyncWAL());
- // Moving this to SyncWAL after actual fsync
- // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
- thread.join();
- ASSERT_EQ(Get("foo1"), "bar1");
- ASSERT_EQ(Get("foo2"), "bar2");
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(DBWALTest, Recover) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "baz", "v5"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("v5", Get(1, "baz"));
- ASSERT_OK(Put(1, "bar", "v2"));
- ASSERT_OK(Put(1, "foo", "v3"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v3", Get(1, "foo"));
- ASSERT_OK(Put(1, "foo", "v4"));
- ASSERT_EQ("v4", Get(1, "foo"));
- ASSERT_EQ("v2", Get(1, "bar"));
- ASSERT_EQ("v5", Get(1, "baz"));
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RecoverWithTableHandle) {
- do {
- Options options = CurrentOptions();
- options.create_if_missing = true;
- options.disable_auto_compactions = true;
- options.avoid_flush_during_recovery = false;
- DestroyAndReopen(options);
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "bar", "v2"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "foo", "v3"));
- ASSERT_OK(Put(1, "bar", "v4"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "big", std::string(100, 'a')));
- options = CurrentOptions();
- const int kSmallMaxOpenFiles = 13;
- if (option_config_ == kDBLogDir) {
- // Use this option to check that files are not preloaded.
- // Set max_open_files small enough that no preload will happen.
- options.max_open_files = kSmallMaxOpenFiles;
- // RocksDB sanitizes max_open_files to at least 20; modify it back.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
- int* max_open_files = static_cast<int*>(arg);
- *max_open_files = kSmallMaxOpenFiles;
- });
- } else if (option_config_ == kWalDirAndMmapReads) {
- // Use this option to check always loading all files.
- options.max_open_files = 100;
- } else {
- options.max_open_files = -1;
- }
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- std::vector<std::vector<FileMetaData>> files;
- dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
- size_t total_files = 0;
- for (const auto& level : files) {
- total_files += level.size();
- }
- ASSERT_EQ(total_files, 3);
- for (const auto& level : files) {
- for (const auto& file : level) {
- if (options.max_open_files == kSmallMaxOpenFiles) {
- ASSERT_TRUE(file.table_reader_handle == nullptr);
- } else {
- ASSERT_TRUE(file.table_reader_handle != nullptr);
- }
- }
- }
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, IgnoreRecoveredLog) {
- std::string backup_logs = dbname_ + "/backup_logs";
- do {
- // delete old files in backup_logs directory
- env_->CreateDirIfMissing(backup_logs);
- std::vector<std::string> old_files;
- env_->GetChildren(backup_logs, &old_files);
- for (auto& file : old_files) {
- if (file != "." && file != "..") {
- env_->DeleteFile(backup_logs + "/" + file);
- }
- }
- Options options = CurrentOptions();
- options.create_if_missing = true;
- options.merge_operator = MergeOperators::CreateUInt64AddOperator();
- options.wal_dir = dbname_ + "/logs";
- DestroyAndReopen(options);
- // fill up the DB
- std::string one, two;
- PutFixed64(&one, 1);
- PutFixed64(&two, 2);
- ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
- ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
- ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
- // copy the logs to backup
- std::vector<std::string> logs;
- env_->GetChildren(options.wal_dir, &logs);
- for (auto& log : logs) {
- if (log != ".." && log != ".") {
- CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
- }
- }
- // recover the DB
- Reopen(options);
- ASSERT_EQ(two, Get("foo"));
- ASSERT_EQ(one, Get("bar"));
- Close();
- // copy the logs from backup back to wal dir
- for (auto& log : logs) {
- if (log != ".." && log != ".") {
- CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
- }
- }
- // This should ignore the log files; recovery should not happen again.
- // If recovery happened again, the same merge operator would be applied
- // twice, leading to incorrect results.
- Reopen(options);
- ASSERT_EQ(two, Get("foo"));
- ASSERT_EQ(one, Get("bar"));
- Close();
- Destroy(options);
- Reopen(options);
- Close();
- // copy the logs from backup back to wal dir
- env_->CreateDirIfMissing(options.wal_dir);
- for (auto& log : logs) {
- if (log != ".." && log != ".") {
- CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
- }
- }
- // assert that we successfully recovered only from logs, even though we
- // destroyed the DB
- Reopen(options);
- ASSERT_EQ(two, Get("foo"));
- ASSERT_EQ(one, Get("bar"));
- // Recovery will fail if DB directory doesn't exist.
- Destroy(options);
- // copy the logs from backup back to wal dir
- env_->CreateDirIfMissing(options.wal_dir);
- for (auto& log : logs) {
- if (log != ".." && log != ".") {
- CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
- // we won't be needing this file anymore
- env_->DeleteFile(backup_logs + "/" + log);
- }
- }
- Status s = TryReopen(options);
- ASSERT_TRUE(!s.ok());
- Destroy(options);
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RecoveryWithEmptyLog) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "foo", "v2"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v3"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v3", Get(1, "foo"));
- } while (ChangeWalOptions());
- }
- #if !(defined NDEBUG) || !defined(OS_WIN)
- TEST_F(DBWALTest, PreallocateBlock) {
- Options options = CurrentOptions();
- options.write_buffer_size = 10 * 1000 * 1000;
- options.max_total_wal_size = 0;
- size_t expected_preallocation_size = static_cast<size_t>(
- options.write_buffer_size + options.write_buffer_size / 10);
- DestroyAndReopen(options);
- std::atomic<int> called(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
- ASSERT_TRUE(arg != nullptr);
- size_t preallocation_size = *(static_cast<size_t*>(arg));
- ASSERT_EQ(expected_preallocation_size, preallocation_size);
- called.fetch_add(1);
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Put("", "");
- Flush();
- Put("", "");
- Close();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ASSERT_EQ(2, called.load());
- options.max_total_wal_size = 1000 * 1000;
- expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size);
- Reopen(options);
- called.store(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
- ASSERT_TRUE(arg != nullptr);
- size_t preallocation_size = *(static_cast<size_t*>(arg));
- ASSERT_EQ(expected_preallocation_size, preallocation_size);
- called.fetch_add(1);
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Put("", "");
- Flush();
- Put("", "");
- Close();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ASSERT_EQ(2, called.load());
- options.db_write_buffer_size = 800 * 1000;
- expected_preallocation_size =
- static_cast<size_t>(options.db_write_buffer_size);
- Reopen(options);
- called.store(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
- ASSERT_TRUE(arg != nullptr);
- size_t preallocation_size = *(static_cast<size_t*>(arg));
- ASSERT_EQ(expected_preallocation_size, preallocation_size);
- called.fetch_add(1);
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Put("", "");
- Flush();
- Put("", "");
- Close();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ASSERT_EQ(2, called.load());
- expected_preallocation_size = 700 * 1000;
- std::shared_ptr<WriteBufferManager> write_buffer_manager =
- std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
- options.write_buffer_manager = write_buffer_manager;
- Reopen(options);
- called.store(0);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
- ASSERT_TRUE(arg != nullptr);
- size_t preallocation_size = *(static_cast<size_t*>(arg));
- ASSERT_EQ(expected_preallocation_size, preallocation_size);
- called.fetch_add(1);
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Put("", "");
- Flush();
- Put("", "");
- Close();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ASSERT_EQ(2, called.load());
- }
- #endif // !(defined NDEBUG) || !defined(OS_WIN)
- #ifndef ROCKSDB_LITE
- TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
- // For github issue #1303
- for (int i = 0; i < 2; ++i) {
- Options options = CurrentOptions();
- options.create_if_missing = true;
- options.recycle_log_file_num = 2;
- if (i != 0) {
- options.wal_dir = alternative_wal_dir_;
- }
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo", "v1"));
- VectorLogPtr log_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
- ASSERT_GT(log_files.size(), 0);
- ASSERT_OK(Flush());
- // Now the original WAL is in log_files[0] and should be marked for
- // recycling.
- // Verify full purge cannot remove this file.
- JobContext job_context(0);
- dbfull()->TEST_LockMutex();
- dbfull()->FindObsoleteFiles(&job_context, true /* force */);
- dbfull()->TEST_UnlockMutex();
- dbfull()->PurgeObsoleteFiles(job_context);
- if (i == 0) {
- ASSERT_OK(
- env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
- } else {
- ASSERT_OK(env_->FileExists(
- LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
- }
- }
- }
- TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
- // Ensures full purge cannot delete a WAL while it's in the process of being
- // recycled. In particular, we force the full purge after a file has been
- // chosen for reuse, but before it has been renamed.
- for (int i = 0; i < 2; ++i) {
- Options options = CurrentOptions();
- options.recycle_log_file_num = 1;
- if (i != 0) {
- options.wal_dir = alternative_wal_dir_;
- }
- DestroyAndReopen(options);
- // The first flush creates a second log so writes can continue before the
- // flush finishes.
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Flush());
- // The second flush can recycle the first log. Sync points enforce the
- // full purge happens after choosing the log to recycle and before it is
- // renamed.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"DBImpl::CreateWAL:BeforeReuseWritableFile1",
- "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
- {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
- "DBImpl::CreateWAL:BeforeReuseWritableFile2"},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread thread([&]() {
- TEST_SYNC_POINT(
- "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
- ASSERT_OK(db_->EnableFileDeletions(true));
- TEST_SYNC_POINT(
- "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
- });
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Flush());
- thread.join();
- }
- }
- TEST_F(DBWALTest, GetSortedWalFiles) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- VectorLogPtr log_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
- ASSERT_EQ(0, log_files.size());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
- ASSERT_EQ(1, log_files.size());
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, GetCurrentWalFile) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- std::unique_ptr<LogFile>* bad_log_file = nullptr;
- ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file));
- std::unique_ptr<LogFile> log_file;
- ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
- // nothing has been written to the log yet
- ASSERT_EQ(log_file->StartSequence(), 0);
- ASSERT_EQ(log_file->SizeFileBytes(), 0);
- ASSERT_EQ(log_file->Type(), kAliveLogFile);
- ASSERT_GT(log_file->LogNumber(), 0);
- // add some data and verify that the file size actually moves forward
- ASSERT_OK(Put(0, "foo", "v1"));
- ASSERT_OK(Put(0, "foo2", "v2"));
- ASSERT_OK(Put(0, "foo3", "v3"));
- ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
- ASSERT_EQ(log_file->StartSequence(), 0);
- ASSERT_GT(log_file->SizeFileBytes(), 0);
- ASSERT_EQ(log_file->Type(), kAliveLogFile);
- ASSERT_GT(log_file->LogNumber(), 0);
- // force log files to cycle and add some more data, then check if
- // log number moves forward
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- for (int i = 0; i < 10; i++) {
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- }
- ASSERT_OK(Put(0, "foo4", "v4"));
- ASSERT_OK(Put(0, "foo5", "v5"));
- ASSERT_OK(Put(0, "foo6", "v6"));
- ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
- ASSERT_EQ(log_file->StartSequence(), 0);
- ASSERT_GT(log_file->SizeFileBytes(), 0);
- ASSERT_EQ(log_file->Type(), kAliveLogFile);
- ASSERT_GT(log_file->LogNumber(), 0);
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
- // Test for regression of WAL cleanup missing files that don't contain data
- // for every column family.
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "foo", "v2"));
- uint64_t earliest_log_nums[2];
- for (int i = 0; i < 2; ++i) {
- if (i > 0) {
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- }
- VectorLogPtr log_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
- if (log_files.size() > 0) {
- earliest_log_nums[i] = log_files[0]->LogNumber();
- } else {
- earliest_log_nums[i] = port::kMaxUint64;
- }
- }
- // Check that at least the first WAL was cleaned up during the recovery.
- ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RecoverWithLargeLog) {
- do {
- {
- Options options = CurrentOptions();
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "big1", std::string(200000, '1')));
- ASSERT_OK(Put(1, "big2", std::string(200000, '2')));
- ASSERT_OK(Put(1, "small3", std::string(10, '3')));
- ASSERT_OK(Put(1, "small4", std::string(10, '4')));
- ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
- }
- // Make sure that if we re-open with a small write buffer size, we flush
- // table files in the middle of a large log file.
- Options options;
- options.write_buffer_size = 100000;
- options = CurrentOptions(options);
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
- ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
- ASSERT_EQ(std::string(200000, '2'), Get(1, "big2"));
- ASSERT_EQ(std::string(10, '3'), Get(1, "small3"));
- ASSERT_EQ(std::string(10, '4'), Get(1, "small4"));
- ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);
- } while (ChangeWalOptions());
- }
- // In https://reviews.facebook.net/D20661 we changed the recovery behavior:
- // previously, for each log file, each column family's memtable was flushed,
- // even if it was empty. Now we try to create the smallest number of table
- // files by merging updates from multiple logs.
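- // For example (roughly what the tests below assert): if a column family has
- // a record in each of several WALs, recovery now merges those updates into a
- // single SST for that column family instead of producing one SST per WAL.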
- TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
- Options options = CurrentOptions();
- options.write_buffer_size = 5000000;
- CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
- // Since we will reopen the DB with a smaller write_buffer_size,
- // each key will go to a new SST file
- ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
- ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
- ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
- ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
- ASSERT_OK(Put(3, Key(10), DummyString(1)));
- // Force 'dobrynia' to be flushed and a new WAL file to be created
- ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
- ASSERT_OK(Put(2, Key(1), DummyString(1)));
- dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
- {
- auto tables = ListTableFiles(env_, dbname_);
- ASSERT_EQ(tables.size(), static_cast<size_t>(1));
- // Make sure 'dobrynia' was flushed: check the number of SST files
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(1));
- }
- // New WAL file
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(3, Key(10), DummyString(1)));
- ASSERT_OK(Put(3, Key(10), DummyString(1)));
- ASSERT_OK(Put(3, Key(10), DummyString(1)));
- options.write_buffer_size = 4096;
- options.arena_block_size = 4096;
- ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
- options);
- {
- // No inserts => default is empty
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
- static_cast<uint64_t>(0));
- // First 4 keys go to separate SSTs + 1 more SST for the 2 smaller keys
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
- static_cast<uint64_t>(5));
- // 1 SST for big key + 1 SST for small one
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(2));
- // 1 SST for all keys
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(1));
- }
- }
- // In https://reviews.facebook.net/D20661 we changed the recovery behavior:
- // previously, for each log file, each column family's memtable was flushed,
- // even if it was empty. Now we try to create the smallest number of table
- // files by merging updates from multiple logs.
- TEST_F(DBWALTest, RecoverCheckFileAmount) {
- Options options = CurrentOptions();
- options.write_buffer_size = 100000;
- options.arena_block_size = 4 * 1024;
- options.avoid_flush_during_recovery = false;
- CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
- ASSERT_OK(Put(0, Key(1), DummyString(1)));
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(2, Key(1), DummyString(1)));
- // Force the 'nikitich' memtable to be flushed
- ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
- ASSERT_OK(Put(3, Key(1), DummyString(1)));
- dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
- // 4 memtables are not flushed, 1 SST file
- {
- auto tables = ListTableFiles(env_, dbname_);
- ASSERT_EQ(tables.size(), static_cast<size_t>(1));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(1));
- }
- // The memtable for 'nikitich' has been flushed and a new WAL file has been
- // opened; 4 memtables are still not flushed.
- // Write to the new WAL file
- ASSERT_OK(Put(0, Key(1), DummyString(1)));
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(2, Key(1), DummyString(1)));
- // Fill up 'nikitich' one more time
- ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
- // make it flush
- ASSERT_OK(Put(3, Key(1), DummyString(1)));
- dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
- // There are still 4 memtables not flushed, and 2 SST files
- ASSERT_OK(Put(0, Key(1), DummyString(1)));
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(2, Key(1), DummyString(1)));
- {
- auto tables = ListTableFiles(env_, dbname_);
- ASSERT_EQ(tables.size(), static_cast<size_t>(2));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(2));
- }
- ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
- options);
- {
- std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
- // Check that records for 'default', 'dobrynia' and 'pikachu' from the
- // first, second and third WALs went to the same SST.
- // So there are 6 SSTs: three for 'nikitich', one for 'default', one for
- // 'dobrynia', one for 'pikachu'
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
- static_cast<uint64_t>(1));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(3));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(1));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
- static_cast<uint64_t>(1));
- }
- }
- TEST_F(DBWALTest, SyncMultipleLogs) {
- const uint64_t kNumBatches = 2;
- const int kBatchSize = 1000;
- Options options = CurrentOptions();
- options.create_if_missing = true;
- options.write_buffer_size = 4096;
- Reopen(options);
- WriteBatch batch;
- WriteOptions wo;
- wo.sync = true;
- for (uint64_t b = 0; b < kNumBatches; b++) {
- batch.Clear();
- for (int i = 0; i < kBatchSize; i++) {
- batch.Put(Key(i), DummyString(128));
- }
- dbfull()->Write(wo, &batch);
- }
- ASSERT_OK(dbfull()->SyncWAL());
- }
- // Github issue 1339. Prior to the fix, we read the sequence id from the first
- // log into a local variable, then kept increasing the variable as we replayed
- // logs, ignoring the actual sequence ids of the records. This is incorrect if
- // some writes come with the WAL disabled.
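- // For example (mirroring the writes below): the WAL ends up containing
- // records with sequence ids 1, 4 and 5, because two intervening writes skip
- // the WAL; a replay that re-derives sequence ids by counting records would
- // assign the last record sequence id 3 instead of 5.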
- TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
- std::unique_ptr<FaultInjectionTestEnv> fault_env(
- new FaultInjectionTestEnv(env_));
- Options options = CurrentOptions();
- options.env = fault_env.get();
- options.disable_auto_compactions = true;
- WriteOptions wal_on, wal_off;
- wal_on.sync = true;
- wal_on.disableWAL = false;
- wal_off.disableWAL = true;
- CreateAndReopenWithCF({"dummy"}, options);
- ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
- ASSERT_OK(Put(1, "dummy", "d2", wal_off));
- ASSERT_OK(Put(1, "dummy", "d3", wal_off));
- ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
- ASSERT_OK(Flush(0));
- ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
- ASSERT_EQ("v5", Get(0, "key"));
- dbfull()->FlushWAL(false);
- // Simulate a crash.
- fault_env->SetFilesystemActive(false);
- Close();
- fault_env->ResetState();
- ReopenWithColumnFamilies({"default", "dummy"}, options);
- // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
- ASSERT_EQ("v5", Get(0, "key"));
- // Destroy the DB before fault_env is destructed.
- Destroy(options);
- }
- //
- // Test WAL recovery for the various modes available
- //
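- // A typical test below fills several WALs via RecoveryTestHelper::FillData,
- // corrupts or truncates one of them via CorruptWAL, reopens the DB under a
- // specific WALRecoveryMode, and then counts the recovered keys via GetData.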
- class RecoveryTestHelper {
- public:
- // Number of WAL files to generate
- static const int kWALFilesCount = 10;
- // Starting number for the WAL file name like 00010.log
- static const int kWALFileOffset = 10;
- // Keys to be written per WAL file
- static const int kKeysPerWALFile = 133;
- // Size of the value
- static const int kValueSize = 96;
- // Create WAL files with values filled in
- static void FillData(DBWALTest* test, const Options& options,
- const size_t wal_count, size_t* count) {
- // Calling internal functions requires sanitized options.
- Options sanitized_options = SanitizeOptions(test->dbname_, options);
- const ImmutableDBOptions db_options(sanitized_options);
- *count = 0;
- std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
- EnvOptions env_options;
- WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
- std::unique_ptr<VersionSet> versions;
- std::unique_ptr<WalManager> wal_manager;
- WriteController write_controller;
- versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
- table_cache.get(), &write_buffer_manager,
- &write_controller,
- /*block_cache_tracer=*/nullptr));
- wal_manager.reset(new WalManager(db_options, env_options));
- std::unique_ptr<log::Writer> current_log_writer;
- for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
- uint64_t current_log_number = j;
- std::string fname = LogFileName(test->dbname_, current_log_number);
- std::unique_ptr<WritableFile> file;
- ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
- std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- NewLegacyWritableFileWrapper(std::move(file)), fname, env_options));
- current_log_writer.reset(
- new log::Writer(std::move(file_writer), current_log_number,
- db_options.recycle_log_file_num > 0));
- WriteBatch batch;
- for (int i = 0; i < kKeysPerWALFile; i++) {
- std::string key = "key" + ToString((*count)++);
- std::string value = test->DummyString(kValueSize);
- assert(current_log_writer.get() != nullptr);
- uint64_t seq = versions->LastSequence() + 1;
- batch.Clear();
- batch.Put(key, value);
- WriteBatchInternal::SetSequence(&batch, seq);
- current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
- versions->SetLastAllocatedSequence(seq);
- versions->SetLastPublishedSequence(seq);
- versions->SetLastSequence(seq);
- }
- }
- }
- // Recreate and fill the store with some data
- static size_t FillData(DBWALTest* test, Options* options) {
- options->create_if_missing = true;
- test->DestroyAndReopen(*options);
- test->Close();
- size_t count = 0;
- FillData(test, *options, kWALFilesCount, &count);
- return count;
- }
- // Read back all the keys we wrote and return the number of keys found
- static size_t GetData(DBWALTest* test) {
- size_t count = 0;
- for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
- if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
- ++count;
- }
- }
- return count;
- }
- // Manually corrupt the specified WAL
- static void CorruptWAL(DBWALTest* test, const Options& options,
- const double off, const double len,
- const int wal_file_id, const bool trunc = false) {
- Env* env = options.env;
- std::string fname = LogFileName(test->dbname_, wal_file_id);
- uint64_t size;
- ASSERT_OK(env->GetFileSize(fname, &size));
- ASSERT_GT(size, 0);
- #ifdef OS_WIN
- // The Windows disk cache behaves differently: when we truncate, the
- // original content is still in the cache because the original handle is
- // still open. Generally, Windows prohibits shared access to files; it is
- // not needed for the WAL, but we allow it so that tests can induce
- // corruption.
- test->Close();
- #endif
- if (trunc) {
- ASSERT_EQ(0, truncate(fname.c_str(), static_cast<int64_t>(size * off)));
- } else {
- InduceCorruption(fname, static_cast<size_t>(size * off + 8),
- static_cast<size_t>(size * len));
- }
- }
- // Overwrite data with 'b' starting at offset, for length len
- static void InduceCorruption(const std::string& filename, size_t offset,
- size_t len) {
- ASSERT_GT(len, 0U);
- int fd = open(filename.c_str(), O_RDWR);
- // On windows long is 32-bit
- ASSERT_LE(offset, std::numeric_limits<long>::max());
- ASSERT_GT(fd, 0);
- ASSERT_EQ(offset, lseek(fd, static_cast<long>(offset), SEEK_SET));
- void* buf = alloca(len);
- memset(buf, 'b', len);
- ASSERT_EQ(len, write(fd, buf, static_cast<unsigned int>(len)));
- close(fd);
- }
- };
- // Test scope:
- // - We expect to open the data store when there are incomplete trailing
- // writes at the end of any of the logs
- // - We do not expect to open the data store when there is corruption
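- // In other words, a truncated tail is tolerated in this mode, while bytes
- // overwritten in the middle of a log are treated as corruption and fail the
- // open; that is what the trunc / !trunc branches below check.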
- TEST_F(DBWALTest, kTolerateCorruptedTailRecords) {
- const int jstart = RecoveryTestHelper::kWALFileOffset;
- const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
- for (auto trunc : {true, false}) { /* Corruption style */
- for (int i = 0; i < 3; i++) { /* Corruption offset position */
- for (int j = jstart; j < jend; j++) { /* WAL file */
- // Fill data for testing
- Options options = CurrentOptions();
- const size_t row_count = RecoveryTestHelper::FillData(this, &options);
- // Test checksum failure or parsing failure
- RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
- /*len%=*/.1, /*wal=*/j, trunc);
- if (trunc) {
- options.wal_recovery_mode =
- WALRecoveryMode::kTolerateCorruptedTailRecords;
- options.create_if_missing = false;
- ASSERT_OK(TryReopen(options));
- const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
- ASSERT_TRUE(i == 0 || recovered_row_count > 0);
- ASSERT_LT(recovered_row_count, row_count);
- } else {
- options.wal_recovery_mode =
- WALRecoveryMode::kTolerateCorruptedTailRecords;
- ASSERT_NOK(TryReopen(options));
- }
- }
- }
- }
- }
- // Test scope:
- // We don't expect the data store to be opened if there is any corruption
- // (leading, middle or trailing -- incomplete writes or corruption)
- TEST_F(DBWALTest, kAbsoluteConsistency) {
- const int jstart = RecoveryTestHelper::kWALFileOffset;
- const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
- // Verify clean slate behavior
- Options options = CurrentOptions();
- const size_t row_count = RecoveryTestHelper::FillData(this, &options);
- options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
- options.create_if_missing = false;
- ASSERT_OK(TryReopen(options));
- ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
- for (auto trunc : {true, false}) { /* Corruption style */
- for (int i = 0; i < 4; i++) { /* Corruption offset position */
- if (trunc && i == 0) {
- continue;
- }
- for (int j = jstart; j < jend; j++) { /* wal files */
- // fill with new data
- RecoveryTestHelper::FillData(this, &options);
- // corrupt the wal
- RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
- /*len%=*/.1, j, trunc);
- // verify
- options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
- options.create_if_missing = false;
- ASSERT_NOK(TryReopen(options));
- }
- }
- }
- }
- // Test scope:
- // We don't expect the data store to be opened if there is any inconsistency
- // between WAL and SST files
- TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- // Create DB with multiple column families.
- CreateAndReopenWithCF({"one", "two"}, options);
- ASSERT_OK(Put(1, "key1", "val1"));
- ASSERT_OK(Put(2, "key2", "val2"));
- // Record the offset at this point
- Env* env = options.env;
- uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
- std::string fname = LogFileName(dbname_, wal_file_id);
- uint64_t offset_to_corrupt;
- ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
- ASSERT_GT(offset_to_corrupt, 0);
- ASSERT_OK(Put(1, "key3", "val3"));
- // Corrupt WAL at location of key3
- RecoveryTestHelper::InduceCorruption(
- fname, static_cast<size_t>(offset_to_corrupt), static_cast<size_t>(4));
- ASSERT_OK(Put(2, "key4", "val4"));
- ASSERT_OK(Put(1, "key5", "val5"));
- Flush(2);
- // PIT recovery & verify
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
- }
- // Test scope:
- // - We expect to open the data store under all circumstances
- // - We expect only data up to the point where the first error was encountered
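- // Concretely, for the probing loop below: if WAL j is corrupted, every key
- // written to the WALs before j should be recovered, plus a prefix of the keys
- // in WAL j, and nothing after the first missing key.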
- TEST_F(DBWALTest, kPointInTimeRecovery) {
- const int jstart = RecoveryTestHelper::kWALFileOffset;
- const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
- const int maxkeys =
- RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;
- for (auto trunc : {true, false}) { /* Corruption style */
- for (int i = 0; i < 4; i++) { /* Offset of corruption */
- for (int j = jstart; j < jend; j++) { /* WAL file */
- // Fill data for testing
- Options options = CurrentOptions();
- const size_t row_count = RecoveryTestHelper::FillData(this, &options);
- // Corrupt the wal
- RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
- /*len%=*/.1, j, trunc);
- // Verify
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- options.create_if_missing = false;
- ASSERT_OK(TryReopen(options));
- // Probe data for invariants
- size_t recovered_row_count = RecoveryTestHelper::GetData(this);
- ASSERT_LT(recovered_row_count, row_count);
- bool expect_data = true;
- for (int k = 0; k < maxkeys; ++k) {
- bool found = Get("key" + ToString(k)) != "NOT_FOUND";
- if (expect_data && !found) {
- expect_data = false;
- }
- ASSERT_EQ(found, expect_data);
- }
- const size_t min = RecoveryTestHelper::kKeysPerWALFile *
- (j - RecoveryTestHelper::kWALFileOffset);
- ASSERT_GE(recovered_row_count, min);
- if (!trunc && i != 0) {
- const size_t max = RecoveryTestHelper::kKeysPerWALFile *
- (j - RecoveryTestHelper::kWALFileOffset + 1);
- ASSERT_LE(recovered_row_count, max);
- }
- }
- }
- }
- }
- // Test scope:
- // - We expect to open the data store under all scenarios
- // - We expect to have recovered records past the corruption zone
- TEST_F(DBWALTest, kSkipAnyCorruptedRecords) {
- const int jstart = RecoveryTestHelper::kWALFileOffset;
- const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
- for (auto trunc : {true, false}) { /* Corruption style */
- for (int i = 0; i < 4; i++) { /* Corruption offset */
- for (int j = jstart; j < jend; j++) { /* wal files */
- // Fill data for testing
- Options options = CurrentOptions();
- const size_t row_count = RecoveryTestHelper::FillData(this, &options);
- // Corrupt the WAL
- RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
- /*len%=*/.1, j, trunc);
- // Verify behavior
- options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
- options.create_if_missing = false;
- ASSERT_OK(TryReopen(options));
- // Probe data for invariants
- size_t recovered_row_count = RecoveryTestHelper::GetData(this);
- ASSERT_LT(recovered_row_count, row_count);
- if (!trunc) {
- ASSERT_TRUE(i != 0 || recovered_row_count > 0);
- }
- }
- }
- }
- }
- TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.avoid_flush_during_recovery = false;
- // Test with flush after recovery.
- Reopen(options);
- ASSERT_OK(Put("foo", "v1"));
- ASSERT_OK(Put("bar", "v2"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("foo", "v3"));
- ASSERT_OK(Put("bar", "v4"));
- ASSERT_EQ(1, TotalTableFiles());
- // Reopen DB. Check if WAL logs flushed.
- Reopen(options);
- ASSERT_EQ("v3", Get("foo"));
- ASSERT_EQ("v4", Get("bar"));
- ASSERT_EQ(2, TotalTableFiles());
- // Test without flush after recovery.
- options.avoid_flush_during_recovery = true;
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo", "v5"));
- ASSERT_OK(Put("bar", "v6"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("foo", "v7"));
- ASSERT_OK(Put("bar", "v8"));
- ASSERT_EQ(1, TotalTableFiles());
- // Reopen DB. WAL logs should not be flushed this time.
- Reopen(options);
- ASSERT_EQ("v7", Get("foo"));
- ASSERT_EQ("v8", Get("bar"));
- ASSERT_EQ(1, TotalTableFiles());
- // Force flush with allow_2pc.
- options.avoid_flush_during_recovery = true;
- options.allow_2pc = true;
- ASSERT_OK(Put("foo", "v9"));
- ASSERT_OK(Put("bar", "v10"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("foo", "v11"));
- ASSERT_OK(Put("bar", "v12"));
- Reopen(options);
- ASSERT_EQ("v11", Get("foo"));
- ASSERT_EQ("v12", Get("bar"));
- ASSERT_EQ(3, TotalTableFiles());
- }
- TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
- // Verifies WAL files that were present during recovery, but not flushed due
- // to avoid_flush_during_recovery, will be considered for deletion at a later
- // stage. We check at least one such file is deleted during Flush().
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.avoid_flush_during_recovery = true;
- Reopen(options);
- ASSERT_OK(Put("foo", "v1"));
- Reopen(options);
- for (int i = 0; i < 2; ++i) {
- if (i > 0) {
- // Flush() triggers deletion of obsolete tracked files
- Flush();
- }
- VectorLogPtr log_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
- if (i == 0) {
- ASSERT_GT(log_files.size(), 0);
- } else {
- ASSERT_EQ(0, log_files.size());
- }
- }
- }
- TEST_F(DBWALTest, RecoverWithoutFlush) {
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- options.create_if_missing = false;
- options.disable_auto_compactions = true;
- options.write_buffer_size = 64 * 1024 * 1024;
- size_t count = RecoveryTestHelper::FillData(this, &options);
- auto validateData = [this, count]() {
- for (size_t i = 0; i < count; i++) {
- ASSERT_NE(Get("key" + ToString(i)), "NOT_FOUND");
- }
- };
- Reopen(options);
- validateData();
- // Insert some data without flush
- ASSERT_OK(Put("foo", "foo_v1"));
- ASSERT_OK(Put("bar", "bar_v1"));
- Reopen(options);
- validateData();
- ASSERT_EQ(Get("foo"), "foo_v1");
- ASSERT_EQ(Get("bar"), "bar_v1");
- // Insert again and reopen
- ASSERT_OK(Put("foo", "foo_v2"));
- ASSERT_OK(Put("bar", "bar_v2"));
- Reopen(options);
- validateData();
- ASSERT_EQ(Get("foo"), "foo_v2");
- ASSERT_EQ(Get("bar"), "bar_v2");
- // manual flush and insert again
- Flush();
- ASSERT_EQ(Get("foo"), "foo_v2");
- ASSERT_EQ(Get("bar"), "bar_v2");
- ASSERT_OK(Put("foo", "foo_v3"));
- ASSERT_OK(Put("bar", "bar_v3"));
- Reopen(options);
- validateData();
- ASSERT_EQ(Get("foo"), "foo_v3");
- ASSERT_EQ(Get("bar"), "bar_v3");
- }
- TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
- const std::string kSmallValue = "v";
- const std::string kLargeValue = DummyString(1024);
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- options.create_if_missing = false;
- options.disable_auto_compactions = true;
- auto countWalFiles = [this]() {
- VectorLogPtr log_files;
- dbfull()->GetSortedWalFiles(log_files);
- return log_files.size();
- };
- // Create DB with multiple column families and multiple log files.
- CreateAndReopenWithCF({"one", "two"}, options);
- ASSERT_OK(Put(0, "key1", kSmallValue));
- ASSERT_OK(Put(1, "key2", kLargeValue));
- Flush(1);
- ASSERT_EQ(1, countWalFiles());
- ASSERT_OK(Put(0, "key3", kSmallValue));
- ASSERT_OK(Put(2, "key4", kLargeValue));
- Flush(2);
- ASSERT_EQ(2, countWalFiles());
- // Reopen, insert and flush.
- options.db_write_buffer_size = 64 * 1024 * 1024;
- ReopenWithColumnFamilies({"default", "one", "two"}, options);
- ASSERT_EQ(Get(0, "key1"), kSmallValue);
- ASSERT_EQ(Get(1, "key2"), kLargeValue);
- ASSERT_EQ(Get(0, "key3"), kSmallValue);
- ASSERT_EQ(Get(2, "key4"), kLargeValue);
- // Insert more data.
- ASSERT_OK(Put(0, "key5", kLargeValue));
- ASSERT_OK(Put(1, "key6", kLargeValue));
- ASSERT_EQ(3, countWalFiles());
- Flush(1);
- ASSERT_OK(Put(2, "key7", kLargeValue));
- dbfull()->FlushWAL(false);
- ASSERT_EQ(4, countWalFiles());
- // Reopen twice and validate.
- for (int i = 0; i < 2; i++) {
- ReopenWithColumnFamilies({"default", "one", "two"}, options);
- ASSERT_EQ(Get(0, "key1"), kSmallValue);
- ASSERT_EQ(Get(1, "key2"), kLargeValue);
- ASSERT_EQ(Get(0, "key3"), kSmallValue);
- ASSERT_EQ(Get(2, "key4"), kLargeValue);
- ASSERT_EQ(Get(0, "key5"), kLargeValue);
- ASSERT_EQ(Get(1, "key6"), kLargeValue);
- ASSERT_EQ(Get(2, "key7"), kLargeValue);
- ASSERT_EQ(4, countWalFiles());
- }
- }
- // In this test we are trying to do the following:
- // 1. Create a DB with a corrupted WAL log;
- // 2. Open it with avoid_flush_during_recovery = true;
- // 3. Append more data without flushing, which creates a new WAL log;
- // 4. Open again and see if it can correctly handle the previous corruption.
- TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {
- const int jstart = RecoveryTestHelper::kWALFileOffset;
- const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
- const int kAppendKeys = 100;
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- options.create_if_missing = false;
- options.disable_auto_compactions = true;
- options.write_buffer_size = 64 * 1024 * 1024;
- auto getAll = [this]() {
- std::vector<std::pair<std::string, std::string>> data;
- ReadOptions ropt;
- Iterator* iter = dbfull()->NewIterator(ropt);
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- data.push_back(
- std::make_pair(iter->key().ToString(), iter->value().ToString()));
- }
- delete iter;
- return data;
- };
- for (auto& mode : wal_recovery_mode_string_map) {
- options.wal_recovery_mode = mode.second;
- for (auto trunc : {true, false}) {
- for (int i = 0; i < 4; i++) {
- for (int j = jstart; j < jend; j++) {
- // Create corrupted WAL
- RecoveryTestHelper::FillData(this, &options);
- RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
- /*len%=*/.1, /*wal=*/j, trunc);
- // Skip the test if DB won't open.
- if (!TryReopen(options).ok()) {
- ASSERT_TRUE(options.wal_recovery_mode ==
- WALRecoveryMode::kAbsoluteConsistency ||
- (!trunc &&
- options.wal_recovery_mode ==
- WALRecoveryMode::kTolerateCorruptedTailRecords));
- continue;
- }
- ASSERT_OK(TryReopen(options));
- // Append some more data.
- for (int k = 0; k < kAppendKeys; k++) {
- std::string key = "extra_key" + ToString(k);
- std::string value = DummyString(RecoveryTestHelper::kValueSize);
- ASSERT_OK(Put(key, value));
- }
- // Save data for comparison.
- auto data = getAll();
- // Reopen. Verify data.
- ASSERT_OK(TryReopen(options));
- auto actual_data = getAll();
- ASSERT_EQ(data, actual_data);
- }
- }
- }
- }
- }
- // Tests that total log size is recovered if we set
- // avoid_flush_during_recovery=true.
- // Flush should trigger if max_total_wal_size is reached.
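- // Sketch of the arithmetic below: max_total_wal_size is 1MB, recovery leaves
- // a WAL of a bit over 900KB alive, and a subsequent ~300KB write pushes the
- // total WAL size past 1MB, after which the test expects both column families
- // to be flushed.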
- TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
- class TestFlushListener : public EventListener {
- public:
- std::atomic<int> count{0};
- TestFlushListener() = default;
- void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
- count++;
- assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
- }
- };
- std::shared_ptr<TestFlushListener> test_listener =
- std::make_shared<TestFlushListener>();
- constexpr size_t kKB = 1024;
- constexpr size_t kMB = 1024 * 1024;
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- options.max_total_wal_size = 1 * kMB;
- options.listeners.push_back(test_listener);
- // Have to open DB in multi-CF mode to trigger flush when
- // max_total_wal_size is reached.
- CreateAndReopenWithCF({"one"}, options);
- // Write some keys and we will end up with one log file which is slightly
- // smaller than 1MB.
- std::string value_100k(100 * kKB, 'v');
- std::string value_300k(300 * kKB, 'v');
- ASSERT_OK(Put(0, "foo", "v1"));
- for (int i = 0; i < 9; i++) {
- ASSERT_OK(Put(1, "key" + ToString(i), value_100k));
- }
- // Get log files before reopen.
- VectorLogPtr log_files_before;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
- ASSERT_EQ(1, log_files_before.size());
- uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
- ASSERT_GT(log_size_before, 900 * kKB);
- ASSERT_LT(log_size_before, 1 * kMB);
- ReopenWithColumnFamilies({"default", "one"}, options);
- // Write one more value to make log larger than 1MB.
- ASSERT_OK(Put(1, "bar", value_300k));
- // Get log files again. A new log file will be opened.
- VectorLogPtr log_files_after_reopen;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
- ASSERT_EQ(2, log_files_after_reopen.size());
- ASSERT_EQ(log_files_before[0]->LogNumber(),
- log_files_after_reopen[0]->LogNumber());
- ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
- log_files_after_reopen[1]->SizeFileBytes(),
- 1 * kMB);
- // Write one more key to trigger flush.
- ASSERT_OK(Put(0, "foo", "v2"));
- dbfull()->TEST_WaitForFlushMemTable();
- // Flushed two column families.
- ASSERT_EQ(2, test_listener->count.load());
- }
- #if defined(ROCKSDB_PLATFORM_POSIX)
- #if defined(ROCKSDB_FALLOCATE_PRESENT)
- // Tests that we will truncate the preallocated space of the last log from
- // the previous DB session.
- TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
- constexpr size_t kKB = 1024;
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- DestroyAndReopen(options);
- size_t preallocated_size =
- dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
- ASSERT_OK(Put("foo", "v1"));
- VectorLogPtr log_files_before;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
- ASSERT_EQ(1, log_files_before.size());
- auto& file_before = log_files_before[0];
- ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
- // The log file has preallocated space.
- ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
- preallocated_size);
- Reopen(options);
- VectorLogPtr log_files_after;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
- ASSERT_EQ(1, log_files_after.size());
- ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
- // The preallocated space should be truncated.
- ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
- preallocated_size);
- }
- #endif // ROCKSDB_FALLOCATE_PRESENT
- #endif // ROCKSDB_PLATFORM_POSIX
- #endif // ROCKSDB_LITE
- TEST_F(DBWALTest, WalTermTest) {
- Options options = CurrentOptions();
- options.env = env_;
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "foo", "bar"));
- WriteOptions wo;
- wo.sync = true;
- wo.disableWAL = false;
- WriteBatch batch;
- batch.Put("foo", "bar");
- batch.MarkWalTerminationPoint();
- batch.Put("foo2", "bar2");
- ASSERT_OK(dbfull()->Write(wo, &batch));
- // make sure we can re-open it.
- ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
- ASSERT_EQ("bar", Get(1, "foo"));
- ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }