| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "db/db_test_util.h"
- #include "db/db_with_timestamp_test_util.h"
- #include "options/options_helper.h"
- #include "port/port.h"
- #include "port/stack_trace.h"
- #include "rocksdb/file_system.h"
- #include "test_util/sync_point.h"
- #include "util/defer.h"
- #include "util/udt_util.h"
- #include "utilities/fault_injection_env.h"
- #include "utilities/fault_injection_fs.h"
- namespace ROCKSDB_NAMESPACE {
- class DBWALTestBase : public DBTestBase {
- protected:
- explicit DBWALTestBase(const std::string& dir_name)
- : DBTestBase(dir_name, /*env_do_fsync=*/true) {}
- #if defined(ROCKSDB_PLATFORM_POSIX)
- public:
- #if defined(ROCKSDB_FALLOCATE_PRESENT)
- bool IsFallocateSupported() {
- // Test fallocate support of running file system.
- // Skip this test if fallocate is not supported.
- std::string fname_test_fallocate = dbname_ + "/preallocate_testfile";
- int fd = -1;
- do {
- fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
- } while (fd < 0 && errno == EINTR);
- assert(fd > 0);
- int alloc_status = fallocate(fd, 0, 0, 1);
- int err_number = errno;
- close(fd);
- assert(env_->DeleteFile(fname_test_fallocate) == Status::OK());
- if (err_number == ENOSYS || err_number == EOPNOTSUPP) {
- fprintf(stderr, "Skipped preallocated space check: %s\n",
- errnoStr(err_number).c_str());
- return false;
- }
- assert(alloc_status == 0);
- return true;
- }
- #endif // ROCKSDB_FALLOCATE_PRESENT
- uint64_t GetAllocatedFileSize(std::string file_name) {
- struct stat sbuf;
- int err = stat(file_name.c_str(), &sbuf);
- assert(err == 0);
- return sbuf.st_blocks * 512;
- }
- #endif // ROCKSDB_PLATFORM_POSIX
- };
- class DBWALTest : public DBWALTestBase {
- public:
- DBWALTest() : DBWALTestBase("/db_wal_test") {}
- };
- // A SpecialEnv enriched to give more insight about deleted files
- class EnrichedSpecialEnv : public SpecialEnv {
- public:
- explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
- Status NewSequentialFile(const std::string& f,
- std::unique_ptr<SequentialFile>* r,
- const EnvOptions& soptions) override {
- InstrumentedMutexLock l(&env_mutex_);
- if (f == skipped_wal) {
- deleted_wal_reopened = true;
- if (IsWAL(f) && largest_deleted_wal.size() != 0 &&
- f.compare(largest_deleted_wal) <= 0) {
- gap_in_wals = true;
- }
- }
- return SpecialEnv::NewSequentialFile(f, r, soptions);
- }
- Status DeleteFile(const std::string& fname) override {
- if (IsWAL(fname)) {
- deleted_wal_cnt++;
- InstrumentedMutexLock l(&env_mutex_);
- // If this is the first WAL, remember its name and skip deleting it. We
- // remember its name partly because the application might attempt to
- // delete the file again.
- if (skipped_wal.size() != 0 && skipped_wal != fname) {
- if (largest_deleted_wal.size() == 0 ||
- largest_deleted_wal.compare(fname) < 0) {
- largest_deleted_wal = fname;
- }
- } else {
- skipped_wal = fname;
- return Status::OK();
- }
- }
- return SpecialEnv::DeleteFile(fname);
- }
- bool IsWAL(const std::string& fname) {
- // printf("iswal %s\n", fname.c_str());
- return fname.compare(fname.size() - 3, 3, "log") == 0;
- }
- InstrumentedMutex env_mutex_;
- // the wal whose actual delete was skipped by the env
- std::string skipped_wal;
- // the largest WAL that was requested to be deleted
- std::string largest_deleted_wal;
- // number of WALs that were successfully deleted
- std::atomic<size_t> deleted_wal_cnt = {0};
- // the WAL whose delete from fs was skipped is reopened during recovery
- std::atomic<bool> deleted_wal_reopened = {false};
- // whether a gap in the WALs was detected during recovery
- std::atomic<bool> gap_in_wals = {false};
- };
- class DBWALTestWithEnrichedEnv : public DBTestBase {
- public:
- DBWALTestWithEnrichedEnv()
- : DBTestBase("db_wal_test", /*env_do_fsync=*/true) {
- enriched_env_ = new EnrichedSpecialEnv(env_->target());
- auto options = CurrentOptions();
- options.env = enriched_env_;
- options.allow_2pc = true;
- Reopen(options);
- delete env_;
- // to be deleted by the parent class
- env_ = enriched_env_;
- }
- protected:
- EnrichedSpecialEnv* enriched_env_;
- };
- // Test that the recovery would successfully avoid the gaps between the logs.
- // One known scenario that could cause this is that the application issue the
- // WAL deletion out of order. For the sake of simplicity in the test, here we
- // create the gap by manipulating the env to skip deletion of the first WAL but
- // not the ones after it.
- TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
- auto options = last_options_;
- // To cause frequent WAL deletion
- options.write_buffer_size = 128;
- Reopen(options);
- SyncPoint::GetInstance()->LoadDependency(
- {{"DBImpl::PurgeObsoleteFiles:End",
- "DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush"}});
- SyncPoint::GetInstance()->EnableProcessing();
- WriteOptions writeOpt = WriteOptions();
- for (int i = 0; i < 128 * 5; i++) {
- ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
- }
- FlushOptions fo;
- fo.wait = true;
- ASSERT_OK(db_->Flush(fo));
- TEST_SYNC_POINT("DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush");
- // some wals are deleted
- ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
- // but not the first one
- ASSERT_NE(0, enriched_env_->skipped_wal.size());
- // Test that the WAL that was not deleted will be skipped during recovery
- options = last_options_;
- Reopen(options);
- ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
- ASSERT_FALSE(enriched_env_->gap_in_wals);
- SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(DBWALTest, WAL) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- WriteOptions writeOpt = WriteOptions();
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("v1", Get(1, "bar"));
- writeOpt.disableWAL = false;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- // Both value's should be present.
- ASSERT_EQ("v2", Get(1, "bar"));
- ASSERT_EQ("v2", Get(1, "foo"));
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
- writeOpt.disableWAL = false;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- // again both values should be present.
- ASSERT_EQ("v3", Get(1, "foo"));
- ASSERT_EQ("v3", Get(1, "bar"));
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RollLog) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "baz", "v5"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- for (int i = 0; i < 10; i++) {
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- }
- ASSERT_OK(Put(1, "foo", "v4"));
- for (int i = 0; i < 10; i++) {
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- }
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, SyncWALNotBlockWrite) {
- Options options = CurrentOptions();
- options.max_write_buffer_number = 4;
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo1", "bar1"));
- ASSERT_OK(Put("foo5", "bar5"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"WritableFileWriter::SyncWithoutFlush:1",
- "DBWALTest::SyncWALNotBlockWrite:1"},
- {"DBWALTest::SyncWALNotBlockWrite:2",
- "WritableFileWriter::SyncWithoutFlush:2"},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
- TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
- ASSERT_OK(Put("foo2", "bar2"));
- ASSERT_OK(Put("foo3", "bar3"));
- FlushOptions fo;
- fo.wait = false;
- ASSERT_OK(db_->Flush(fo));
- ASSERT_OK(Put("foo4", "bar4"));
- TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
- thread.join();
- ASSERT_EQ(Get("foo1"), "bar1");
- ASSERT_EQ(Get("foo2"), "bar2");
- ASSERT_EQ(Get("foo3"), "bar3");
- ASSERT_EQ(Get("foo4"), "bar4");
- ASSERT_EQ(Get("foo5"), "bar5");
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(DBWALTest, SyncWALNotWaitWrite) {
- ASSERT_OK(Put("foo1", "bar1"));
- ASSERT_OK(Put("foo3", "bar3"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"SpecialEnv::SpecialWalFile::Append:1",
- "DBWALTest::SyncWALNotWaitWrite:1"},
- {"DBWALTest::SyncWALNotWaitWrite:2",
- "SpecialEnv::SpecialWalFile::Append:2"},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread thread(
- [&]() { ASSERT_OK(Put("foo2", "bar2")); });
- // Moving this to SyncWAL before the actual fsync
- // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
- ASSERT_OK(db_->SyncWAL());
- // Moving this to SyncWAL after actual fsync
- // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
- thread.join();
- ASSERT_EQ(Get("foo1"), "bar1");
- ASSERT_EQ(Get("foo2"), "bar2");
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(DBWALTest, Recover) {
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "baz", "v5"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("v1", Get(1, "foo"));
- ASSERT_EQ("v5", Get(1, "baz"));
- ASSERT_OK(Put(1, "bar", "v2"));
- ASSERT_OK(Put(1, "foo", "v3"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v3", Get(1, "foo"));
- ASSERT_OK(Put(1, "foo", "v4"));
- ASSERT_EQ("v4", Get(1, "foo"));
- ASSERT_EQ("v2", Get(1, "bar"));
- ASSERT_EQ("v5", Get(1, "baz"));
- } while (ChangeWalOptions());
- }
- class DBWALTestWithTimestamp
- : public DBBasicTestWithTimestampBase,
- public testing::WithParamInterface<test::UserDefinedTimestampTestMode> {
- public:
- DBWALTestWithTimestamp()
- : DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}
- Status CreateAndReopenWithTs(const std::vector<std::string>& cfs,
- const Options& ts_options, bool persist_udt,
- bool avoid_flush_during_recovery = false) {
- Options default_options = CurrentOptions();
- default_options.allow_concurrent_memtable_write =
- persist_udt ? true : false;
- DestroyAndReopen(default_options);
- CreateColumnFamilies(cfs, ts_options);
- return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt,
- avoid_flush_during_recovery);
- }
- Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
- Options ts_options, bool persist_udt,
- bool avoid_flush_during_recovery = false) {
- Options default_options = CurrentOptions();
- default_options.create_if_missing = false;
- default_options.allow_concurrent_memtable_write =
- persist_udt ? true : false;
- default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
- ts_options.create_if_missing = false;
- std::vector<Options> cf_options(cfs.size(), ts_options);
- std::vector<std::string> cfs_plus_default = cfs;
- cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
- cf_options.insert(cf_options.begin(), default_options);
- Close();
- return TryReopenWithColumnFamilies(cfs_plus_default, cf_options);
- }
- Status Put(uint32_t cf, const Slice& key, const Slice& ts,
- const Slice& value) {
- WriteOptions write_opts;
- return db_->Put(write_opts, handles_[cf], key, ts, value);
- }
- void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key,
- const std::string& expected_value,
- const std::string& expected_ts) {
- std::string actual_value;
- std::string actual_ts;
- ASSERT_OK(
- db_->Get(read_opts, handles_[cf], key, &actual_value, &actual_ts));
- ASSERT_EQ(expected_value, actual_value);
- ASSERT_EQ(expected_ts, actual_ts);
- }
- };
- TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
- // Set up the option that enables user defined timestmp size.
- std::string ts1;
- PutFixed64(&ts1, 1);
- Options ts_options;
- ts_options.create_if_missing = true;
- ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
- // Test that user-defined timestamps are recovered from WAL regardless of
- // the value of this flag because UDTs are saved in WAL nonetheless.
- // We however need to explicitly disable flush during recovery by setting
- // `avoid_flush_during_recovery=true` so that we can avoid timestamps getting
- // stripped when the `persist_user_defined_timestamps` flag is false, so that
- // all written timestamps are available for testing user-defined time travel
- // read.
- bool persist_udt = test::ShouldPersistUDT(GetParam());
- ts_options.persist_user_defined_timestamps = persist_udt;
- bool avoid_flush_during_recovery = true;
- std::string full_history_ts_low;
- ReadOptions read_opts;
- do {
- Slice ts_slice = ts1;
- read_opts.timestamp = &ts_slice;
- ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt,
- avoid_flush_during_recovery));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
- ASSERT_OK(Put(1, "foo", ts1, "v1"));
- ASSERT_OK(Put(1, "baz", ts1, "v5"));
- ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
- avoid_flush_during_recovery));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
- // Do a timestamped read with ts1 after second reopen.
- CheckGet(read_opts, 1, "foo", "v1", ts1);
- CheckGet(read_opts, 1, "baz", "v5", ts1);
- // Write more value versions for key "foo" and "bar" before and after second
- // reopen.
- std::string ts2;
- PutFixed64(&ts2, 2);
- ASSERT_OK(Put(1, "bar", ts2, "v2"));
- ASSERT_OK(Put(1, "foo", ts2, "v3"));
- ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
- avoid_flush_during_recovery));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
- std::string ts3;
- PutFixed64(&ts3, 3);
- ASSERT_OK(Put(1, "foo", ts3, "v4"));
- // All the key value pairs available for read:
- // "foo" -> [(ts1, "v1"), (ts2, "v3"), (ts3, "v4")]
- // "bar" -> [(ts2, "v2")]
- // "baz" -> [(ts1, "v5")]
- // Do a timestamped read with ts1 after third reopen.
- // read_opts.timestamp is set to ts1 for below reads
- CheckGet(read_opts, 1, "foo", "v1", ts1);
- std::string value;
- ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound());
- CheckGet(read_opts, 1, "baz", "v5", ts1);
- // Do a timestamped read with ts2 after third reopen.
- ts_slice = ts2;
- // read_opts.timestamp is set to ts2 for below reads.
- CheckGet(read_opts, 1, "foo", "v3", ts2);
- CheckGet(read_opts, 1, "bar", "v2", ts2);
- CheckGet(read_opts, 1, "baz", "v5", ts1);
- // Do a timestamped read with ts3 after third reopen.
- ts_slice = ts3;
- // read_opts.timestamp is set to ts3 for below reads.
- CheckGet(read_opts, 1, "foo", "v4", ts3);
- CheckGet(read_opts, 1, "bar", "v2", ts2);
- CheckGet(read_opts, 1, "baz", "v5", ts1);
- ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low));
- ASSERT_TRUE(full_history_ts_low.empty());
- } while (ChangeWalOptions());
- }
- TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
- // Set up the option that enables user defined timestamp size.
- std::string min_ts;
- std::string write_ts;
- PutFixed64(&min_ts, 0);
- PutFixed64(&write_ts, 1);
- Options ts_options;
- ts_options.create_if_missing = true;
- ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
- bool persist_udt = test::ShouldPersistUDT(GetParam());
- ts_options.persist_user_defined_timestamps = persist_udt;
- std::string smallest_ukey_without_ts = "baz";
- std::string largest_ukey_without_ts = "foo";
- ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt));
- // No flush, no sst files, because of no data.
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
- ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
- ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));
- ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt));
- // Memtable recovered from WAL flushed because `avoid_flush_during_recovery`
- // defaults to false, created one L0 file.
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U);
- std::vector<std::vector<FileMetaData>> level_to_files;
- dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files);
- std::string full_history_ts_low;
- ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low));
- ASSERT_GT(level_to_files.size(), 1);
- // L0 only has one SST file.
- ASSERT_EQ(level_to_files[0].size(), 1);
- auto meta = level_to_files[0][0];
- if (persist_udt) {
- ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
- ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
- ASSERT_TRUE(full_history_ts_low.empty());
- } else {
- ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key());
- ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key());
- std::string effective_cutoff;
- Slice write_ts_slice = write_ts;
- GetFullHistoryTsLowFromU64CutoffTs(&write_ts_slice, &effective_cutoff);
- ASSERT_EQ(effective_cutoff, full_history_ts_low);
- }
- }
- // Param 0: test mode for the user-defined timestamp feature
- INSTANTIATE_TEST_CASE_P(
- P, DBWALTestWithTimestamp,
- ::testing::Values(
- test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
- test::UserDefinedTimestampTestMode::kNormal));
- TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) {
- Options options;
- options.create_if_missing = true;
- options.comparator = BytewiseComparator();
- bool avoid_flush_during_recovery = true;
- ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true /* persist_udt */,
- avoid_flush_during_recovery));
- ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1"));
- ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5"));
- options.comparator = test::BytewiseComparatorWithU64TsWrapper();
- options.persist_user_defined_timestamps = false;
- // Test handle timestamp size inconsistency in WAL when enabling user-defined
- // timestamps.
- ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
- false /* persist_udt */,
- avoid_flush_during_recovery));
- std::string ts;
- PutFixed64(&ts, 0);
- Slice ts_slice = ts;
- ReadOptions read_opts;
- read_opts.timestamp = &ts_slice;
- // Pre-existing entries are treated as if they have the min timestamp.
- CheckGet(read_opts, 1, "foo", "v1", ts);
- CheckGet(read_opts, 1, "baz", "v5", ts);
- ts.clear();
- PutFixed64(&ts, 1);
- ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2"));
- ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6"));
- CheckGet(read_opts, 1, "foo", "v2", ts);
- CheckGet(read_opts, 1, "baz", "v6", ts);
- options.comparator = BytewiseComparator();
- // Open the column family again with the UDT feature disabled. Test handle
- // timestamp size inconsistency in WAL when disabling user-defined timestamps
- ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
- true /* persist_udt */,
- avoid_flush_during_recovery));
- ASSERT_EQ("v2", Get(1, "foo"));
- ASSERT_EQ("v6", Get(1, "baz"));
- }
- TEST_F(DBWALTest, RecoverWithTableHandle) {
- // For each WAL option configuration: write two flushed batches plus one
- // unflushed key into column family 1, reopen with a configuration-dependent
- // max_open_files, and check whether the recovered file metadata carries a
- // table reader handle.
- do {
- Options opts = CurrentOptions();
- opts.create_if_missing = true;
- opts.disable_auto_compactions = true;
- opts.avoid_flush_during_recovery = false;
- DestroyAndReopen(opts);
- CreateAndReopenWithCF({"pikachu"}, opts);
- // Two flushed batches followed by one key that is only written, not flushed.
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "bar", "v2"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "foo", "v3"));
- ASSERT_OK(Put(1, "bar", "v4"));
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(1, "big", std::string(100, 'a')));
- opts = CurrentOptions();
- const int kSmallMaxOpenFiles = 13;
- auto* const sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
- if (option_config_ == kDBLogDir) {
- // This configuration checks that files are NOT preloaded: keep the
- // open-file budget small enough that no preload happens.
- opts.max_open_files = kSmallMaxOpenFiles;
- // RocksDB sanitizes max_open_files to at least 20, so the callback
- // writes the small value back after sanitization.
- sync_point->SetCallBack(
- "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
- int* max_open_files = static_cast<int*>(arg);
- *max_open_files = kSmallMaxOpenFiles;
- });
- } else if (option_config_ == kWalDirAndMmapReads) {
- // This configuration checks that all files are always loaded.
- opts.max_open_files = 100;
- } else {
- opts.max_open_files = -1;
- }
- sync_point->EnableProcessing();
- ReopenWithColumnFamilies({"default", "pikachu"}, opts);
- sync_point->DisableProcessing();
- sync_point->ClearAllCallBacks();
- std::vector<std::vector<FileMetaData>> files_by_level;
- dbfull()->TEST_GetFilesMetaData(handles_[1], &files_by_level);
- size_t file_count = 0;
- for (const auto& level_files : files_by_level) {
- file_count += level_files.size();
- }
- ASSERT_EQ(file_count, 3);
- // Only the small-budget configuration expects files without a table reader.
- const bool expect_no_reader = (opts.max_open_files == kSmallMaxOpenFiles);
- for (const auto& level_files : files_by_level) {
- for (const auto& meta : level_files) {
- if (expect_no_reader) {
- ASSERT_TRUE(meta.table_reader_handle == nullptr);
- } else {
- ASSERT_TRUE(meta.table_reader_handle != nullptr);
- }
- }
- }
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RecoverWithBlob) {  // Recovery with blob files enabled should write one table file/blob file pair from the WAL.
- // Write a value that's below the prospective size limit for blobs and another
- // one that's above. Note that blob files are not actually enabled at this
- // point.
- constexpr uint64_t min_blob_size = 10;
- constexpr char short_value[] = "short";
- static_assert(sizeof(short_value) - 1 < min_blob_size,
- "short_value too long");
- constexpr char long_value[] = "long_value";
- static_assert(sizeof(long_value) - 1 >= min_blob_size,
- "long_value too short");
- ASSERT_OK(Put("key1", short_value));
- ASSERT_OK(Put("key2", long_value));
- // There should be no files just yet since we haven't flushed.
- {
- VersionSet* const versions = dbfull()->GetVersionSet();
- ASSERT_NE(versions, nullptr);
- ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
- ASSERT_NE(cfd, nullptr);
- Version* const current = cfd->current();
- ASSERT_NE(current, nullptr);
- const VersionStorageInfo* const storage_info = current->storage_info();
- ASSERT_NE(storage_info, nullptr);
- ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
- ASSERT_TRUE(storage_info->GetBlobFiles().empty());
- }
- // Reopen the database with blob files enabled. A new table file/blob file
- // pair should be written during recovery.
- Options options;
- options.enable_blob_files = true;
- options.min_blob_size = min_blob_size;
- options.avoid_flush_during_recovery = false;
- options.disable_auto_compactions = true;
- options.env = env_;
- Reopen(options);
- ASSERT_EQ(Get("key1"), short_value);  // both values must still be readable after recovery
- ASSERT_EQ(Get("key2"), long_value);
- VersionSet* const versions = dbfull()->GetVersionSet();  // re-fetch the version state of the reopened DB
- ASSERT_NE(versions, nullptr);
- ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
- ASSERT_NE(cfd, nullptr);
- Version* const current = cfd->current();
- ASSERT_NE(current, nullptr);
- const VersionStorageInfo* const storage_info = current->storage_info();
- ASSERT_NE(storage_info, nullptr);
- const auto& l0_files = storage_info->LevelFiles(0);
- ASSERT_EQ(l0_files.size(), 1);  // exactly one level-0 table file
- const FileMetaData* const table_file = l0_files[0];
- ASSERT_NE(table_file, nullptr);
- const auto& blob_files = storage_info->GetBlobFiles();
- ASSERT_EQ(blob_files.size(), 1);  // exactly one blob file
- const auto& blob_file = blob_files.front();
- ASSERT_NE(blob_file, nullptr);
- ASSERT_EQ(table_file->smallest.user_key(), "key1");  // the table file spans both keys
- ASSERT_EQ(table_file->largest.user_key(), "key2");
- ASSERT_EQ(table_file->fd.smallest_seqno, 1);
- ASSERT_EQ(table_file->fd.largest_seqno, 2);
- ASSERT_EQ(table_file->oldest_blob_file_number,
- blob_file->GetBlobFileNumber());
- ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);  // presumably only long_value, the one value that meets min_blob_size
- const InternalStats* const internal_stats = cfd->internal_stats();
- ASSERT_NE(internal_stats, nullptr);
- const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
- ASSERT_FALSE(compaction_stats.empty());
- ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());  // entry 0 must account for the table file...
- ASSERT_EQ(compaction_stats[0].bytes_written_blob,
- blob_file->GetTotalBlobBytes());
- ASSERT_EQ(compaction_stats[0].num_output_files, 1);  // ...and for one output file of each kind
- ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);
- const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();  // BYTES_FLUSHED must equal table bytes + blob bytes
- ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
- compaction_stats[0].bytes_written +
- compaction_stats[0].bytes_written_blob);
- }
- TEST_F(DBWALTest, RecoverWithBlobMultiSST) {  // Recovery with blob files and a small write buffer should produce several table/blob file pairs.
- // Write several large (4 KB) values without flushing. Note that blob files
- // are not actually enabled at this point.
- std::string large_value(1 << 12, 'a');
- constexpr int num_keys = 64;
- for (int i = 0; i < num_keys; ++i) {
- ASSERT_OK(Put(Key(i), large_value));
- }
- // There should be no files just yet since we haven't flushed.
- {
- VersionSet* const versions = dbfull()->GetVersionSet();
- ASSERT_NE(versions, nullptr);
- ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
- ASSERT_NE(cfd, nullptr);
- Version* const current = cfd->current();
- ASSERT_NE(current, nullptr);
- const VersionStorageInfo* const storage_info = current->storage_info();
- ASSERT_NE(storage_info, nullptr);
- ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
- ASSERT_TRUE(storage_info->GetBlobFiles().empty());
- }
- // Reopen the database with blob files enabled and write buffer size set to a
- // smaller value. Multiple table files+blob files should be written and added
- // to the Version during recovery.
- Options options;
- options.write_buffer_size = 1 << 16; // 64 KB
- options.enable_blob_files = true;
- options.avoid_flush_during_recovery = false;
- options.disable_auto_compactions = true;
- options.env = env_;
- Reopen(options);
- for (int i = 0; i < num_keys; ++i) {  // every value must still be readable after recovery
- ASSERT_EQ(Get(Key(i)), large_value);
- }
- VersionSet* const versions = dbfull()->GetVersionSet();  // re-fetch the version state of the reopened DB
- ASSERT_NE(versions, nullptr);
- ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
- ASSERT_NE(cfd, nullptr);
- Version* const current = cfd->current();
- ASSERT_NE(current, nullptr);
- const VersionStorageInfo* const storage_info = current->storage_info();
- ASSERT_NE(storage_info, nullptr);
- const auto& l0_files = storage_info->LevelFiles(0);
- ASSERT_GT(l0_files.size(), 1);  // more than one table file
- const auto& blob_files = storage_info->GetBlobFiles();
- ASSERT_GT(blob_files.size(), 1);  // more than one blob file
- ASSERT_EQ(l0_files.size(), blob_files.size());  // same number of table files and blob files
- }
- TEST_F(DBWALTest, WALWithChecksumHandoff) {  // Exercises WAL writes with checksum handoff against a fault-injecting file system.
- #ifndef ROCKSDB_ASSERT_STATUS_CHECKED
- if (mem_env_ || encrypted_env_) {
- ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
- return;
- }
- std::shared_ptr<FaultInjectionTestFS> fault_fs(
- new FaultInjectionTestFS(FileSystem::Default()));
- std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));  // Env wrapper around the fault-injecting file system
- do {
- Options options = CurrentOptions();
- options.checksum_handoff_file_types.Add(FileType::kWalFile);  // hand off checksums for WAL files
- options.env = fault_fs_env.get();
- fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
- CreateAndReopenWithCF({"pikachu"}, options);
- WriteOptions writeOpt = WriteOptions();
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ("v1", Get(1, "foo"));  // writes made with the WAL disabled are still visible after reopen
- ASSERT_EQ("v1", Get(1, "bar"));
- writeOpt.disableWAL = false;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- // Both values should be present.
- ASSERT_EQ("v2", Get(1, "bar"));
- ASSERT_EQ("v2", Get(1, "foo"));
- writeOpt.disableWAL = true;
- // This Put bypasses the WAL; its data is persisted by Flush
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- writeOpt.disableWAL = false;
- // Data is persisted in the WAL
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "zoo", "v3"));
- ASSERT_OK(dbfull()->SyncWAL());
- // The file system now expects a different checksum type, so the WAL write fails
- fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
- writeOpt.disableWAL = false;
- ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- // Due to the write failure, Get should not find "v3" for "foo"
- ASSERT_NE("v3", Get(1, "foo"));
- ASSERT_EQ("v3", Get(1, "zoo"));
- ASSERT_EQ("v3", Get(1, "bar"));
- fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
- // Each write will be simulated as corrupted.
- fault_fs->IngestDataCorruptionBeforeWrite();
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v4"));  // no WAL write involved, so the Put itself succeeds
- writeOpt.disableWAL = false;
- ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v4"));  // the WAL write fails
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_NE("v4", Get(1, "foo"));  // neither "v4" value is visible after reopen
- ASSERT_NE("v4", Get(1, "bar"));
- fault_fs->NoDataCorruptionBeforeWrite();
- fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
- // The file system does not provide checksum method and verification.
- writeOpt.disableWAL = true;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v5"));
- writeOpt.disableWAL = false;
- ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v5"));
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ("v5", Get(1, "foo"));
- ASSERT_EQ("v5", Get(1, "bar"));
- Destroy(options);
- } while (ChangeWalOptions());
- #endif // ROCKSDB_ASSERT_STATUS_CHECKED
- }
- TEST_F(DBWALTest, LockWal) {  // While the WAL is locked, no-slowdown writes and non-waiting flushes must be refused; UnlockWAL() lifts that.
- do {
- Options options = CurrentOptions();
- options.create_if_missing = true;
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo", "v"));
- ASSERT_OK(Put("bar", "v"));
- ASSERT_OK(db_->LockWAL());
- // Verify writes are stopped: a no_slowdown Put must return Incomplete
- WriteOptions wopts;
- wopts.no_slowdown = true;
- Status s = db_->Put(wopts, "foo", "dontcare");
- ASSERT_TRUE(s.IsIncomplete());
- {
- VectorLogPtr wals;
- ASSERT_OK(db_->GetSortedWalFiles(wals));  // listing WAL files must still work while locked
- ASSERT_FALSE(wals.empty());
- }
- port::Thread worker([&]() {  // default-option flush issued on a separate thread; joined only after UnlockWAL()
- Status tmp_s = db_->Flush(FlushOptions());
- ASSERT_OK(tmp_s);
- });
- FlushOptions flush_opts;
- flush_opts.wait = false;
- s = db_->Flush(flush_opts);
- ASSERT_TRUE(s.IsTryAgain());  // a non-waiting flush is refused with TryAgain while the WAL is locked
- ASSERT_OK(db_->UnlockWAL());
- ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare"));  // writes succeed again after unlocking
- worker.join();
- } while (ChangeWalOptions());
- }
- class DBRecoveryTestBlobError  // Parameterized DBWALTest fixture; the string parameter is a sync point name.
- : public DBWALTest,
- public testing::WithParamInterface<std::string> {
- public:
- DBRecoveryTestBlobError() : sync_point_(GetParam()) {}
- std::string sync_point_;  // copy of the test parameter
- };
- INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError,  // one instance per listed sync point name
- ::testing::ValuesIn(std::vector<std::string>{
- "BlobFileBuilder::WriteBlobToFile:AddRecord",
- "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
- TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
- // A blob write error injected during recovery must make the reopen fail and
- // must not leave table or blob files behind in the DB directory.
- //
- // Write a value first; blob files are not enabled yet at this point.
- ASSERT_OK(Put("key", "blob"));
- // Arrange for the parameterized sync point to overwrite the Status it is
- // handed with an IOError, so that blob file writing fails during recovery.
- auto* const sync_points = SyncPoint::GetInstance();
- sync_points->SetCallBack(sync_point_, [this](void* arg) {
- Status* const s = static_cast<Status*>(arg);
- assert(s);
- (*s) = Status::IOError(sync_point_);
- });
- sync_points->EnableProcessing();
- Options blob_options;
- blob_options.enable_blob_files = true;
- blob_options.avoid_flush_during_recovery = false;
- blob_options.disable_auto_compactions = true;
- blob_options.env = env_;
- ASSERT_NOK(TryReopen(blob_options));
- sync_points->DisableProcessing();
- sync_points->ClearAllCallBacks();
- // Make sure the files generated by the failed recovery have been deleted:
- // no parseable name in the DB directory may be a table or blob file.
- std::vector<std::string> children;
- ASSERT_OK(env_->GetChildren(dbname_, &children));
- for (const auto& child : children) {
- uint64_t file_number = 0;
- FileType file_type = kTableFile;
- if (ParseFileName(child, &file_number, &file_type)) {
- ASSERT_NE(file_type, kTableFile);
- ASSERT_NE(file_type, kBlobFile);
- }
- }
- }
- TEST_F(DBWALTest, IgnoreRecoveredLog) {  // WAL files that were already recovered must not be replayed a second time.
- std::string backup_logs = dbname_ + "/backup_logs";
- do {
- // delete old files in backup_logs directory
- ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
- std::vector<std::string> old_files;
- ASSERT_OK(env_->GetChildren(backup_logs, &old_files));
- for (auto& file : old_files) {
- ASSERT_OK(env_->DeleteFile(backup_logs + "/" + file));
- }
- Options options = CurrentOptions();
- options.create_if_missing = true;
- options.merge_operator = MergeOperators::CreateUInt64AddOperator();
- options.wal_dir = dbname_ + "/logs";
- DestroyAndReopen(options);
- // fill up the DB
- std::string one, two;
- PutFixed64(&one, 1);
- PutFixed64(&two, 2);
- ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));  // "foo" is merged with 1 twice, "bar" once
- ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
- ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
- // copy the logs to backup
- std::vector<std::string> logs;
- ASSERT_OK(env_->GetChildren(options.wal_dir, &logs));
- for (auto& log : logs) {
- CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
- }
- // recover the DB
- Reopen(options);
- ASSERT_EQ(two, Get("foo"));
- ASSERT_EQ(one, Get("bar"));
- Close();
- // copy the logs from backup back to wal dir
- for (auto& log : logs) {
- CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
- }
- // this should ignore the log files, recovery should not happen again
- // if the recovery happens, the same merge operator would be called twice,
- // leading to incorrect results
- Reopen(options);
- ASSERT_EQ(two, Get("foo"));
- ASSERT_EQ(one, Get("bar"));
- Close();
- Destroy(options);
- Reopen(options);  // reopen after Destroy, then close again before restoring the WAL files
- Close();
- // copy the logs from backup back to wal dir
- ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
- for (auto& log : logs) {
- CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
- }
- // assert that we successfully recovered only from logs, even though we
- // destroyed the DB
- Reopen(options);
- ASSERT_EQ(two, Get("foo"));
- ASSERT_EQ(one, Get("bar"));
- // Recovery will fail if DB directory doesn't exist.
- Destroy(options);
- // copy the logs from backup back to wal dir
- ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
- for (auto& log : logs) {
- CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
- // the backup copy is no longer needed after this point
- ASSERT_OK(env_->DeleteFile(backup_logs + "/" + log));
- }
- Status s = TryReopen(options);
- ASSERT_NOK(s);
- Destroy(options);
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RecoveryWithEmptyLog) {  // Data must survive recovery even after back-to-back reopens with no writes in between.
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "foo", "v2"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());  // second reopen without any write since the first; presumably leaves an empty WAL
- ASSERT_OK(Put(1, "foo", "v3"));
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- ASSERT_EQ("v3", Get(1, "foo"));  // the latest value must win after recovery
- } while (ChangeWalOptions());
- }
- #if !(defined NDEBUG) || !defined(OS_WIN)
- TEST_F(DBWALTest, PreallocateBlock) {
- // Checks the size reported at the "DBTestWalFile.GetPreallocationStatus"
- // sync point under four successive option settings. Each phase writes,
- // flushes, writes again and closes, and expects the callback to have fired
- // exactly twice with the expected size.
- Options opts = CurrentOptions();
- opts.write_buffer_size = 10 * 1000 * 1000;
- opts.max_total_wal_size = 0;
- size_t expected_preallocation_size = static_cast<size_t>(
- opts.write_buffer_size + opts.write_buffer_size / 10);
- DestroyAndReopen(opts);
- std::atomic<int> called(0);
- auto* const sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
- // Installs the checking callback and turns sync point processing on. The
- // callback reads expected_preallocation_size by reference, so each phase
- // only has to update that variable before calling this.
- const auto arm_preallocation_check = [&]() {
- sync_point->SetCallBack(
- "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
- ASSERT_TRUE(arg != nullptr);
- size_t preallocation_size = *(static_cast<size_t*>(arg));
- ASSERT_EQ(expected_preallocation_size, preallocation_size);
- called.fetch_add(1);
- });
- sync_point->EnableProcessing();
- };
- // Phase 1: expected size is write_buffer_size plus one tenth of it.
- arm_preallocation_check();
- ASSERT_OK(Put("", ""));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("", ""));
- Close();
- sync_point->DisableProcessing();
- ASSERT_EQ(2, called.load());
- // Phase 2: expected size is max_total_wal_size.
- opts.max_total_wal_size = 1000 * 1000;
- expected_preallocation_size = static_cast<size_t>(opts.max_total_wal_size);
- Reopen(opts);
- called.store(0);
- arm_preallocation_check();
- ASSERT_OK(Put("", ""));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("", ""));
- Close();
- sync_point->DisableProcessing();
- ASSERT_EQ(2, called.load());
- // Phase 3: expected size is db_write_buffer_size.
- opts.db_write_buffer_size = 800 * 1000;
- expected_preallocation_size =
- static_cast<size_t>(opts.db_write_buffer_size);
- Reopen(opts);
- called.store(0);
- arm_preallocation_check();
- ASSERT_OK(Put("", ""));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("", ""));
- Close();
- sync_point->DisableProcessing();
- ASSERT_EQ(2, called.load());
- // Phase 4: expected size is the size given to the WriteBufferManager.
- expected_preallocation_size = 700 * 1000;
- std::shared_ptr<WriteBufferManager> buffer_manager =
- std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
- opts.write_buffer_manager = buffer_manager;
- Reopen(opts);
- called.store(0);
- arm_preallocation_check();
- ASSERT_OK(Put("", ""));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("", ""));
- Close();
- sync_point->DisableProcessing();
- ASSERT_EQ(2, called.load());
- }
- #endif // !(defined NDEBUG) || !defined(OS_WIN)
- TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {  // A forced full purge must not delete a WAL that is kept for recycling.
- // For github issue #1303
- for (int i = 0; i < 2; ++i) {  // i == 0: default WAL location; i == 1: alternative_wal_dir_
- Options options = CurrentOptions();
- options.create_if_missing = true;
- options.recycle_log_file_num = 2;
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- if (i != 0) {
- options.wal_dir = alternative_wal_dir_;
- }
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo", "v1"));
- VectorLogPtr log_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
- ASSERT_GT(log_files.size(), 0);
- ASSERT_OK(Flush());
- // Now the original WAL is in log_files[0] and should be marked for
- // recycling.
- // Verify full purge cannot remove this file.
- JobContext job_context(0);
- dbfull()->TEST_LockMutex();  // FindObsoleteFiles is called between TEST_LockMutex/TEST_UnlockMutex
- dbfull()->FindObsoleteFiles(&job_context, true /* force */);
- dbfull()->TEST_UnlockMutex();
- dbfull()->PurgeObsoleteFiles(job_context);
- job_context.Clean();
- if (i == 0) {  // the original WAL file must still exist in whichever directory was used
- ASSERT_OK(
- env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
- } else {
- ASSERT_OK(env_->FileExists(
- LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
- }
- }
- }
- TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
- // Ensures full purge cannot delete a WAL while it's in the process of being
- // recycled. In particular, we force the full purge after a file has been
- // chosen for reuse, but before it has been renamed.
- for (int i = 0; i < 2; ++i) {  // i == 0: default WAL location; i == 1: alternative_wal_dir_
- Options options = CurrentOptions();
- options.recycle_log_file_num = 1;
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- if (i != 0) {
- options.wal_dir = alternative_wal_dir_;
- }
- DestroyAndReopen(options);
- // The first flush creates a second log so writes can continue before the
- // flush finishes.
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Flush());
- // The second flush can recycle the first log. Sync points enforce the
- // full purge happens after choosing the log to recycle and before it is
- // renamed.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"DBImpl::CreateWAL:BeforeReuseWritableFile1",
- "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
- {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
- "DBImpl::CreateWAL:BeforeReuseWritableFile2"},
- });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::port::Thread thread([&]() {  // helper thread: calls EnableFileDeletions() between the two test sync points
- TEST_SYNC_POINT(
- "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
- ASSERT_OK(db_->EnableFileDeletions());  // presumably what triggers the full purge -- TODO confirm
- TEST_SYNC_POINT(
- "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
- });
- ASSERT_OK(Put("foo", "bar"));
- ASSERT_OK(Flush());
- thread.join();
- }
- }
- TEST_F(DBWALTest, GetSortedWalFiles) {
- // For each WAL option configuration: a freshly created DB reports no WAL
- // files, and exactly one after the first write.
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- VectorLogPtr wal_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
- ASSERT_TRUE(wal_files.empty());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
- ASSERT_EQ(wal_files.size(), 1);
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, GetCurrentWalFile) {  // Checks the metadata GetCurrentWalFile() reports before and after writes and reopens.
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- std::unique_ptr<LogFile>* bad_log_file = nullptr;  // a null output pointer must be rejected
- ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file));
- std::unique_ptr<LogFile> log_file;
- ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
- // nothing has been written to the log yet
- ASSERT_EQ(log_file->StartSequence(), 0);
- ASSERT_EQ(log_file->SizeFileBytes(), 0);
- ASSERT_EQ(log_file->Type(), kAliveLogFile);
- ASSERT_GT(log_file->LogNumber(), 0);
- // add some data and verify that the file size actually moves forward
- ASSERT_OK(Put(0, "foo", "v1"));
- ASSERT_OK(Put(0, "foo2", "v2"));
- ASSERT_OK(Put(0, "foo3", "v3"));
- ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
- ASSERT_EQ(log_file->StartSequence(), 0);
- ASSERT_GT(log_file->SizeFileBytes(), 0);
- ASSERT_EQ(log_file->Type(), kAliveLogFile);
- ASSERT_GT(log_file->LogNumber(), 0);
- // force log files to cycle and add some more data, then check that the
- // current WAL is still reported as alive, non-empty and with a positive number
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- for (int i = 0; i < 10; i++) {
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- }
- ASSERT_OK(Put(0, "foo4", "v4"));
- ASSERT_OK(Put(0, "foo5", "v5"));
- ASSERT_OK(Put(0, "foo6", "v6"));
- ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
- ASSERT_EQ(log_file->StartSequence(), 0);
- ASSERT_GT(log_file->SizeFileBytes(), 0);
- ASSERT_EQ(log_file->Type(), kAliveLogFile);
- ASSERT_GT(log_file->LogNumber(), 0);  // NOTE(review): only checks > 0, not that the number grew versus the earlier reading
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
- // Regression test: WAL cleanup used to miss files that do not contain data
- // for every column family. Only column family 1 is written here, and the
- // earliest live WAL number is compared before and after a reopen.
- do {
- CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- ASSERT_OK(Put(1, "foo", "v1"));
- ASSERT_OK(Put(1, "foo", "v2"));
- // Slot 0: before the reopen. Slot 1: after the reopen.
- uint64_t first_wal_number[2];
- for (int pass = 0; pass < 2; ++pass) {
- if (pass != 0) {
- ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- }
- VectorLogPtr wal_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
- // With no WAL at all, record the largest possible number.
- first_wal_number[pass] = wal_files.empty()
- ? std::numeric_limits<uint64_t>::max()
- : wal_files[0]->LogNumber();
- }
- // Check at least the first WAL was cleaned up during the recovery.
- ASSERT_LT(first_wal_number[0], first_wal_number[1]);
- } while (ChangeWalOptions());
- }
- TEST_F(DBWALTest, RecoverWithLargeLog) {  // Recovering a large WAL with a small write buffer must flush several table files.
- do {
- {
- Options options = CurrentOptions();
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "big1", std::string(200000, '1')));
- ASSERT_OK(Put(1, "big2", std::string(200000, '2')));
- ASSERT_OK(Put(1, "small3", std::string(10, '3')));
- ASSERT_OK(Put(1, "small4", std::string(10, '4')));
- ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);  // nothing has been flushed to level 0 yet
- }
- // Make sure that if we re-open with a small write buffer size that
- // we flush table files in the middle of a large log file.
- Options options;
- options.write_buffer_size = 100000;
- options = CurrentOptions(options);
- ReopenWithColumnFamilies({"default", "pikachu"}, options);
- ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
- ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
- ASSERT_EQ(std::string(200000, '2'), Get(1, "big2"));
- ASSERT_EQ(std::string(10, '3'), Get(1, "small3"));
- ASSERT_EQ(std::string(10, '4'), Get(1, "small4"));
- ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);  // NOTE(review): weaker than the ASSERT_EQ(..., 3) above on the same count
- } while (ChangeWalOptions());
- }
- // In https://reviews.facebook.net/D20661 we changed the
- // recovery behavior: previously, for each log file, each column family's
- // memtable was flushed, even if it was empty. Now
- // we try to create the smallest number of table files by merging
- // updates from multiple logs.
- TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {  // Checks per-column-family SST counts after recovering with a much smaller write buffer.
- Options options = CurrentOptions();
- options.write_buffer_size = 5000000;
- CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
- // Since we will reopen DB with smaller write_buffer_size,
- // each key will go to new SST file
- ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
- ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
- ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
- ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
- ASSERT_OK(Put(3, Key(10), DummyString(1)));
- // Make 'dobrynia' to be flushed and new WAL file to be created
- ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
- ASSERT_OK(Put(2, Key(1), DummyString(1)));
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
- {
- auto tables = ListTableFiles(env_, dbname_);
- ASSERT_EQ(tables.size(), static_cast<size_t>(1));  // exactly one table file in the DB directory so far
- // Make sure 'dobrynia' was flushed: check sst files amount
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(1));
- }
- // New WAL file
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(3, Key(10), DummyString(1)));
- ASSERT_OK(Put(3, Key(10), DummyString(1)));
- ASSERT_OK(Put(3, Key(10), DummyString(1)));
- options.write_buffer_size = 4096;  // reopen with a much smaller write buffer and arena block
- options.arena_block_size = 4096;
- ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
- options);
- {
- // No inserts => default is empty
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
- static_cast<uint64_t>(0));
- // First 4 keys go to separate SSTs + 1 more SST for 2 smaller keys
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
- static_cast<uint64_t>(5));
- // 1 SST for big key + 1 SST for small one
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(2));
- // 1 SST for all keys
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(1));
- }
- }
- // In https://reviews.facebook.net/D20661 we changed the
- // recovery behavior: previously, for each log file, each column family's
- // memtable was flushed, even if it wasn't empty. Now
- // we try to create the smallest number of table files by merging
- // updates from multiple logs.
- TEST_F(DBWALTest, RecoverCheckFileAmount) {
- // Writes small records to four column families across three WAL files,
- // forcing only 'nikitich' to flush along the way. After recovery, checks the
- // total number of table files as well as the per-column-family SST counts.
- Options options = CurrentOptions();
- options.write_buffer_size = 100000;
- options.arena_block_size = 4 * 1024;
- options.avoid_flush_during_recovery = false;
- CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
- ASSERT_OK(Put(0, Key(1), DummyString(1)));
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(2, Key(1), DummyString(1)));
- // Make 'nikitich' memtable to be flushed
- ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
- ASSERT_OK(Put(3, Key(1), DummyString(1)));
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
- // 4 memtable are not flushed, 1 sst file
- {
- auto tables = ListTableFiles(env_, dbname_);
- ASSERT_EQ(tables.size(), static_cast<size_t>(1));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(1));
- }
- // Memtable for 'nikitich' has flushed, new WAL file has opened
- // 4 memtable still not flushed
- // Write to new WAL file
- ASSERT_OK(Put(0, Key(1), DummyString(1)));
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(2, Key(1), DummyString(1)));
- // Fill up 'nikitich' one more time
- ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
- // make it flush
- ASSERT_OK(Put(3, Key(1), DummyString(1)));
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
- // There are still 4 memtable not flushed, and 2 sst tables
- ASSERT_OK(Put(0, Key(1), DummyString(1)));
- ASSERT_OK(Put(1, Key(1), DummyString(1)));
- ASSERT_OK(Put(2, Key(1), DummyString(1)));
- {
- auto tables = ListTableFiles(env_, dbname_);
- ASSERT_EQ(tables.size(), static_cast<size_t>(2));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(2));
- }
- ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
- options);
- {
- std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
- // Check, that records for 'default', 'dobrynia' and 'pikachu' from
- // first, second and third WALs went to the same SST.
- // So, there is 6 SSTs: three for 'nikitich', one for 'default', one for
- // 'dobrynia', one for 'pikachu'
- // Assert the total too, as the two earlier phases do; previously
- // table_files was fetched here but never checked.
- ASSERT_EQ(table_files.size(), static_cast<size_t>(6));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
- static_cast<uint64_t>(1));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(3));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(1));
- ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
- static_cast<uint64_t>(1));
- }
- }
- TEST_F(DBWALTest, SyncMultipleLogs) {
- // Writes two large synced batches against a 4 KB write buffer and then
- // calls SyncWAL(); every step must succeed. Presumably the small buffer
- // makes several WAL files exist at sync time -- TODO confirm.
- const uint64_t kNumBatches = 2;
- const int kBatchSize = 1000;
- Options opts = CurrentOptions();
- opts.create_if_missing = true;
- opts.write_buffer_size = 4096;
- Reopen(opts);
- WriteOptions synced_write;
- synced_write.sync = true;
- WriteBatch pending;
- for (uint64_t batch_idx = 0; batch_idx != kNumBatches; ++batch_idx) {
- // Reuse the same batch object; start each round from an empty batch.
- pending.Clear();
- for (int key_idx = 0; key_idx != kBatchSize; ++key_idx) {
- ASSERT_OK(pending.Put(Key(key_idx), DummyString(128)));
- }
- ASSERT_OK(dbfull()->Write(synced_write, &pending));
- }
- ASSERT_OK(dbfull()->SyncWAL());
- }
- TEST_F(DBWALTest, DISABLED_RecycleMultipleWalsCrash) {  // Disabled test: point-in-time recovery with recycled WALs that still contain data from an older incarnation.
- Options options = CurrentOptions();
- options.max_write_buffer_number = 5;
- options.track_and_verify_wals_in_manifest = true;
- options.max_bgerror_resume_count = 0; // manual resume
- options.recycle_log_file_num = 3;
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- // Disable truncating recycled WALs to new size in posix env
- // (approximating a crash)
- SyncPoint::GetInstance()->SetCallBack(
- "PosixWritableFile::Close",
- [](void* arg) { *(static_cast<size_t*>(arg)) = 0; });  // overwrites the size_t passed to the sync point with 0
- SyncPoint::GetInstance()->EnableProcessing();
- // Re-open with desired options
- DestroyAndReopen(options);
- Defer closer([this]() { Close(); });  // Close() also runs when the test scope is left early
- // Ensure WAL recycling wasn't sanitized away
- ASSERT_EQ(db_->GetOptions().recycle_log_file_num,
- options.recycle_log_file_num);
- // Prepare external files for later ingestion
- std::string sst_files_dir = dbname_ + "/sst_files/";
- ASSERT_OK(DestroyDir(env_, sst_files_dir));
- ASSERT_OK(env_->CreateDir(sst_files_dir));
- std::string external_file1 = sst_files_dir + "file1.sst";
- {
- SstFileWriter sst_file_writer(EnvOptions(), options);
- ASSERT_OK(sst_file_writer.Open(external_file1));
- ASSERT_OK(sst_file_writer.Put("external1", "ex1"));
- ExternalSstFileInfo file_info;
- ASSERT_OK(sst_file_writer.Finish(&file_info));
- }
- std::string external_file2 = sst_files_dir + "file2.sst";
- {
- SstFileWriter sst_file_writer(EnvOptions(), options);
- ASSERT_OK(sst_file_writer.Open(external_file2));
- ASSERT_OK(sst_file_writer.Put("external2", "ex2"));
- ExternalSstFileInfo file_info;
- ASSERT_OK(sst_file_writer.Finish(&file_info));
- }
- // Populate some WALs to be recycled such that there will be extra data
- // from an old incarnation of the WAL on recovery
- ASSERT_OK(db_->PauseBackgroundWork());
- ASSERT_OK(Put("ignore1", Random::GetTLSInstance()->RandomString(500)));
- ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
- ASSERT_OK(Put("ignore2", Random::GetTLSInstance()->RandomString(500)));
- ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
- ASSERT_OK(db_->ContinueBackgroundWork());
- ASSERT_OK(Flush());
- ASSERT_OK(Put("ignore3", Random::GetTLSInstance()->RandomString(500)));
- ASSERT_OK(Flush());
- // Verify expected log files (still there for recycling)
- std::vector<FileAttributes> files;
- int log_count = 0;
- ASSERT_OK(options.env->GetChildrenFileAttributes(dbname_, &files));
- for (const auto& f : files) {
- if (EndsWith(f.name, ".log")) {
- EXPECT_GT(f.size_bytes, 500);  // the Puts above stored RandomString(500) values
- ++log_count;
- }
- }
- EXPECT_EQ(log_count, 3);  // matches recycle_log_file_num set above
- // (Re-used recipe) Generate two inactive WALs and one active WAL, with a
- // gap in sequence numbers to interfere with recovery
- ASSERT_OK(db_->PauseBackgroundWork());
- ASSERT_OK(Put("key1", "val1"));
- ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
- ASSERT_OK(Put("key2", "val2"));
- ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
- // Need a gap in sequence numbers, so e.g. ingest external file
- // with an open snapshot
- {
- ManagedSnapshot snapshot(db_);
- ASSERT_OK(
- db_->IngestExternalFile({external_file1}, IngestExternalFileOptions()));
- }
- ASSERT_OK(Put("key3", "val3"));
- ASSERT_OK(db_->SyncWAL());
- // Need an SST file that is logically after that WAL, so that dropping WAL
- // data is not a valid point in time.
- {
- ManagedSnapshot snapshot(db_);
- ASSERT_OK(
- db_->IngestExternalFile({external_file2}, IngestExternalFileOptions()));
- }
- // Approximate a crash, with respect to recycled WAL data extending past
- // the end of the current WAL data (see SyncPoint callback above)
- Close();
- // Verify recycled log files haven't been truncated
- files.clear();
- log_count = 0;
- ASSERT_OK(options.env->GetChildrenFileAttributes(dbname_, &files));
- for (const auto& f : files) {
- if (EndsWith(f.name, ".log")) {
- EXPECT_GT(f.size_bytes, 500);
- ++log_count;
- }
- }
- EXPECT_EQ(log_count, 3);
- // Verify no data loss after reopen.
- Reopen(options);  // still uses kPointInTimeRecovery from the options above
- EXPECT_EQ("val1", Get("key1"));
- EXPECT_EQ("val2", Get("key2")); // Passes because of adjacent seqnos
- EXPECT_EQ("ex1", Get("external1"));
- EXPECT_EQ("val3", Get("key3")); // <- ONLY FAILURE! (Not a point in time)
- EXPECT_EQ("ex2", Get("external2"));
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_F(DBWALTest, SyncWalPartialFailure) {  // SyncWAL() where one WAL syncs and another fails: must not crash, and all written keys must survive a reopen.
- class MyTestFileSystem : public FileSystemWrapper {  // wraps every newly created writable file in MyTestWritableFile
- public:
- explicit MyTestFileSystem(std::shared_ptr<FileSystem> base)
- : FileSystemWrapper(std::move(base)) {}
- static const char* kClassName() { return "MyTestFileSystem"; }
- const char* Name() const override { return kClassName(); }
- IOStatus NewWritableFile(const std::string& fname,
- const FileOptions& file_opts,
- std::unique_ptr<FSWritableFile>* result,
- IODebugContext* dbg) override {
- IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
- if (s.ok()) {  // only wrap files the underlying file system actually created
- *result =
- std::make_unique<MyTestWritableFile>(std::move(*result), *this);
- }
- return s;
- }
- AcqRelAtomic<uint32_t> syncs_before_failure_{UINT32_MAX};  // countdown shared by all wrapped files; decremented on every Sync()
- protected:
- class MyTestWritableFile : public FSWritableFileOwnerWrapper {  // forwards Sync() unless the shared countdown was at zero
- public:
- MyTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
- MyTestFileSystem& fs)
- : FSWritableFileOwnerWrapper(std::move(file)), fs_(fs) {}
- IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
- int prev_val = fs_.syncs_before_failure_.FetchSub(1);  // presumably the value before the decrement -- confirm against AcqRelAtomic
- if (prev_val == 0) {  // inject the fault on the Sync() that found the countdown exhausted
- return IOStatus::IOError("fault");
- } else {
- return target()->Sync(options, dbg);
- }
- }
- protected:
- MyTestFileSystem& fs_;
- };
- };
- Options options = CurrentOptions();
- options.max_write_buffer_number = 4;
- options.track_and_verify_wals_in_manifest = true;
- options.max_bgerror_resume_count = 0; // manual resume
- auto custom_fs =
- std::make_shared<MyTestFileSystem>(options.env->GetFileSystem());
- std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(custom_fs));
- options.env = fault_fs_env.get();  // raw pointer; fault_fs_env keeps ownership
- Reopen(options);
- Defer closer([this]() { Close(); });
- // This is the simplest way to get
- // * one inactive WAL, synced
- // * one inactive WAL, not synced, and
- // * one active WAL, not synced
- // with a single thread, to exercise as much logic as we reasonably can.
- ASSERT_OK(db_->PauseBackgroundWork());
- ASSERT_OK(Put("key1", "val1"));
- ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
- ASSERT_OK(db_->SyncWAL());
- ASSERT_OK(Put("key2", "val2"));
- ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
- ASSERT_OK(Put("key3", "val3"));
- // Allow 1 of the WALs to sync, but another won't
- custom_fs->syncs_before_failure_.Store(1);
- ASSERT_NOK(db_->SyncWAL());
- // Stuck in this state. (This could previously cause a segfault.)
- ASSERT_NOK(db_->SyncWAL());
- // Can't Resume because WAL write failure is considered non-recoverable,
- // regardless of the IOStatus itself. (Can/should be fixed?)
- ASSERT_NOK(db_->Resume());
- // Verify no data loss after reopen.
- // Also Close() could previously crash in this state.
- Reopen(options);
- ASSERT_EQ("val1", Get("key1"));
- ASSERT_EQ("val2", Get("key2"));
- ASSERT_EQ("val3", Get("key3"));
- }
- // Github issue 1339. Prior to the fix we read the sequence id from the first
- // log into a local variable, then kept incrementing that variable as we
- // replayed logs, ignoring the actual sequence id of the records. This is
- // incorrect if some writes come with WAL disabled.
- TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
- // Mixes WAL-backed and WAL-less writes, simulates a crash, and checks that
- // the last WAL-backed value of "key" is what recovery returns.
- auto fault_env = std::make_unique<FaultInjectionTestEnv>(env_);
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.env = fault_env.get();
- WriteOptions with_wal;
- with_wal.disableWAL = false;
- with_wal.sync = true;
- WriteOptions without_wal;
- without_wal.disableWAL = true;
- CreateAndReopenWithCF({"dummy"}, options);
- ASSERT_OK(Put(1, "dummy", "d1", with_wal)); // seq id 1
- ASSERT_OK(Put(1, "dummy", "d2", without_wal));
- ASSERT_OK(Put(1, "dummy", "d3", without_wal));
- ASSERT_OK(Put(0, "key", "v4", with_wal)); // seq id 4
- ASSERT_OK(Flush(0));
- ASSERT_OK(Put(0, "key", "v5", with_wal)); // seq id 5
- ASSERT_EQ("v5", Get(0, "key"));
- ASSERT_OK(dbfull()->FlushWAL(false));
- // Simulate a crash: deactivate the file system before closing.
- fault_env->SetFilesystemActive(false);
- Close();
- fault_env->ResetState();
- ReopenWithColumnFamilies({"default", "dummy"}, options);
- // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
- ASSERT_EQ("v5", Get(0, "key"));
- // The DB must be destroyed while fault_env is still alive.
- Destroy(options);
- }
- //
- // Test WAL recovery for the various modes available
- //
- // Static helpers shared by the WAL recovery-mode tests: they hand-write a
- // set of WAL files, corrupt or truncate one of them, and count how many of
- // the written keys are readable after a reopen.
- class RecoveryTestHelper {
- public:
- // Number of WAL files to generate
- static constexpr int kWALFilesCount = 10;
- // Starting number for the WAL file name like 00010.log
- static constexpr int kWALFileOffset = 10;
- // Keys to be written per WAL file
- static constexpr int kKeysPerWALFile = 133;
- // Size of the value
- static constexpr int kValueSize = 96;
- // Create `wal_count` WAL files, numbered from kWALFileOffset, each holding
- // kKeysPerWALFile records with keys "key<N>". `*count` is reset to 0 and
- // ends up as the total number of keys written. Uses gtest ASSERT_* macros,
- // so it returns early on the first failure.
- static void FillData(DBWALTestBase* test, const Options& options,
- const size_t wal_count, size_t* count) {
- // Calling internal functions requires sanitized options.
- Options sanitized_options = SanitizeOptions(test->dbname_, options);
- const ImmutableDBOptions db_options(sanitized_options);
- *count = 0;
- std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
- FileOptions file_options;
- WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
- std::unique_ptr<VersionSet> versions;
- std::unique_ptr<WalManager> wal_manager;
- WriteController write_controller;
- versions.reset(new VersionSet(
- test->dbname_, &db_options, file_options, table_cache.get(),
- &write_buffer_manager, &write_controller,
- /*block_cache_tracer=*/nullptr,
- /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"",
- options.daily_offpeak_time_utc,
- /*error_handler=*/nullptr, /*read_only=*/false));
- wal_manager.reset(
- new WalManager(db_options, file_options, /*io_tracer=*/nullptr));
- std::unique_ptr<log::Writer> current_log_writer;
- for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
- uint64_t current_log_number = j;
- std::string fname = LogFileName(test->dbname_, current_log_number);
- std::unique_ptr<WritableFileWriter> file_writer;
- ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(),
- fname, file_options, &file_writer,
- nullptr));
- // Hand the writer to the unique_ptr before any ASSERT_* can return
- // early, so a failed assertion cannot leak it.
- current_log_writer.reset(
- new log::Writer(std::move(file_writer), current_log_number,
- db_options.recycle_log_file_num > 0, false,
- db_options.wal_compression));
- ASSERT_OK(current_log_writer->AddCompressionTypeRecord(WriteOptions()));
- WriteBatch batch;
- for (int i = 0; i < kKeysPerWALFile; i++) {
- std::string key = "key" + std::to_string((*count)++);
- std::string value = test->DummyString(kValueSize);
- ASSERT_NE(current_log_writer.get(), nullptr);
- // One record per key, each with the next consecutive sequence number.
- uint64_t seq = versions->LastSequence() + 1;
- batch.Clear();
- ASSERT_OK(batch.Put(key, value));
- WriteBatchInternal::SetSequence(&batch, seq);
- ASSERT_OK(current_log_writer->AddRecord(
- WriteOptions(), WriteBatchInternal::Contents(&batch)));
- versions->SetLastAllocatedSequence(seq);
- versions->SetLastPublishedSequence(seq);
- versions->SetLastSequence(seq);
- }
- }
- }
- // Recreate and fill the store with some data. Forces
- // `options->create_if_missing` to true, leaves the DB closed, and returns
- // the number of keys written.
- static size_t FillData(DBWALTestBase* test, Options* options) {
- options->create_if_missing = true;
- test->DestroyAndReopen(*options);
- test->Close();
- size_t count = 0;
- FillData(test, *options, kWALFilesCount, &count);
- return count;
- }
- // Read back all the keys we wrote and return the number of keys found
- static size_t GetData(DBWALTestBase* test) {
- size_t count = 0;
- for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
- if (test->Get("key" + std::to_string(i)) != "NOT_FOUND") {
- ++count;
- }
- }
- return count;
- }
- // Manually corrupt the specified WAL. `off` and `len` are fractions of the
- // file size. With `trunc` the file is cut to `size * off` bytes; otherwise
- // `size * len` bytes starting at `size * off + 8` are corrupted.
- static void CorruptWAL(DBWALTestBase* test, const Options& options,
- const double off, const double len,
- const int wal_file_id, const bool trunc = false) {
- Env* env = options.env;
- std::string fname = LogFileName(test->dbname_, wal_file_id);
- uint64_t size = 0;
- ASSERT_OK(env->GetFileSize(fname, &size));
- ASSERT_GT(size, 0);
- #ifdef OS_WIN
- // Windows disk cache behaves differently. When we truncate
- // the original content is still in the cache due to the original
- // handle is still open. Generally, in Windows, one prohibits
- // shared access to files and it is not needed for WAL but we allow
- // it to induce corruption at various tests.
- test->Close();
- #endif
- if (trunc) {
- ASSERT_OK(
- test::TruncateFile(env, fname, static_cast<uint64_t>(size * off)));
- } else {
- ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(size * off + 8),
- static_cast<int>(size * len), false));
- }
- }
- };
- TEST_F(DBWALTest, TrackAndVerifyWALsRecycleWAL) {  // With WAL recycling and track_and_verify_wals on, a first WAL truncated to 0 bytes must still let DB::Open succeed under point-in-time recovery.
- Options options = CurrentOptions();
- options.avoid_flush_during_shutdown = true;
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- options.recycle_log_file_num = 1;
- options.track_and_verify_wals = true;
- DestroyAndReopen(options);
- ASSERT_OK(Put("key_ignore", "wal_to_recycle"));
- ASSERT_OK(Put("key_ignore1", "wal_to_recycle"));
- ASSERT_OK(Put("key_ignore2", "wal_to_recycle"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("key_ignore", "wal_to_recycle"));
- ASSERT_OK(Put("key_ignore1", "wal_to_recycle"));
- ASSERT_OK(Put("key_ignore2", "wal_to_recycle"));
- ASSERT_OK(Flush());
- // Stop background flush to avoid deleting any WAL
- options.env->SetBackgroundThreads(1, Env::HIGH);
- test::SleepingBackgroundTask sleeping_task;  // NOTE(review): never woken in this test -- confirm it may safely go out of scope while still scheduled
- options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
- &sleeping_task, Env::Priority::HIGH);
- // Recycle the first WAL
- ASSERT_OK(Put("key1", "old_value"));
- // Recycle the second WAL
- ASSERT_OK(dbfull()->TEST_SwitchWAL());
- ASSERT_OK(Put("key1", "new_value"));
- // Create a WAL hole on sequence number by truncating the first WAL to 0 byte
- VectorWalPtr log_files;
- ASSERT_OK(db_->GetSortedWalFiles(log_files));
- ASSERT_EQ(log_files.size(), 2);
- std::string log_name = LogFileName(dbname_, log_files.front()->LogNumber());
- Close();
- // Drop `Put("key1", "old_value")` in the first WAL
- ASSERT_OK(test::TruncateFile(options.env, log_name, 0 /* new_length */));
- Status s = DB::Open(options, dbname_, &db_);
- ASSERT_OK(s);
- ASSERT_EQ("wal_to_recycle", Get("key_ignore2"));  // data flushed before the truncation is still readable
- ASSERT_EQ("NOT_FOUND", Get("key1"));  // neither "old_value" nor "new_value" is recovered
- Close();
- }
- class DBWALTrackAndVerifyWALsWithParamsTest  // Test fixture parameterized on the WAL recovery mode.
- : public DBWALTestBase,
- public ::testing::WithParamInterface<WALRecoveryMode> {
- public:
- DBWALTrackAndVerifyWALsWithParamsTest()
- : DBWALTestBase("/db_wal_track_and_verify_wals_with_params_test") {}  // string argument is forwarded to DBWALTestBase
- };
- INSTANTIATE_TEST_CASE_P(  // instantiate the fixture's tests once per recovery mode listed below
- DBWALTrackAndVerifyWALsWithParamsTest,
- DBWALTrackAndVerifyWALsWithParamsTest,
- ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
- WALRecoveryMode::kAbsoluteConsistency,
- WALRecoveryMode::kPointInTimeRecovery,
- WALRecoveryMode::kSkipAnyCorruptedRecords));
- TEST_P(DBWALTrackAndVerifyWALsWithParamsTest, Basic) {  // Five scenarios (i = 0..4) that remove or damage WALs before DB::Open; the expected result depends on the recovery mode.
- Options options = CurrentOptions();
- options.avoid_flush_during_shutdown = true;
- options.track_and_verify_wals = true;
- options.wal_recovery_mode = GetParam();
- // Stop background flush to avoid deleting any WAL
- options.env->SetBackgroundThreads(1, Env::HIGH);
- test::SleepingBackgroundTask sleeping_task;
- options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
- &sleeping_task, Env::Priority::HIGH);
- for (int i = 0; i < 5; i++) {  // each iteration starts from a freshly destroyed and reopened DB
- DestroyAndReopen(options);
- ASSERT_OK(Put("key1", "old_value"));
- SequenceNumber last_seqno_recorded_in_fist_wal =  // latest sequence number right after the only write that precedes the WAL switch
- dbfull()->GetLatestSequenceNumber();
- ASSERT_OK(dbfull()->TEST_SwitchWAL());
- ASSERT_OK(Put("key1", "new_value"));
- VectorWalPtr log_files;
- ASSERT_OK(db_->GetSortedWalFiles(log_files));
- ASSERT_EQ(log_files.size(), 2);
- uint64_t first_log_number = log_files.front()->LogNumber();
- std::string first_log_name = LogFileName(dbname_, first_log_number);
- std::string second_log_name =
- LogFileName(dbname_, log_files.back()->LogNumber());
- if (i == 0) {
- // Delete the obsolete WAL and verify it will not be seen as a WAL hole
- sleeping_task.WakeUp();
- sleeping_task.WaitUntilDone();
- ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
- // Stop background flush to avoid deleting any WAL
- sleeping_task.Reset();
- options.env->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
- &sleeping_task, Env::Priority::HIGH);
- Close();
- } else if (i == 1) {
- // Create a WAL hole on WAL number by deleting the first WAL and verify
- // the hole will be detected
- Close();
- ASSERT_OK(options.env->DeleteFile(first_log_name));
- } else if (i == 2) {
- // Create a WAL hole on sequence number by truncating the first WAL and
- // verify the hole will be detected
- Close();
- ASSERT_OK(
- test::TruncateFile(options.env, first_log_name, 0 /* new_length */));
- } else if (i == 3) {
- // Create a WAL hole on size difference by truncating the first WAL and
- // mocking a correct sequence number to force triggering corruption based
- // on size instead of sequence number and verify the hole will be detected
- Close();
- ASSERT_OK(
- test::TruncateFile(options.env, first_log_name, 0 /* new_length */));
- SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::UpdatePredecessorWALInfo", [&](void* arg) {
- std::pair<uint64_t, SequenceNumber*>* pair =
- static_cast<std::pair<uint64_t, SequenceNumber*>*>(arg);
- if (pair->first == first_log_number) {  // only override the value for the truncated first WAL
- *(pair->second) = last_seqno_recorded_in_fist_wal;
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- } else if (i == 4) {
- // Delete all wals and verify opening a DB with no WAL will be detected
- Close();
- ASSERT_OK(options.env->DeleteFile(first_log_name));
- ASSERT_OK(options.env->DeleteFile(second_log_name));
- }
- Status s = DB::Open(options, dbname_, &db_);  // outcome is checked per scenario and recovery mode below
- if (i == 0) {
- ASSERT_OK(s);
- ASSERT_EQ("new_value", Get("key1"));
- continue;
- } else if (i == 3) {
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->DisableProcessing();
- } else if (i == 4) {
- ASSERT_TRUE(s.IsCorruption());
- ASSERT_TRUE(
- s.ToString().find("Opening an existing DB with no WAL files") !=
- std::string::npos);
- Close();
- continue;
- }
- if (options.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {  // scenarios 1-3 only from here on
- ASSERT_OK(s);
- ASSERT_EQ("NOT_FOUND", Get("key1"));
- } else if (options.wal_recovery_mode ==
- WALRecoveryMode::kAbsoluteConsistency ||
- options.wal_recovery_mode ==
- WALRecoveryMode::kTolerateCorruptedTailRecords) {
- ASSERT_TRUE(s.IsCorruption());
- std::string msg;  // substring the Corruption status is expected to contain for this scenario
- if (i == 1) {
- msg = "Missing WAL";
- } else if (i == 2) {
- msg = "Mismatched last sequence number recorded in the WAL";
- } else if (i == 3) {
- msg = "Mismatched size of the WAL";
- }
- ASSERT_TRUE(s.ToString().find(msg) != std::string::npos);
- } else {  // remaining instantiated mode: kSkipAnyCorruptedRecords
- ASSERT_OK(s);
- ASSERT_EQ("new_value", Get("key1"));
- }
- Close();
- }
- }
- class DBWALTestWithParams  // Fixture for the parameterized WAL corruption tests (one TEST_P per recovery mode).
- : public DBWALTestBase,
- public ::testing::WithParamInterface<
- std::tuple<bool, int, int, CompressionType, bool>> {  // as read by the tests: <truncate instead of corrupt, corruption offset step, WAL file number, WAL compression, track_and_verify_wals>
- public:
- DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {}
- };
- INSTANTIATE_TEST_CASE_P(
- Wal, DBWALTestWithParams,
- ::testing::Combine(::testing::Bool(), ::testing::Range(0, 4, 1),  // both corruption styles; offset steps 0..3
- ::testing::Range(RecoveryTestHelper::kWALFileOffset,  // every WAL file number written by RecoveryTestHelper::FillData
- RecoveryTestHelper::kWALFileOffset +
- RecoveryTestHelper::kWALFilesCount,
- 1),
- ::testing::Values(CompressionType::kNoCompression,
- CompressionType::kZSTD),
- ::testing::Bool()));
- class DBWALTestWithParamsVaryingRecoveryMode  // Fixture whose parameter tuple additionally carries the WALRecoveryMode.
- : public DBWALTestBase,
- public ::testing::WithParamInterface<
- std::tuple<bool, int, int, WALRecoveryMode, CompressionType>> {  // presumably <truncate, offset step, WAL file number, recovery mode, compression> -- confirm against the tests using it
- public:
- DBWALTestWithParamsVaryingRecoveryMode()
- : DBWALTestBase("/db_wal_test_with_params_mode") {}
- };
- INSTANTIATE_TEST_CASE_P(
- Wal, DBWALTestWithParamsVaryingRecoveryMode,
- ::testing::Combine(
- ::testing::Bool(), ::testing::Range(0, 4, 1),  // Range end is exclusive: values 0..3
- ::testing::Range(RecoveryTestHelper::kWALFileOffset,
- RecoveryTestHelper::kWALFileOffset +
- RecoveryTestHelper::kWALFilesCount,
- 1),
- ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
- WALRecoveryMode::kAbsoluteConsistency,
- WALRecoveryMode::kPointInTimeRecovery,
- WALRecoveryMode::kSkipAnyCorruptedRecords),
- ::testing::Values(CompressionType::kNoCompression,
- CompressionType::kZSTD)));
- // Test scope:
- // - We expect to open the data store when there is incomplete trailing writes
- // at the end of any of the logs
- // - We do not expect to open the data store for corruption
- TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) {
- // Unpack the parameters this test consumes (the compression entry of the
- // tuple is not read here).
- const auto& params = GetParam();
- const bool truncate_wal = std::get<0>(params); // Corruption style
- const int offset_step = std::get<1>(params); // Corruption offset position
- const int target_wal = std::get<2>(params); // WAL file
- // Write the WAL files that will be damaged
- Options options = CurrentOptions();
- options.track_and_verify_wals = std::get<4>(params);
- const size_t written_rows = RecoveryTestHelper::FillData(this, &options);
- // Damage the chosen WAL: truncation models an incomplete trailing write,
- // byte corruption exercises checksum/parsing failures
- const double relative_offset = offset_step * .3;
- RecoveryTestHelper::CorruptWAL(this, options, relative_offset,
- /*len%=*/.1, target_wal, truncate_wal);
- options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
- if (!truncate_wal) {
- // Real corruption must prevent the DB from opening.
- ASSERT_NOK(TryReopen(options));
- return;
- }
- // A truncated WAL must open, with only part of the data recovered.
- options.create_if_missing = false;
- ASSERT_OK(TryReopen(options));
- const size_t recovered_rows = RecoveryTestHelper::GetData(this);
- ASSERT_TRUE(offset_step == 0 || recovered_rows > 0);
- ASSERT_LT(recovered_rows, written_rows);
- }
- // Test scope:
- // We don't expect the data store to be opened if there is any corruption
- // (leading, middle or trailing -- incomplete writes or corruption)
- TEST_P(DBWALTestWithParams, kAbsoluteConsistency) {
- const auto& params = GetParam();
- const bool truncate_wal = std::get<0>(params); // Corruption style
- const int offset_step = std::get<1>(params); // Corruption offset position
- const int target_wal = std::get<2>(params); // WAL file
- // Baseline: an undamaged set of WALs must reopen with every row present
- Options options = CurrentOptions();
- options.track_and_verify_wals = std::get<4>(params);
- const size_t written_rows = RecoveryTestHelper::FillData(this, &options);
- options.create_if_missing = false;
- ASSERT_OK(TryReopen(options));
- ASSERT_EQ(RecoveryTestHelper::GetData(this), written_rows);
- // WAL compression type, applied only from the second fill onwards
- options.wal_compression = std::get<3>(params);
- // Truncation at offset step 0 is not exercised by this test
- const bool skip_case = truncate_wal && offset_step == 0;
- if (skip_case) {
- return;
- }
- // Fill with new data
- RecoveryTestHelper::FillData(this, &options);
- // Damage the chosen WAL
- const double relative_offset = offset_step * .33;
- RecoveryTestHelper::CorruptWAL(this, options, relative_offset,
- /*len%=*/.1, target_wal, truncate_wal);
- // Any damage must make the reopen fail in this mode
- options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
- options.create_if_missing = false;
- ASSERT_NOK(TryReopen(options));
- }
- // Test scope:
- // We don't expect the data store to be opened if there is any inconsistency
- // between WAL and SST files
- TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {  // A corrupted WAL record followed by a flush of a later write in another column family must make point-in-time recovery fail.
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- // Create DB with multiple column families.
- CreateAndReopenWithCF({"one", "two"}, options);
- ASSERT_OK(Put(1, "key1", "val1"));
- ASSERT_OK(Put(2, "key2", "val2"));
- // Record the offset at this point
- Env* env = options.env;
- uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
- std::string fname = LogFileName(dbname_, wal_file_id);
- uint64_t offset_to_corrupt;
- ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));  // WAL size so far; the record written next is corrupted at this offset
- ASSERT_GT(offset_to_corrupt, 0);
- ASSERT_OK(Put(1, "key3", "val3"));
- // Corrupt WAL at location of key3
- ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(offset_to_corrupt),
- 4, false));
- ASSERT_OK(Put(2, "key4", "val4"));
- ASSERT_OK(Put(1, "key5", "val5"));
- ASSERT_OK(Flush(2));  // flushes column family index 2, which holds key4 written after the corrupted record
- // PIT recovery & verify
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
- }
- // Forces two background flush threads to interleave around the MANIFEST
- // write (see the sync-point comments below) and then verifies that the DB
- // can still be opened read-only with track_and_verify_wals_in_manifest on.
- TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) {
- Options options = CurrentOptions();
- options.env = env_;
- options.track_and_verify_wals_in_manifest = true;
- // The following make sure there are two bg flush threads.
- options.max_background_jobs = 8;
- DestroyAndReopen(options);
- const std::string cf1_name("cf1");
- CreateAndReopenWithCF({cf1_name}, options);
- // Use a gtest assertion so the precondition is also checked in NDEBUG
- // builds, where plain assert() is compiled out.
- ASSERT_EQ(handles_.size(), 2);
- {
- dbfull()->TEST_LockMutex();
- ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes);
- dbfull()->TEST_UnlockMutex();
- }
- ASSERT_OK(dbfull()->PauseBackgroundWork());
- ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value"));
- ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
- ASSERT_OK(dbfull()->TEST_FlushMemTable(
- /*wait=*/false, /*allow_write_stall=*/true, handles_[1]));
- ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
- ASSERT_OK(dbfull()->TEST_FlushMemTable(
- /*wait=*/false, /*allow_write_stall=*/true, handles_[0]));
- bool called = false;
- std::atomic<int> bg_flush_threads{0};
- std::atomic<bool> wal_synced{false};
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) {
- int cur = bg_flush_threads.load();
- int desired = cur + 1;
- if (cur > 0 ||
- !bg_flush_threads.compare_exchange_strong(cur, desired)) {
- while (!wal_synced.load()) {
- // Wait until the other bg flush thread finishes committing WAL sync
- // operation to the MANIFEST.
- }
- }
- });
- SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::FlushMemTableToOutputFile:CommitWal:1",
- [&](void* /*arg*/) { wal_synced.store(true); });
- // This callback will be called when the first bg flush thread reaches the
- // point before entering the MANIFEST write queue after flushing the SST
- // file.
- // The purpose of the sync points here is to ensure both bg flush threads
- // finish computing `min_wal_number_to_keep` before any of them updates the
- // `log_number` for the column family that's being flushed.
- SyncPoint::GetInstance()->SetCallBack(
- "MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep",
- [&](void* /*arg*/) {
- dbfull()->mutex()->AssertHeld();
- if (!called) {
- // We are the first bg flush thread in the MANIFEST write queue.
- // We set up the dependency between sync points for two threads that
- // will be executing the same code.
- // For the interleaving of events, see
- // https://github.com/facebook/rocksdb/pull/9715.
- // bg flush thread1 will release the db mutex while in the MANIFEST
- // write queue. In the meantime, bg flush thread2 locks db mutex and
- // computes the min_wal_number_to_keep (before thread1 writes to
- // MANIFEST thus before cf1->log_number is updated). Bg thread2 joins
- // the MANIFEST write queue afterwards and bg flush thread1 proceeds
- // with writing to MANIFEST.
- called = true;
- SyncPoint::GetInstance()->LoadDependency({
- {"VersionSet::LogAndApply:WriteManifestStart",
- "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"},
- {"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2",
- "VersionSet::LogAndApply:WriteManifest"},
- });
- } else {
- // The other bg flush thread has already been in the MANIFEST write
- // queue, and we are after.
- TEST_SYNC_POINT(
- "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2");
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(dbfull()->ContinueBackgroundWork());
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
- ASSERT_TRUE(called);
- Close();
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- DB* db1 = nullptr;
- Status s = DB::OpenForReadOnly(options, dbname_, &db1);
- ASSERT_OK(s);
- // Checked with gtest rather than assert() so it also holds in NDEBUG builds.
- ASSERT_NE(db1, nullptr);
- delete db1;
- }
- TEST_F(DBWALTest, FixSyncWalOnObseletedWalWithNewManifestCausingMissingWAL) {  // Regression test: SyncWAL() racing with a flush that obsoletes the WAL must not cause a "Missing WAL" corruption on reopen.
- Options options = CurrentOptions();
- // Small size to force manifest creation
- options.max_manifest_file_size = 1;
- options.track_and_verify_wals_in_manifest = true;
- DestroyAndReopen(options);
- // Accumulate memtable m1 and create the 1st wal (i.e, 4.log)
- ASSERT_OK(Put(Key(1), ""));
- ASSERT_OK(Put(Key(2), ""));
- ASSERT_OK(Put(Key(3), ""));
- const std::string wal_file_path = db_->GetName() + "/000004.log";
- // Coerce the following sequence of events:
- // (1) Flush() marks 4.log to be obsoleted, 8.log to be the latest (i.e,
- // active) log and release the lock
- // (2) SyncWAL() proceeds with the lock. It
- // creates a new manifest and syncs all the inactive wals before the latest
- // (i.e, active log), which is 4.log. Note that SyncWAL() is not aware of the
- // fact that 4.log has been marked as to be obsoleted. Such wal
- // sync will then add a WAL addition record of 4.log to the new manifest
- // without any special treatment. Prior to the fix, there is no WAL deletion
- // record to offset it. (3) BackgroundFlush() will eventually purge 4.log.
- bool wal_synced = false;
- SyncPoint::GetInstance()->SetCallBack(
- "FindObsoleteFiles::PostMutexUnlock", [&](void*) {
- ASSERT_OK(env_->FileExists(wal_file_path));  // 4.log has not been purged yet at this point
- uint64_t pre_sync_wal_manifest_no =
- dbfull()->TEST_Current_Manifest_FileNo();
- ASSERT_OK(db_->SyncWAL());
- uint64_t post_sync_wal_manifest_no =
- dbfull()->TEST_Current_Manifest_FileNo();
- bool new_manifest_created =
- post_sync_wal_manifest_no == pre_sync_wal_manifest_no + 1;
- ASSERT_TRUE(new_manifest_created);
- wal_synced = true;  // only set if every assertion in this callback passed
- });
- SyncPoint::GetInstance()->EnableProcessing();
- ASSERT_OK(Flush());
- ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
- ASSERT_TRUE(wal_synced);
- // BackgroundFlush() purged 4.log
- // because the memtable associated with the WAL was flushed and new WAL was
- // created (i.e, 8.log)
- ASSERT_TRUE(env_->FileExists(wal_file_path).IsNotFound());
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->DisableProcessing();
- // To verify the corruption of "Missing WAL with log number: 4" under
- // `options.track_and_verify_wals_in_manifest = true` is fixed.
- //
- // Before the fix, `db_->SyncWAL()` will sync and record WAL addition of the
- // obsoleted WAL 4.log in a new manifest without any special treatment.
- // This will result in missing-wal corruption in DB::Reopen().
- Status s = TryReopen(options);
- EXPECT_OK(s);
- }
- // Test scope:
- // - We expect to open data store under all circumstances
- // - We expect only data up to the point where the first error was encountered
- TEST_P(DBWALTestWithParams, kPointInTimeRecovery) {  // After damaging one WAL, the DB must open and expose only a prefix of the written keys.
- const int maxkeys =
- RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;
- bool trunc = std::get<0>(GetParam()); // Corruption style
- // Corruption offset position
- int corrupt_offset = std::get<1>(GetParam());
- int wal_file_id = std::get<2>(GetParam()); // WAL file
- // WAL compression type
- CompressionType compression_type = std::get<3>(GetParam());
- // Fill data for testing
- Options options = CurrentOptions();
- options.track_and_verify_wals = std::get<4>(GetParam());
- options.wal_compression = compression_type;
- const size_t row_count = RecoveryTestHelper::FillData(this, &options);
- // Corrupt the wal
- // The offset here was 0.3 which cuts off right at the end of a
- // valid fragment after wal zstd compression checksum is enabled,
- // so changed the value to 0.33.
- RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33,
- /*len%=*/.1, wal_file_id, trunc);
- // Verify
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- options.create_if_missing = false;
- ASSERT_OK(TryReopen(options));
- // Probe data for invariants
- size_t recovered_row_count = RecoveryTestHelper::GetData(this);
- ASSERT_LT(recovered_row_count, row_count);
- // Verify a prefix of keys were recovered. But not in the case of full WAL
- // truncation, because we have no way to know there was a corruption when
- // truncation happened on record boundaries (preventing recovery holes in
- // that case requires using `track_and_verify_wals_in_manifest`).
- if (!trunc || corrupt_offset != 0) {
- bool expect_data = true;
- for (size_t k = 0; k < maxkeys; ++k) {
- bool found = Get("key" + std::to_string(k)) != "NOT_FOUND";
- if (expect_data && !found) {  // first missing key: no later key may be present
- expect_data = false;
- }
- ASSERT_EQ(found, expect_data);
- }
- }
- const size_t min = RecoveryTestHelper::kKeysPerWALFile *  // key count of all WAL files numbered below the damaged one
- (wal_file_id - RecoveryTestHelper::kWALFileOffset);
- ASSERT_GE(recovered_row_count, min);
- if (!trunc && corrupt_offset != 0) {
- const size_t max = RecoveryTestHelper::kKeysPerWALFile *  // key count up to and including the damaged WAL file
- (wal_file_id - RecoveryTestHelper::kWALFileOffset + 1);
- ASSERT_LE(recovered_row_count, max);
- }
- }
- // Test scope:
- // - We expect to open the data store under all scenarios
- // - We expect to have recovered records past the corruption zone
- TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) {
- const auto& params = GetParam();
- const bool truncate_wal = std::get<0>(params); // Corruption style
- const int offset_step = std::get<1>(params); // Corruption offset position
- const int target_wal = std::get<2>(params); // WAL file
- // Write the WAL files, using the requested WAL compression type
- Options options = CurrentOptions();
- options.track_and_verify_wals = std::get<4>(params);
- options.wal_compression = std::get<3>(params);
- const size_t written_rows = RecoveryTestHelper::FillData(this, &options);
- // Damage the chosen WAL
- const double relative_offset = offset_step * .3;
- RecoveryTestHelper::CorruptWAL(this, options, relative_offset,
- /*len%=*/.1, target_wal, truncate_wal);
- // The DB must open regardless of the damage
- options.create_if_missing = false;
- options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
- ASSERT_OK(TryReopen(options));
- // Some rows are lost, but never all of them when bytes were corrupted at
- // offset step 0 (truncation is exempt from that second check)
- const size_t recovered_rows = RecoveryTestHelper::GetData(this);
- ASSERT_LT(recovered_rows, written_rows);
- ASSERT_TRUE(truncate_wal || offset_step != 0 || recovered_rows > 0);
- }
- TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.avoid_flush_during_recovery = false;
- // Test with flush after recovery.
- Reopen(options);
- ASSERT_OK(Put("foo", "v1"));
- ASSERT_OK(Put("bar", "v2"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("foo", "v3"));
- ASSERT_OK(Put("bar", "v4"));
- ASSERT_EQ(1, TotalTableFiles());
- // Reopen DB. Check if WAL logs flushed.
- Reopen(options);
- ASSERT_EQ("v3", Get("foo"));
- ASSERT_EQ("v4", Get("bar"));
- ASSERT_EQ(2, TotalTableFiles());
- // Test without flush after recovery.
- options.avoid_flush_during_recovery = true;
- DestroyAndReopen(options);
- ASSERT_OK(Put("foo", "v5"));
- ASSERT_OK(Put("bar", "v6"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("foo", "v7"));
- ASSERT_OK(Put("bar", "v8"));
- ASSERT_EQ(1, TotalTableFiles());
- // Reopen DB. WAL logs should not be flushed this time.
- Reopen(options);
- ASSERT_EQ("v7", Get("foo"));
- ASSERT_EQ("v8", Get("bar"));
- ASSERT_EQ(1, TotalTableFiles());
- // Force flush with allow_2pc.
- options.avoid_flush_during_recovery = true;
- options.allow_2pc = true;
- ASSERT_OK(Put("foo", "v9"));
- ASSERT_OK(Put("bar", "v10"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("foo", "v11"));
- ASSERT_OK(Put("bar", "v12"));
- Reopen(options);
- ASSERT_EQ("v11", Get("foo"));
- ASSERT_EQ("v12", Get("bar"));
- ASSERT_EQ(3, TotalTableFiles());
- }
- TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
- // Verifies WAL files that were present during recovery, but not flushed due
- // to avoid_flush_during_recovery, will be considered for deletion at a later
- // stage. We check at least one such file is deleted during Flush().
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.avoid_flush_during_recovery = true;
- Reopen(options);
- ASSERT_OK(Put("foo", "v1"));
- Reopen(options);
- for (int i = 0; i < 2; ++i) {
- if (i > 0) {
- // Flush() triggers deletion of obsolete tracked files
- ASSERT_OK(Flush());
- }
- VectorLogPtr log_files;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
- if (i == 0) {
- ASSERT_GT(log_files.size(), 0);
- } else {
- ASSERT_EQ(0, log_files.size());
- }
- }
- }
- TEST_F(DBWALTest, RecoverWithoutFlush) {
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- options.create_if_missing = false;
- options.disable_auto_compactions = true;
- options.write_buffer_size = 64 * 1024 * 1024;
- size_t count = RecoveryTestHelper::FillData(this, &options);
- auto validateData = [this, count]() {
- for (size_t i = 0; i < count; i++) {
- ASSERT_NE(Get("key" + std::to_string(i)), "NOT_FOUND");
- }
- };
- Reopen(options);
- validateData();
- // Insert some data without flush
- ASSERT_OK(Put("foo", "foo_v1"));
- ASSERT_OK(Put("bar", "bar_v1"));
- Reopen(options);
- validateData();
- ASSERT_EQ(Get("foo"), "foo_v1");
- ASSERT_EQ(Get("bar"), "bar_v1");
- // Insert again and reopen
- ASSERT_OK(Put("foo", "foo_v2"));
- ASSERT_OK(Put("bar", "bar_v2"));
- Reopen(options);
- validateData();
- ASSERT_EQ(Get("foo"), "foo_v2");
- ASSERT_EQ(Get("bar"), "bar_v2");
- // manual flush and insert again
- ASSERT_OK(Flush());
- ASSERT_EQ(Get("foo"), "foo_v2");
- ASSERT_EQ(Get("bar"), "bar_v2");
- ASSERT_OK(Put("foo", "foo_v3"));
- ASSERT_OK(Put("bar", "bar_v3"));
- Reopen(options);
- validateData();
- ASSERT_EQ(Get("foo"), "foo_v3");
- ASSERT_EQ(Get("bar"), "bar_v3");
- }
- TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
- const std::string kSmallValue = "v";
- const std::string kLargeValue = DummyString(1024);
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- options.create_if_missing = false;
- options.disable_auto_compactions = true;
- auto countWalFiles = [this]() {
- VectorLogPtr log_files;
- if (!dbfull()->GetSortedWalFiles(log_files).ok()) {
- return size_t{0};
- }
- return log_files.size();
- };
- // Create DB with multiple column families and multiple log files.
- CreateAndReopenWithCF({"one", "two"}, options);
- ASSERT_OK(Put(0, "key1", kSmallValue));
- ASSERT_OK(Put(1, "key2", kLargeValue));
- ASSERT_OK(Flush(1));
- ASSERT_EQ(1, countWalFiles());
- ASSERT_OK(Put(0, "key3", kSmallValue));
- ASSERT_OK(Put(2, "key4", kLargeValue));
- ASSERT_OK(Flush(2));
- ASSERT_EQ(2, countWalFiles());
- // Reopen, insert and flush.
- options.db_write_buffer_size = 64 * 1024 * 1024;
- ReopenWithColumnFamilies({"default", "one", "two"}, options);
- ASSERT_EQ(Get(0, "key1"), kSmallValue);
- ASSERT_EQ(Get(1, "key2"), kLargeValue);
- ASSERT_EQ(Get(0, "key3"), kSmallValue);
- ASSERT_EQ(Get(2, "key4"), kLargeValue);
- // Insert more data.
- ASSERT_OK(Put(0, "key5", kLargeValue));
- ASSERT_OK(Put(1, "key6", kLargeValue));
- ASSERT_EQ(3, countWalFiles());
- ASSERT_OK(Flush(1));
- ASSERT_OK(Put(2, "key7", kLargeValue));
- ASSERT_OK(dbfull()->FlushWAL(false));
- ASSERT_EQ(4, countWalFiles());
- // Reopen twice and validate.
- for (int i = 0; i < 2; i++) {
- ReopenWithColumnFamilies({"default", "one", "two"}, options);
- ASSERT_EQ(Get(0, "key1"), kSmallValue);
- ASSERT_EQ(Get(1, "key2"), kLargeValue);
- ASSERT_EQ(Get(0, "key3"), kSmallValue);
- ASSERT_EQ(Get(2, "key4"), kLargeValue);
- ASSERT_EQ(Get(0, "key5"), kLargeValue);
- ASSERT_EQ(Get(1, "key6"), kLargeValue);
- ASSERT_EQ(Get(2, "key7"), kLargeValue);
- ASSERT_EQ(4, countWalFiles());
- }
- }
- // In this test we are trying to do the following:
- // 1. Create a DB with corrupted WAL log;
- // 2. Open with avoid_flush_during_recovery = true;
- // 3. Append more data without flushing, which creates new WAL log.
- // 4. Open again. See if it can correctly handle previous corruption.
- TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
- RecoverFromCorruptedWALWithoutFlush) {
- const int kAppendKeys = 100;
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- options.create_if_missing = false;
- options.disable_auto_compactions = true;
- options.write_buffer_size = 64 * 1024 * 1024;
- auto getAll = [this]() {
- std::vector<std::pair<std::string, std::string>> data;
- ReadOptions ropt;
- Iterator* iter = dbfull()->NewIterator(ropt);
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- data.emplace_back(iter->key().ToString(), iter->value().ToString());
- }
- EXPECT_OK(iter->status());
- delete iter;
- return data;
- };
- bool trunc = std::get<0>(GetParam()); // Corruption style
- // Corruption offset position
- int corrupt_offset = std::get<1>(GetParam());
- int wal_file_id = std::get<2>(GetParam()); // WAL file
- WALRecoveryMode recovery_mode = std::get<3>(GetParam());
- // WAL compression type
- CompressionType compression_type = std::get<4>(GetParam());
- options.wal_recovery_mode = recovery_mode;
- options.wal_compression = compression_type;
- // Create corrupted WAL
- RecoveryTestHelper::FillData(this, &options);
- RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
- /*len%=*/.1, wal_file_id, trunc);
- // Skip the test if DB won't open.
- if (!TryReopen(options).ok()) {
- ASSERT_TRUE(options.wal_recovery_mode ==
- WALRecoveryMode::kAbsoluteConsistency ||
- (!trunc && options.wal_recovery_mode ==
- WALRecoveryMode::kTolerateCorruptedTailRecords));
- return;
- }
- ASSERT_OK(TryReopen(options));
- // Append some more data.
- for (int k = 0; k < kAppendKeys; k++) {
- std::string key = "extra_key" + std::to_string(k);
- std::string value = DummyString(RecoveryTestHelper::kValueSize);
- ASSERT_OK(Put(key, value));
- }
- // Save data for comparison.
- auto data = getAll();
- // Reopen. Verify data.
- ASSERT_OK(TryReopen(options));
- auto actual_data = getAll();
- ASSERT_EQ(data, actual_data);
- }
- // Tests that total log size is recovered if we set
- // avoid_flush_during_recovery=true.
- // Flush should trigger if max_total_wal_size is reached.
- TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
- auto test_listener = std::make_shared<FlushCounterListener>();
- test_listener->expected_flush_reason = FlushReason::kWalFull;
- constexpr size_t kKB = 1024;
- constexpr size_t kMB = 1024 * 1024;
- Options options = CurrentOptions();
- options.avoid_flush_during_recovery = true;
- options.max_total_wal_size = 1 * kMB;
- options.listeners.push_back(test_listener);
- // Have to open DB in multi-CF mode to trigger flush when
- // max_total_wal_size is reached.
- CreateAndReopenWithCF({"one"}, options);
- // Write some keys and we will end up with one log file which is slightly
- // smaller than 1MB.
- std::string value_100k(100 * kKB, 'v');
- std::string value_300k(300 * kKB, 'v');
- ASSERT_OK(Put(0, "foo", "v1"));
- for (int i = 0; i < 9; i++) {
- ASSERT_OK(Put(1, "key" + std::to_string(i), value_100k));
- }
- // Get log files before reopen.
- VectorLogPtr log_files_before;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
- ASSERT_EQ(1, log_files_before.size());
- uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
- ASSERT_GT(log_size_before, 900 * kKB);
- ASSERT_LT(log_size_before, 1 * kMB);
- ReopenWithColumnFamilies({"default", "one"}, options);
- // Write one more value to make log larger than 1MB.
- ASSERT_OK(Put(1, "bar", value_300k));
- // Get log files again. A new log file will be opened.
- VectorLogPtr log_files_after_reopen;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
- ASSERT_EQ(2, log_files_after_reopen.size());
- ASSERT_EQ(log_files_before[0]->LogNumber(),
- log_files_after_reopen[0]->LogNumber());
- ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
- log_files_after_reopen[1]->SizeFileBytes(),
- 1 * kMB);
- // Write one more key to trigger flush.
- ASSERT_OK(Put(0, "foo", "v2"));
- for (auto* h : handles_) {
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(h));
- }
- // Flushed two column families.
- ASSERT_EQ(2, test_listener->count.load());
- }
- #if defined(ROCKSDB_PLATFORM_POSIX)
- #if defined(ROCKSDB_FALLOCATE_PRESENT)
- // Tests that we will truncate the preallocated space of the last log from
- // previous.
- TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
- constexpr size_t kKB = 1024;
- Options options = CurrentOptions();
- options.env = env_;
- options.avoid_flush_during_recovery = true;
- if (mem_env_) {
- ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
- return;
- }
- if (!IsFallocateSupported()) {
- return;
- }
- DestroyAndReopen(options);
- size_t preallocated_size =
- dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
- ASSERT_OK(Put("foo", "v1"));
- VectorLogPtr log_files_before;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
- ASSERT_EQ(1, log_files_before.size());
- auto& file_before = log_files_before[0];
- ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
- // The log file has preallocated space.
- ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
- preallocated_size);
- Reopen(options);
- VectorLogPtr log_files_after;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
- ASSERT_EQ(1, log_files_after.size());
- ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
- // The preallocated space should be truncated.
- ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
- preallocated_size);
- }
- // Tests that we will truncate the preallocated space of the last log from
- // previous.
- TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithFlush) {
- constexpr size_t kKB = 1024;
- Options options = CurrentOptions();
- options.env = env_;
- options.avoid_flush_during_recovery = false;
- options.avoid_flush_during_shutdown = true;
- if (mem_env_) {
- ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
- return;
- }
- if (!IsFallocateSupported()) {
- return;
- }
- DestroyAndReopen(options);
- size_t preallocated_size =
- dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
- ASSERT_OK(Put("foo", "v1"));
- VectorLogPtr log_files_before;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
- ASSERT_EQ(1, log_files_before.size());
- auto& file_before = log_files_before[0];
- ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
- ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
- preallocated_size);
- // The log file has preallocated space.
- Close();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"DBImpl::PurgeObsoleteFiles:Begin",
- "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
- {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
- "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- port::Thread reopen_thread([&]() { Reopen(options); });
- TEST_SYNC_POINT(
- "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
- // After the flush during Open, the log file should get deleted. However,
- // if the process is in a crash loop, the log file may not get
- // deleted and thte preallocated space will keep accumulating. So we need
- // to ensure it gets trtuncated.
- EXPECT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
- preallocated_size);
- TEST_SYNC_POINT(
- "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
- reopen_thread.join();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- }
- TEST_F(DBWALTest, TruncateLastLogAfterRecoverWALEmpty) {
- Options options = CurrentOptions();
- options.env = env_;
- options.avoid_flush_during_recovery = false;
- if (mem_env_ || encrypted_env_) {
- ROCKSDB_GTEST_SKIP("Test requires non-mem/non-encrypted environment");
- return;
- }
- if (!IsFallocateSupported()) {
- return;
- }
- DestroyAndReopen(options);
- size_t preallocated_size =
- dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
- Close();
- std::vector<std::string> filenames;
- std::string last_log;
- uint64_t last_log_num = 0;
- ASSERT_OK(env_->GetChildren(dbname_, &filenames));
- for (const auto& fname : filenames) {
- uint64_t number;
- FileType type;
- if (ParseFileName(fname, &number, &type, nullptr)) {
- if (type == kWalFile && number > last_log_num) {
- last_log = fname;
- }
- }
- }
- ASSERT_NE(last_log, "");
- last_log = dbname_ + '/' + last_log;
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"DBImpl::PurgeObsoleteFiles:Begin",
- "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
- {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
- "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "PosixWritableFile::Close",
- [](void* arg) { *(static_cast<size_t*>(arg)) = 0; });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- // Preallocate space for the empty log file. This could happen if WAL data
- // was buffered in memory and the process crashed.
- std::unique_ptr<WritableFile> log_file;
- ASSERT_OK(env_->ReopenWritableFile(last_log, &log_file, EnvOptions()));
- log_file->SetPreallocationBlockSize(preallocated_size);
- log_file->PrepareWrite(0, 4096);
- log_file.reset();
- ASSERT_GE(GetAllocatedFileSize(last_log), preallocated_size);
- port::Thread reopen_thread([&]() { Reopen(options); });
- TEST_SYNC_POINT(
- "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
- // The preallocated space should be truncated.
- EXPECT_LT(GetAllocatedFileSize(last_log), preallocated_size);
- TEST_SYNC_POINT(
- "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
- reopen_thread.join();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- TEST_F(DBWALTest, ReadOnlyRecoveryNoTruncate) {
- constexpr size_t kKB = 1024;
- Options options = CurrentOptions();
- options.env = env_;
- options.avoid_flush_during_recovery = true;
- if (mem_env_) {
- ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
- return;
- }
- if (!IsFallocateSupported()) {
- return;
- }
- // create DB and close with file truncate disabled
- std::atomic_bool enable_truncate{false};
- SyncPoint::GetInstance()->SetCallBack("PosixWritableFile::Close",
- [&](void* arg) {
- if (!enable_truncate) {
- *(static_cast<size_t*>(arg)) = 0;
- }
- });
- SyncPoint::GetInstance()->EnableProcessing();
- DestroyAndReopen(options);
- size_t preallocated_size =
- dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
- ASSERT_OK(Put("foo", "v1"));
- VectorLogPtr log_files_before;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
- ASSERT_EQ(1, log_files_before.size());
- auto& file_before = log_files_before[0];
- ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
- // The log file has preallocated space.
- auto db_size = GetAllocatedFileSize(dbname_ + file_before->PathName());
- ASSERT_GE(db_size, preallocated_size);
- Close();
- // enable truncate and open DB as readonly, the file should not be truncated
- // and DB size is not changed.
- enable_truncate = true;
- ASSERT_OK(ReadOnlyReopen(options));
- VectorLogPtr log_files_after;
- ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
- ASSERT_EQ(1, log_files_after.size());
- ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
- ASSERT_EQ(log_files_after[0]->PathName(), file_before->PathName());
- // The preallocated space should NOT be truncated.
- // the DB size is almost the same.
- ASSERT_NEAR(GetAllocatedFileSize(dbname_ + file_before->PathName()), db_size,
- db_size / 100);
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- }
- #endif // ROCKSDB_FALLOCATE_PRESENT
- #endif // ROCKSDB_PLATFORM_POSIX
- TEST_F(DBWALTest, WalInManifestButNotInSortedWals) {
- Options options = CurrentOptions();
- options.track_and_verify_wals_in_manifest = true;
- options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
- // Build a way to make wal files selectively go missing
- bool wals_go_missing = false;
- struct MissingWalFs : public FileSystemWrapper {
- MissingWalFs(const std::shared_ptr<FileSystem>& t,
- bool* _wals_go_missing_flag)
- : FileSystemWrapper(t), wals_go_missing_flag(_wals_go_missing_flag) {}
- bool* wals_go_missing_flag;
- IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
- std::vector<std::string>* r,
- IODebugContext* dbg) override {
- IOStatus s = target_->GetChildren(dir, io_opts, r, dbg);
- if (s.ok() && *wals_go_missing_flag) {
- for (size_t i = 0; i < r->size();) {
- if (EndsWith(r->at(i), ".log")) {
- r->erase(r->begin() + i);
- } else {
- ++i;
- }
- }
- }
- return s;
- }
- const char* Name() const override { return "MissingWalFs"; }
- };
- auto my_fs =
- std::make_shared<MissingWalFs>(env_->GetFileSystem(), &wals_go_missing);
- std::unique_ptr<Env> my_env(NewCompositeEnv(my_fs));
- options.env = my_env.get();
- CreateAndReopenWithCF({"blah"}, options);
- // Currently necessary to get a WAL tracked in manifest; see
- // https://github.com/facebook/rocksdb/issues/10080
- ASSERT_OK(Put(0, "x", "y"));
- ASSERT_OK(db_->SyncWAL());
- ASSERT_OK(Put(1, "x", "y"));
- ASSERT_OK(db_->SyncWAL());
- ASSERT_OK(Flush(1));
- ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
- std::vector<std::unique_ptr<LogFile>> wals;
- ASSERT_OK(db_->GetSortedWalFiles(wals));
- wals_go_missing = true;
- ASSERT_NOK(db_->GetSortedWalFiles(wals));
- wals_go_missing = false;
- Close();
- }
- TEST_F(DBWALTest, WalTermTest) {
- Options options = CurrentOptions();
- options.env = env_;
- CreateAndReopenWithCF({"pikachu"}, options);
- ASSERT_OK(Put(1, "foo", "bar"));
- WriteOptions wo;
- wo.sync = true;
- wo.disableWAL = false;
- WriteBatch batch;
- ASSERT_OK(batch.Put("foo", "bar"));
- batch.MarkWalTerminationPoint();
- ASSERT_OK(batch.Put("foo2", "bar2"));
- ASSERT_OK(dbfull()->Write(wo, &batch));
- // make sure we can re-open it.
- ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
- ASSERT_EQ("bar", Get(1, "foo"));
- ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
- }
- TEST_F(DBWALTest, GetCompressedWalsAfterSync) {
- if (db_->GetOptions().wal_compression == kNoCompression) {
- ROCKSDB_GTEST_BYPASS("stream compression not present");
- return;
- }
- Options options = GetDefaultOptions();
- options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
- options.create_if_missing = true;
- options.env = env_;
- options.avoid_flush_during_recovery = true;
- options.track_and_verify_wals_in_manifest = true;
- // Enable WAL compression so that the newly-created WAL will be non-empty
- // after DB open, even if point-in-time WAL recovery encounters no
- // corruption.
- options.wal_compression = kZSTD;
- DestroyAndReopen(options);
- // Write something to memtable and WAL so that wal_empty_ will be false after
- // next DB::Open().
- ASSERT_OK(Put("a", "v"));
- Reopen(options);
- // New WAL is created, thanks to !wal_empty_.
- ASSERT_OK(dbfull()->TEST_SwitchWAL());
- ASSERT_OK(Put("b", "v"));
- ASSERT_OK(db_->SyncWAL());
- VectorLogPtr wals;
- Status s = dbfull()->GetSortedWalFiles(wals);
- ASSERT_OK(s);
- }
- TEST_F(DBWALTest, EmptyWalReopenTest) {
- Options options = CurrentOptions();
- options.env = env_;
- CreateAndReopenWithCF({"pikachu"}, options);
- // make sure we can re-open it.
- ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
- {
- std::vector<std::string> files;
- int num_wal_files = 0;
- ASSERT_OK(env_->GetChildren(dbname_, &files));
- for (const auto& file : files) {
- uint64_t number = 0;
- FileType type = kWalFile;
- if (ParseFileName(file, &number, &type) && type == kWalFile) {
- num_wal_files++;
- }
- }
- ASSERT_EQ(num_wal_files, 1);
- }
- }
- TEST_F(DBWALTest, RecoveryFlushSwitchWALOnEmptyMemtable) {
- Options options = CurrentOptions();
- auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
- std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
- options.env = fault_fs_env.get();
- options.avoid_flush_during_shutdown = true;
- DestroyAndReopen(options);
- // Make sure the memtable switch in recovery flush happened after test checks
- // the memtable is empty.
- SyncPoint::GetInstance()->LoadDependency(
- {{"DBWALTest.RecoveryFlushSwitchWALOnEmptyMemtable:"
- "AfterCheckMemtableEmpty",
- "RecoverFromRetryableBGIOError:BeforeStart"}});
- SyncPoint::GetInstance()->EnableProcessing();
- fault_fs->SetThreadLocalErrorContext(
- FaultInjectionIOType::kMetadataWrite, 7 /* seed*/, 1 /* one_in */,
- true /* retryable */, false /* has_data_loss*/);
- fault_fs->EnableThreadLocalErrorInjection(
- FaultInjectionIOType::kMetadataWrite);
- WriteOptions wo;
- wo.sync = true;
- Status s = Put("k", "old_v", wo);
- ASSERT_TRUE(s.IsIOError());
- // To verify the key is not in memtable nor SST
- ASSERT_TRUE(static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
- ->cfd()
- ->mem()
- ->IsEmpty());
- ASSERT_EQ("NOT_FOUND", Get("k"));
- TEST_SYNC_POINT(
- "DBWALTest.RecoveryFlushSwitchWALOnEmptyMemtable:"
- "AfterCheckMemtableEmpty");
- SyncPoint::GetInstance()->DisableProcessing();
- fault_fs->DisableThreadLocalErrorInjection(
- FaultInjectionIOType::kMetadataWrite);
- // Keep trying write until recovery of the previous IO error finishes
- while (!s.ok()) {
- options.env->SleepForMicroseconds(1000);
- s = Put("k", "new_v");
- }
- // If recovery flush didn't switch WAL, we will end up having two duplicate
- // WAL entries with same seqno and same key that violate assertion during WAL
- // recovery and fail DB reopen
- options.avoid_flush_during_recovery = false;
- Reopen(options);
- ASSERT_EQ("new_v", Get("k"));
- Destroy(options);
- }
- TEST_F(DBWALTest, WALWriteErrorNoRecovery) {
- Options options = CurrentOptions();
- auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
- std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
- options.env = fault_fs_env.get();
- options.manual_wal_flush = true;
- DestroyAndReopen(options);
- fault_fs->SetThreadLocalErrorContext(
- FaultInjectionIOType::kWrite, 7 /* seed*/, 1 /* one_in */,
- true /* retryable */, false /* has_data_loss*/);
- fault_fs->EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
- ASSERT_OK(Put("k", "v"));
- Status s;
- s = db_->FlushWAL(false);
- ASSERT_TRUE(s.IsIOError());
- s = dbfull()->TEST_GetBGError();
- ASSERT_EQ(s.severity(), Status::Severity::kFatalError);
- ASSERT_FALSE(dbfull()->TEST_IsRecoveryInProgress());
- fault_fs->DisableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
- Destroy(options);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|