// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <atomic>
#include <limits>

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {
  25. // This is a static filter used for filtering
  26. // kvs during the compaction process.
  27. static std::string NEW_VALUE = "NewValue";
  28. class DBFlushTest : public DBTestBase {
  29. public:
  30. DBFlushTest() : DBTestBase("db_flush_test", /*env_do_fsync=*/true) {}
  31. };
  32. class DBFlushDirectIOTest : public DBFlushTest,
  33. public ::testing::WithParamInterface<bool> {
  34. public:
  35. DBFlushDirectIOTest() : DBFlushTest() {}
  36. };
  37. class DBAtomicFlushTest : public DBFlushTest,
  38. public ::testing::WithParamInterface<bool> {
  39. public:
  40. DBAtomicFlushTest() : DBFlushTest() {}
  41. };
// We had an issue where, when two background threads tried to flush at the
// same time, only one of them got committed. This test verifies that the
// issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
  Options options;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.env = env_;
  Reopen(options);
  FlushOptions no_wait;
  no_wait.wait = false;
  no_wait.allow_write_stall = true;
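  // Each {predecessor, successor} pair below makes the successor sync point
  // block until the predecessor has been reached, forcing the two flushes to
  // overlap while the manifest is being written.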
  SyncPoint::GetInstance()->LoadDependency(
      {{"VersionSet::LogAndApply:WriteManifest",
        "DBFlushTest::FlushWhileWritingManifest:1"},
       {"MemTableList::TryInstallMemtableFlushResults:InProgress",
        "VersionSet::LogAndApply:WriteManifestDone"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("foo", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
  ASSERT_OK(Put("bar", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  // If the issue is hit we will wait here forever.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(2, TotalTableFiles());
}
// Disable this test temporarily on Travis as it fails intermittently.
// Github issue: #4151
TEST_F(DBFlushTest, SyncFail) {
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options;
  options.disable_auto_compactions = true;
  options.env = fault_injection_env.get();

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedWals:Start"},
       {"DBImpl::SyncClosedWals:Failed", "DBFlushTest::SyncFail:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_OK(Put("key", "value"));
  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));
  // Flush installs a new super-version. Get the ref count after that.
  fault_injection_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
  TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
  fault_injection_env->SetFilesystemActive(true);
  // Now the background job will do the flush; wait for it.
  // Returns the IO error that happened during the flush.
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ("", FilesPerLevel());  // flush failed.
  Destroy(options);
}
TEST_F(DBFlushTest, SyncSkip) {
  Options options = CurrentOptions();

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedWals:Skip"},
       {"DBImpl::SyncClosedWals:Skip", "DBFlushTest::SyncSkip:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);
  ASSERT_OK(Put("key", "value"));

  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));

  TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
  TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");

  // Now the background job will do the flush; wait for it.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  Destroy(options);
}
TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
  // Verify setting an empty high-pri (flush) thread pool causes flushes to be
  // scheduled in the low-pri (compaction) thread pool.
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
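  // NewSpecialSkipListFactory(1) makes the memtable report itself as full
  // after a single entry, so each Put below triggers an automatic flush.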
  Reopen(options);
  env_->SetBackgroundThreads(0, Env::HIGH);

  std::thread::id tid;
  int num_flushes = 0, num_compactions = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
        if (tid == std::thread::id()) {
          tid = std::this_thread::get_id();
        } else {
          ASSERT_EQ(tid, std::this_thread::get_id());
        }
        ++num_flushes;
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
        ASSERT_EQ(tid, std::this_thread::get_id());
        ++num_compactions;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key", "val"));
  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put("key", "val"));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(4, num_flushes);
  ASSERT_EQ(1, num_compactions);
}
// Test that when a flush job is submitted to the low-priority thread pool
// and the DB is closed in the meanwhile, CloseHelper doesn't hang.
TEST_F(DBFlushTest, CloseDBWhenFlushInLowPri) {
  Options options = CurrentOptions();
  options.max_background_flushes = 1;
  options.max_total_wal_size = 8192;

  DestroyAndReopen(options);
  CreateColumnFamilies({"cf1", "cf2"}, options);

  env_->SetBackgroundThreads(0, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  int num_flushes = 0;

  SyncPoint::GetInstance()->SetCallBack("DBImpl::BGWorkFlush",
                                        [&](void* /*arg*/) { ++num_flushes; });

  int num_low_flush_unscheduled = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::UnscheduleLowFlushCallback", [&](void* /*arg*/) {
        num_low_flush_unscheduled++;
        // There should be one flush job in the low pool that needs to be
        // unscheduled.
        ASSERT_EQ(num_low_flush_unscheduled, 1);
      });

  int num_high_flush_unscheduled = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::UnscheduleHighFlushCallback", [&](void* /*arg*/) {
        num_high_flush_unscheduled++;
        // There should be no flush job in the high pool.
        ASSERT_EQ(num_high_flush_unscheduled, 0);
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(0, "key1", DummyString(8192)));
  // Block the only low-pri thread so that the flush cannot run and can be
  // removed from the queue when Unschedule is called.
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();

  // Trigger a flush; the flush job will be scheduled to the LOW priority
  // thread.
  ASSERT_OK(Put(0, "key2", DummyString(8192)));

  // Close the DB; the flush job in the low priority queue will be removed
  // without running.
  Close();
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
  ASSERT_EQ(0, num_flushes);

  ASSERT_OK(TryReopenWithColumnFamilies({"default", "cf1", "cf2"}, options));
  ASSERT_OK(Put(0, "key3", DummyString(8192)));
  ASSERT_OK(Flush(0));
  ASSERT_EQ(1, num_flushes);
}
TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  Reopen(options);

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkFlush",
        "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
       {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
        "FlushJob::WriteLevel0Table"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key1", "value1"));

  port::Thread t([&]() {
    // The call waits for the flush to finish, i.e. with
    // flush_options.wait = true.
    ASSERT_OK(Flush());
  });

  // Wait for the flush to start.
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
  // Insert a second memtable before the manual flush finishes.
  // At the end of the manual flush job, it will check if a further flush
  // is needed, but it will not trigger a flush of the second memtable because
  // min_write_buffer_number_to_merge is not reached.
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");

  // The manual flush should return without waiting for the flush
  // indefinitely.
  t.join();
}
TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
  Options options = CurrentOptions();
  Reopen(options);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  int called = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        auto unscheduled_flushes = *static_cast<int*>(arg);
        ASSERT_EQ(0, unscheduled_flushes);
        ++called;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("a", "foo"));
  FlushOptions flush_opts;
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_EQ(1, called);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// The following 3 tests are designed for testing garbage statistics at flush
// time.
//
// ======= General Information ======= (from GitHub Wiki).
// There are three scenarios where a memtable flush can be triggered:
//
// 1 - Memtable size exceeds ColumnFamilyOptions::write_buffer_size
//     after a write.
// 2 - Total memtable size across all column families exceeds
//     DBOptions::db_write_buffer_size, or DBOptions::write_buffer_manager
//     signals a flush. In this scenario the largest memtable will be flushed.
// 3 - Total WAL file size exceeds DBOptions::max_total_wal_size.
//     In this scenario the memtable with the oldest data will be flushed,
//     in order to allow the WAL file with data from this memtable to be
//     purged.
//
// As a result, a memtable can be flushed before it is full. This is one
// reason the generated SST file can be smaller than the corresponding
// memtable. Compression is another factor that makes an SST file smaller
// than the corresponding memtable, since data in a memtable is uncompressed.
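//
// For reference, the three triggers above map to these options (the values
// here are illustrative, not the ones used by the tests below):
//
//   Options options;
//   options.write_buffer_size = 64 << 20;      // trigger 1, per memtable
//   options.db_write_buffer_size = 128 << 20;  // trigger 2, across all CFs
//   options.max_total_wal_size = 256 << 20;    // trigger 3, total WAL size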
TEST_F(DBFlushTest, StatisticsGarbageBasic) {
  Options options = CurrentOptions();

  // The following options are used to enforce several values that
  // may already exist as default values to make this test resilient
  // to default value updates in the future.
  options.statistics = CreateDBStatistics();

  // Record all statistics.
  options.statistics->set_stats_level(StatsLevel::kAll);

  // Create the DB if it's not already present.
  options.create_if_missing = true;

  // Useful for now as we are trying to compare uncompressed data savings on
  // flush().
  options.compression = kNoCompression;

  // Prevent memtable in-place updates. Should already be disabled.
  // (From the Wiki: in-place updates can be enabled by toggling on the bool
  // inplace_update_support flag. However, this flag is by default set to
  // false because this thread-safe in-place update support is not compatible
  // with concurrent memtable writes. Note that the bool
  // allow_concurrent_memtable_write is set to true by default.)
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;

  // Enforce the size of a single MemTable to be 64MB (64MB = 67108864 bytes).
  options.write_buffer_size = 64 << 20;
  ASSERT_OK(TryReopen(options));
  // Put the same key-values multiple times.
  // The encoded length of a db entry in the memtable is
  // defined in db/memtable.cc (MemTable::Add) as:
  //   encoded_len = VarintLength(internal_key_size)  // min # of bytes needed
  //                                                  // to store
  //                                                  // internal_key_size
  //               + internal_key_size                // actual key string
  //                                                  // (w/o terminating null
  //                                                  // char) + 8 bytes for
  //                                                  // the fixed uint64
  //                                                  // "seq number +
  //                                                  // insertion type"
  //               + VarintLength(val_size)           // min # of bytes needed
  //                                                  // to store val_size
  //               + val_size                         // actual value string
  // For example, in our situation, "key1" : size 4, "value1" : size 6
  // (the terminating null characters are not copied over to the memtable).
  // Therefore encoded_len = 1 + (4+8) + 1 + 6 = 20 bytes per entry.
  // However, in terms of raw data contained in the memtable, and written
  // over to the SSTable, we only count internal_key_size and val_size,
  // because this is the only raw chunk of bytes that contains everything
  // necessary to reconstruct a user entry: sequence number, insertion type,
  // key, and value.
  //
  // To test the relevance of our memtable garbage statistics,
  // namely MEMTABLE_PAYLOAD_BYTES_AT_FLUSH and MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
  // we insert K-V pairs with 3 distinct keys (of length 4),
  // and random values of arbitrary length RAND_VALUES_LENGTH,
  // and we repeat this step NUM_REPEAT times total.
  // At the end, we insert 3 final K-V pairs with the same 3 keys
  // and known values (these will be the final values, of length 6).
  // I chose NUM_REPEAT=2,000 such that no automatic flush is
  // triggered (the number of bytes in the memtable is therefore
  // well below any meaningful heuristic for a memtable of size 64MB).
  // As a result, since each K-V pair is inserted as a payload
  // of N meaningful bytes (sequence number, insertion type,
  // key, and value = 8 + 4 + RAND_VALUES_LENGTH),
  // MEMTABLE_GARBAGE_BYTES_AT_FLUSH should be equal to 2,000 * N bytes
  // and MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = MEMTABLE_GARBAGE_BYTES_AT_FLUSH +
  // (3*(8 + 4 + 6)) bytes. For RAND_VALUES_LENGTH = 172 (arbitrary value), we
  // expect:
  //   N = 8 + 4 + 172 = 184 bytes
  //   MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 2,000 * 184 = 368,000 bytes
  //   MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 368,000 + 3*18 = 368,054 bytes
  const size_t NUM_REPEAT = 2000;
  const size_t RAND_VALUES_LENGTH = 172;
  const std::string KEY1 = "key1";
  const std::string KEY2 = "key2";
  const std::string KEY3 = "key3";
  const std::string VALUE1 = "value1";
  const std::string VALUE2 = "value2";
  const std::string VALUE3 = "value3";
  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;

  Random rnd(301);
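  // Each entry's raw payload is key size + value size + 8 bytes
  // (sizeof(uint64_t)) for the packed sequence number and insertion type;
  // every overwritten version of a key counts entirely as garbage.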
  // Insertion of K-V pairs, multiple times.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + p_v1.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY2.size() + p_v2.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY3.size() + p_v3.size() + sizeof(uint64_t);
  }

  // The memtable data bytes include the "garbage"
  // bytes along with the useful payload.
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;

  ASSERT_OK(Put(KEY1, VALUE1));
  ASSERT_OK(Put(KEY2, VALUE2));
  ASSERT_OK(Put(KEY3, VALUE3));

  // Add the useful payload to the memtable data bytes:
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      KEY1.size() + VALUE1.size() + KEY2.size() + VALUE2.size() + KEY3.size() +
      VALUE3.size() + 3 * sizeof(uint64_t);

  // We assert that the last K-V pairs have been successfully inserted,
  // and that the valid values are VALUE1, VALUE2, VALUE3.
  PinnableSlice value;
  ASSERT_OK(Get(KEY1, &value));
  ASSERT_EQ(value.ToString(), VALUE1);
  ASSERT_OK(Get(KEY2, &value));
  ASSERT_EQ(value.ToString(), VALUE2);
  ASSERT_OK(Get(KEY3, &value));
  ASSERT_EQ(value.ToString(), VALUE3);

  // Force flush to SST. Increments the statistics counter.
  ASSERT_OK(Flush());

  // Collect statistics.
  uint64_t mem_data_bytes =
      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  uint64_t mem_garbage_bytes =
      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  Close();
}
TEST_F(DBFlushTest, StatisticsGarbageInsertAndDeletes) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 67108864;

  ASSERT_OK(TryReopen(options));

  const size_t NUM_REPEAT = 2000;
  const size_t RAND_VALUES_LENGTH = 37;
  const std::string KEY1 = "key1";
  const std::string KEY2 = "key2";
  const std::string KEY3 = "key3";
  const std::string KEY4 = "key4";
  const std::string KEY5 = "key5";
  const std::string KEY6 = "key6";

  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;

  WriteBatch batch;

  Random rnd(301);
  // Insertion of K-V pairs, multiple times.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + p_v1.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY2.size() + p_v2.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY3.size() + p_v3.size() + sizeof(uint64_t);
    ASSERT_OK(Delete(KEY1));
    ASSERT_OK(Delete(KEY2));
    ASSERT_OK(Delete(KEY3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + KEY2.size() + KEY3.size() + 3 * sizeof(uint64_t);
  }

  // The memtable data bytes include the "garbage"
  // bytes along with the useful payload.
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;

  // Note: one set of deletes for KEY1, KEY2, KEY3 is written to the
  // SSTable to propagate the delete operations to K-V pairs
  // that could have been inserted into the database during past flush
  // operations.
  EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH -=
      KEY1.size() + KEY2.size() + KEY3.size() + 3 * sizeof(uint64_t);

  // Additional useful payload.
  ASSERT_OK(Delete(KEY4));
  ASSERT_OK(Delete(KEY5));
  ASSERT_OK(Delete(KEY6));

  // Add the useful payload to the memtable data bytes:
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      KEY4.size() + KEY5.size() + KEY6.size() + 3 * sizeof(uint64_t);

  // We assert that the K-V pairs have been successfully deleted.
  PinnableSlice value;
  ASSERT_NOK(Get(KEY1, &value));
  ASSERT_NOK(Get(KEY2, &value));
  ASSERT_NOK(Get(KEY3, &value));

  // Force flush to SST. Increments the statistics counter.
  ASSERT_OK(Flush());

  // Collect statistics.
  uint64_t mem_data_bytes =
      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  uint64_t mem_garbage_bytes =
      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  Close();
}
TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 67108864;

  ASSERT_OK(TryReopen(options));

  const size_t NUM_REPEAT = 1000;
  const size_t RAND_VALUES_LENGTH = 42;
  const std::string KEY1 = "key1";
  const std::string KEY2 = "key2";
  const std::string KEY3 = "key3";
  const std::string KEY4 = "key4";
  const std::string KEY5 = "key5";
  const std::string KEY6 = "key6";
  const std::string VALUE3 = "value3";

  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;

  Random rnd(301);
  // Insertion of K-V pairs, multiple times.
  // Also insert DeleteRange operations.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + p_v1.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY2.size() + p_v2.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY3.size() + p_v3.size() + sizeof(uint64_t);
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1,
                               KEY2));
    // Note: DeleteRange has an exclusive upper bound, e.g. here: [KEY2,KEY3)
    // is deleted.
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2,
                               KEY3));
    // Delete ranges are stored as a regular K-V pair, with key=STARTKEY,
    // value=ENDKEY.
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        (KEY1.size() + KEY2.size() + sizeof(uint64_t)) +
        (KEY2.size() + KEY3.size() + sizeof(uint64_t));
  }

  // The memtable data bytes include the "garbage"
  // bytes along with the useful payload.
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;

  // Note: one set of DeleteRange operations for (KEY1, KEY2) and (KEY2, KEY3)
  // is written to the SSTable to propagate them to K-V pairs that could
  // have been inserted into the database during past flush operations.
  EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH -=
      (KEY1.size() + KEY2.size() + sizeof(uint64_t)) +
      (KEY2.size() + KEY3.size() + sizeof(uint64_t));

  // Overwrite KEY3 with a known value (VALUE3).
  // Note that during the whole time KEY3 has never been deleted
  // by the range deletes.
  ASSERT_OK(Put(KEY3, VALUE3));
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      KEY3.size() + VALUE3.size() + sizeof(uint64_t);

  // Additional useful payload.
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY4, KEY5));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY5, KEY6));

  // Add the useful payload to the memtable data bytes:
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      (KEY4.size() + KEY5.size() + sizeof(uint64_t)) +
      (KEY5.size() + KEY6.size() + sizeof(uint64_t));

  // We assert that the K-V pairs have been successfully deleted.
  PinnableSlice value;
  ASSERT_NOK(Get(KEY1, &value));
  ASSERT_NOK(Get(KEY2, &value));
  // And that KEY3's value is correct.
  ASSERT_OK(Get(KEY3, &value));
  ASSERT_EQ(value, VALUE3);

  // Force flush to SST. Increments the statistics counter.
  ASSERT_OK(Flush());

  // Collect statistics.
  uint64_t mem_data_bytes =
      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  uint64_t mem_garbage_bytes =
      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  Close();
}
// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
 public:
  TestFlushListener(Env* env, DBFlushTest* test)
      : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
    db_closed = false;
  }

  ~TestFlushListener() override {
    prev_fc_info_.status.PermitUncheckedError();  // Ignore the status
  }

  void OnTableFileCreated(const TableFileCreationInfo& info) override {
    // Remember the info for later checking the FlushJobInfo.
    prev_fc_info_ = info;
    ASSERT_GT(info.db_name.size(), 0U);
    ASSERT_GT(info.cf_name.size(), 0U);
    ASSERT_GT(info.file_path.size(), 0U);
    ASSERT_GT(info.job_id, 0);
    ASSERT_GT(info.table_properties.data_size, 0U);
    ASSERT_GT(info.table_properties.raw_key_size, 0U);
    ASSERT_GT(info.table_properties.raw_value_size, 0U);
    ASSERT_GT(info.table_properties.num_data_blocks, 0U);
    ASSERT_GT(info.table_properties.num_entries, 0U);
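    // No file checksum generator is configured in these tests, so the
    // default "unknown" checksum placeholders are expected.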
    ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
    ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
  }

  void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
    flushed_dbs_.push_back(db);
    flushed_column_family_names_.push_back(info.cf_name);
    if (info.triggered_writes_slowdown) {
      slowdown_count++;
    }
    if (info.triggered_writes_stop) {
      stop_count++;
    }
    // Verify whether the previously created file matches the flushed file.
    ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
    ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
    ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
    ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
    ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);

    // Note: the following chunk relies on the notification pertaining to the
    // database pointed to by DBTestBase::db_, and is thus bypassed when
    // that assumption does not hold (see the test case MultiDBMultiListeners
    // below).
    ASSERT_TRUE(test_);
    if (db == test_->db_) {
      std::vector<std::vector<FileMetaData>> files_by_level;
      test_->dbfull()->TEST_GetFilesMetaData(db->DefaultColumnFamily(),
                                             &files_by_level);
      ASSERT_FALSE(files_by_level.empty());
      auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
                             [&](const FileMetaData& meta) {
                               return meta.fd.GetNumber() == info.file_number;
                             });
      ASSERT_NE(it, files_by_level[0].end());
      ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
    }

    ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
    ASSERT_GT(info.thread_id, 0U);
  }

  std::vector<std::string> flushed_column_family_names_;
  std::vector<DB*> flushed_dbs_;
  int slowdown_count;
  int stop_count;
  bool db_closing;
  std::atomic_bool db_closed;
  TableFileCreationInfo prev_fc_info_;

 protected:
  Env* env_;
  DBFlushTest* test_;
};
TEST_F(
    DBFlushTest,
    FixUnrecoverableWriteDuringAtomicFlushWaitUntilFlushWouldNotStallWrites) {
  Options options = CurrentOptions();
  options.atomic_flush = true;

  // To simulate a real-life crash where we can't flush during the DB's
  // shutdown.
  options.avoid_flush_during_shutdown = true;

  // Set low thresholds (while `disable_auto_compactions=false`) so that a
  // flush adding one more L0 file during `GetLiveFiles()` will have to wait
  // until such a flush would no longer stall writes.
  options.level0_stop_writes_trigger = 2;
  options.level0_slowdown_writes_trigger = 2;

  // Disable level-0 compaction triggered by the number of files, to avoid the
  // stalling check being skipped (which would result in the flush mentioned
  // above not waiting).
  options.level0_file_num_compaction_trigger = -1;

  CreateAndReopenWithCF({"cf1"}, options);

  // Manually pause the compaction thread to ensure enough L0 files (as
  // `disable_auto_compactions=false` is needed) to meet the low thresholds
  // above.
  std::unique_ptr<test::SleepingBackgroundTask> sleeping_task_;
  sleeping_task_.reset(new test::SleepingBackgroundTask());
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 sleeping_task_.get(), Env::Priority::LOW);
  sleeping_task_->WaitUntilSleeping();

  // Create an initial file to help meet the low thresholds above.
  ASSERT_OK(Put(1, "dontcare", "dontcare"));
  ASSERT_OK(Flush(1));

  // Insert some initial data so we have something to atomic-flush later,
  // triggered by `GetLiveFiles()`.
  WriteOptions write_opts;
  write_opts.disableWAL = true;
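  // With the WAL disabled, unflushed data is lost on a crash, so recovery
  // correctness below depends entirely on what the atomic flush persisted.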
  ASSERT_OK(Put(1, "k1", "v1", write_opts));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({{
      "DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
      "DBFlushTest::"
      "UnrecoverableWriteInAtomicFlushWaitUntilFlushWouldNotStallWrites::Write",
  }});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Write to the DB when the atomic flush releases the lock to wait on the
  // write stall condition to be gone in `WaitUntilFlushWouldNotStallWrites()`.
  port::Thread write_thread([&] {
    TEST_SYNC_POINT(
        "DBFlushTest::"
        "UnrecoverableWriteInAtomicFlushWaitUntilFlushWouldNotStallWrites::"
        "Write");
    // Before the fix, the empty default CF would've been prematurely excluded
    // from this atomic flush. The following two writes together make the
    // default CF later contain data that should've been included in the
    // atomic flush.
    ASSERT_OK(Put(0, "k2", "v2", write_opts));
    // The following write increases the max seqno of this atomic flush to 3,
    // which is greater than the seqno of the default CF's data. This then
    // violates the invariant that all entries of seqno less than the max
    // seqno of this atomic flush should've been flushed by the time this
    // atomic flush finishes.
    ASSERT_OK(Put(1, "k3", "v3", write_opts));

    // Resume the compaction threads and reduce L0 files so `GetLiveFiles()`
    // can resume from the wait.
    sleeping_task_->WakeUp();
    sleeping_task_->WaitUntilDone();
    MoveFilesToLevel(1, 1);
  });

  // Trigger an atomic flush by `GetLiveFiles()`.
  std::vector<std::string> files;
  uint64_t manifest_file_size;
  ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true));

  write_thread.join();

  ReopenWithColumnFamilies({"default", "cf1"}, options);
  ASSERT_EQ(Get(1, "k3"), "v3");
  // Prior to the fix, `Get()` would return `NotFound`, as the "k2" entry in
  // the default CF can't be recovered from a crash right after the atomic
  // flush finishes, resulting in a "recovery hole" since "k3" can be
  // recovered. This is due to the invariant violation described above.
  ASSERT_EQ(Get(0, "k2"), "v2");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBFlushTest, FixFlushReasonRaceFromConcurrentFlushes) {
  Options options = CurrentOptions();
  options.atomic_flush = true;
  options.disable_auto_compactions = true;
  CreateAndReopenWithCF({"cf1"}, options);

  for (int idx = 0; idx < 1; ++idx) {
    ASSERT_OK(Put(0, Key(idx), std::string(1, 'v')));
    ASSERT_OK(Put(1, Key(idx), std::string(1, 'v')));
  }

  // To coerce a manual flush happening in the middle of GetLiveFiles's flush,
  // we need to pause the background flush thread and enable it later.
  std::shared_ptr<test::SleepingBackgroundTask> sleeping_task =
      std::make_shared<test::SleepingBackgroundTask>();
  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 sleeping_task.get(), Env::Priority::HIGH);
  sleeping_task->WaitUntilSleeping();

  // Coerce a manual flush happening in the middle of GetLiveFiles's flush.
  bool get_live_files_paused_at_sync_point = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::AtomicFlushMemTables:AfterScheduleFlush", [&](void* /* arg */) {
        if (get_live_files_paused_at_sync_point) {
          // To prevent a non-GetLiveFiles() flush from pausing at this sync
          // point.
          return;
        }
        get_live_files_paused_at_sync_point = true;

        FlushOptions fo;
        fo.wait = false;
        fo.allow_write_stall = true;
        ASSERT_OK(dbfull()->Flush(fo));

        // Resume the background flush thread so GetLiveFiles() can finish.
        sleeping_task->WakeUp();
        sleeping_task->WaitUntilDone();
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<std::string> files;
  uint64_t manifest_file_size;
  // Before the fix, a race condition on the default CF's flush reason, due to
  // the concurrent GetLiveFiles's flush and the manual flush, would fail an
  // internal assertion.
  // After the fix, there is no such race condition and no assertion failure.
  ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true));
  ASSERT_TRUE(get_live_files_paused_at_sync_point);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBFlushTest, MemPurgeBasic) {
  Options options = CurrentOptions();

  // The following options are used to enforce several values that
  // may already exist as default values to make this test resilient
  // to default value updates in the future.
  options.statistics = CreateDBStatistics();

  // Record all statistics.
  options.statistics->set_stats_level(StatsLevel::kAll);

  // Create the DB if it's not already present.
  options.create_if_missing = true;

  // Useful for now as we are trying to compare uncompressed data savings on
  // flush().
  options.compression = kNoCompression;

  // Prevent memtable in-place updates. Should already be disabled.
  // (From the Wiki: in-place updates can be enabled by toggling on the bool
  // inplace_update_support flag. However, this flag is by default set to
  // false because this thread-safe in-place update support is not compatible
  // with concurrent memtable writes. Note that the bool
  // allow_concurrent_memtable_write is set to true by default.)
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;

  // Enforce the size of a single MemTable to be 1MB (1 << 20 = 1048576
  // bytes).
  options.write_buffer_size = 1 << 20;
  // Initially deactivate the MemPurge prototype.
  options.experimental_mempurge_threshold = 0.0;

  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  ASSERT_OK(TryReopen(options));

  // RocksDB lite does not support dynamic options.
  // Dynamically activate the MemPurge prototype without restarting the DB.
  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
  ASSERT_OK(db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "1.0"}}));

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
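  // On a successful MemPurge, the useful payload of a full memtable is
  // rewritten into a fresh memtable instead of an SST file, so the
  // overwrite-heavy workload below should avoid SST creation entirely.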
  std::string KEY1 = "IamKey1";
  std::string KEY2 = "IamKey2";
  std::string KEY3 = "IamKey3";
  std::string KEY4 = "IamKey4";
  std::string KEY5 = "IamKey5";
  std::string KEY6 = "IamKey6";
  std::string KEY7 = "IamKey7";
  std::string KEY8 = "IamKey8";
  std::string KEY9 = "IamKey9";
  std::string RNDKEY1, RNDKEY2, RNDKEY3;
  const std::string NOT_FOUND = "NOT_FOUND";

  // Heavy overwrite workload,
  // more than would fit in maximum allowed memtables.
  Random rnd(719);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_KEYS_LENGTH = 57;
  const size_t RAND_VALUES_LENGTH = 10240;
  std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9, p_rv1,
      p_rv2, p_rv3;

  // Insert a very first set of keys that will be
  // mempurged at least once.
  p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
  ASSERT_OK(Put(KEY1, p_v1));
  ASSERT_OK(Put(KEY2, p_v2));
  ASSERT_OK(Put(KEY3, p_v3));
  ASSERT_OK(Put(KEY4, p_v4));
  ASSERT_EQ(Get(KEY1), p_v1);
  ASSERT_EQ(Get(KEY2), p_v2);
  ASSERT_EQ(Get(KEY3), p_v3);
  ASSERT_EQ(Get(KEY4), p_v4);

  // Insertion of K-V pairs, multiple times (overwrites).
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);

    ASSERT_OK(Put(KEY5, p_v5));
    ASSERT_OK(Put(KEY6, p_v6));
    ASSERT_OK(Put(KEY7, p_v7));
    ASSERT_OK(Put(KEY8, p_v8));
    ASSERT_OK(Put(KEY9, p_v9));

    ASSERT_EQ(Get(KEY1), p_v1);
    ASSERT_EQ(Get(KEY2), p_v2);
    ASSERT_EQ(Get(KEY3), p_v3);
    ASSERT_EQ(Get(KEY4), p_v4);
    ASSERT_EQ(Get(KEY5), p_v5);
    ASSERT_EQ(Get(KEY6), p_v6);
    ASSERT_EQ(Get(KEY7), p_v7);
    ASSERT_EQ(Get(KEY8), p_v8);
    ASSERT_EQ(Get(KEY9), p_v9);
  }

  // Check that there was at least one mempurge.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  // Check that there were no SST files created during flush.
  const uint32_t EXPECTED_SST_COUNT = 0;

  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Insertion of K-V pairs, no overwrites.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create key and value strings of arbitrary length.
    RNDKEY1 = rnd.RandomString(RAND_KEYS_LENGTH);
    RNDKEY2 = rnd.RandomString(RAND_KEYS_LENGTH);
    RNDKEY3 = rnd.RandomString(RAND_KEYS_LENGTH);
    p_rv1 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_rv2 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_rv3 = rnd.RandomString(RAND_VALUES_LENGTH);

    ASSERT_OK(Put(RNDKEY1, p_rv1));
    ASSERT_OK(Put(RNDKEY2, p_rv2));
    ASSERT_OK(Put(RNDKEY3, p_rv3));

    ASSERT_EQ(Get(KEY1), p_v1);
    ASSERT_EQ(Get(KEY2), p_v2);
    ASSERT_EQ(Get(KEY3), p_v3);
    ASSERT_EQ(Get(KEY4), p_v4);
    ASSERT_EQ(Get(KEY5), p_v5);
    ASSERT_EQ(Get(KEY6), p_v6);
    ASSERT_EQ(Get(KEY7), p_v7);
    ASSERT_EQ(Get(KEY8), p_v8);
    ASSERT_EQ(Get(KEY9), p_v9);
    ASSERT_EQ(Get(RNDKEY1), p_rv1);
    ASSERT_EQ(Get(RNDKEY2), p_rv2);
    ASSERT_EQ(Get(RNDKEY3), p_rv3);
  }

  // Assert that at least one flush to storage has been performed
  // (which will consequently increase the number of mempurges recorded too).
  EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);

  // Assert that there is no data corruption, even with
  // a flush to storage.
  ASSERT_EQ(Get(KEY1), p_v1);
  ASSERT_EQ(Get(KEY2), p_v2);
  ASSERT_EQ(Get(KEY3), p_v3);
  ASSERT_EQ(Get(KEY4), p_v4);
  ASSERT_EQ(Get(KEY5), p_v5);
  ASSERT_EQ(Get(KEY6), p_v6);
  ASSERT_EQ(Get(KEY7), p_v7);
  ASSERT_EQ(Get(KEY8), p_v8);
  ASSERT_EQ(Get(KEY9), p_v9);
  ASSERT_EQ(Get(RNDKEY1), p_rv1);
  ASSERT_EQ(Get(RNDKEY2), p_rv2);
  ASSERT_EQ(Get(RNDKEY3), p_rv3);

  Close();
}
// RocksDB lite does not support dynamic options.
TEST_F(DBFlushTest, MemPurgeBasicToggle) {
  Options options = CurrentOptions();
  // The following options are used to enforce several values that
  // may already exist as default values, to make this test resilient
  // to default value updates in the future.
  options.statistics = CreateDBStatistics();
  // Record all statistics.
  options.statistics->set_stats_level(StatsLevel::kAll);
  // Create the DB if it's not already present.
  options.create_if_missing = true;
  // Useful for now as we are trying to compare uncompressed data savings on
  // flush().
  options.compression = kNoCompression;
  // Prevent memtable in-place updates. Should already be disabled
  // (from the wiki: "In place updates can be enabled by toggling on the bool
  // inplace_update_support flag. However, this flag is by default set to
  // false because this thread-safe in-place update support is not compatible
  // with concurrent memtable writes. Note that the bool
  // allow_concurrent_memtable_write is set to true by default").
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  // Enforce the size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Initially deactivate the MemPurge prototype
  // (negative values are equivalent to 0.0).
  options.experimental_mempurge_threshold = -25.3;
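  // For reference: per the comments in this test, the threshold is
  // effectively interpreted on a [0.0, 1.0] scale (negative values behave
  // like 0.0, i.e., MemPurge off; values above 1.0 behave like 1.0), so a
  // hypothetical mid-range setting would look like:
  //   options.experimental_mempurge_threshold = 0.5;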
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  ASSERT_OK(TryReopen(options));

  // Dynamically activate the MemPurge prototype without restarting the DB.
  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
  // Values greater than 1.0 are equivalent to 1.0
  ASSERT_OK(
      db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "3.7898"}}));
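  // This works because experimental_mempurge_threshold is a mutable
  // column-family option, so SetOptions() applies it to the live DB without
  // a reopen. A hypothetical read-back sketch (not part of the original
  // test; whether the stored value is the raw 3.7898 or a clamped value is
  // not asserted here):
  //   double t = db_->GetOptions(cfh).experimental_mempurge_threshold;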
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
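  // These two sync points give the test a cheap way to distinguish flush
  // outcomes: "DBImpl::FlushJob:MemPurgeSuccessful" fires when a flush is
  // resolved in-memory by MemPurge, while "DBImpl::FlushJob:SSTFileCreated"
  // fires when a flush writes an SST file to storage. The atomic counters
  // are read and reset below with exchange(0).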
  const size_t KVSIZE = 3;
  std::vector<std::string> KEYS(KVSIZE);
  for (size_t k = 0; k < KVSIZE; k++) {
    KEYS[k] = "IamKey" + std::to_string(k);
  }
  std::vector<std::string> RNDVALS(KVSIZE);
  const std::string NOT_FOUND = "NOT_FOUND";

  // Heavy overwrite workload,
  // more than would fit in the maximum allowed memtables.
  Random rnd(719);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_VALUES_LENGTH = 10240;

  // Insert K-V pairs, multiple times (overwrites).
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
    for (size_t j = 0; j < KEYS.size(); j++) {
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
  }

  // Check that there was at least one mempurge.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  // Check that no SST files were created during flush.
  const uint32_t EXPECTED_SST_COUNT = 0;
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Dynamically deactivate MemPurge.
  ASSERT_OK(
      db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "-1023.0"}}));

  // Insert K-V pairs, multiple times (overwrites).
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
    for (size_t j = 0; j < KEYS.size(); j++) {
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
  }

  // This time we expect no mempurge at all.
  const uint32_t ZERO = 0;
  // Assert that at least one flush to storage has been performed.
  EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
  // The mempurge counter was reset to 0 above when the options were updated;
  // since MemPurge is now deactivated, it must still be 0.
  EXPECT_EQ(mempurge_count.exchange(0), ZERO);
  Close();
}
// End of MemPurgeBasicToggle, which is not
// supported with RocksDB LITE because it
// relies on dynamically changing the option
// flag experimental_mempurge_threshold.

// At the moment, the MemPurge feature is deactivated
// when atomic_flush is enabled. This is because the level
// of garbage between Column Families is not guaranteed to
// be consistent, therefore a CF could hypothetically
// trigger a MemPurge while another CF would trigger
// a regular Flush.
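// For example (illustrative only): with two CFs in one atomic flush, CF A
// might be mostly overwrites (a good MemPurge candidate) while CF B holds
// only unique keys (which must go to an SST file). Mixing an in-memory
// MemPurge output with an on-disk flush output would break the "all CFs
// flushed to storage together" guarantee that atomic_flush provides, hence
// MemPurge is simply disabled in that configuration.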
TEST_F(DBFlushTest, MemPurgeWithAtomicFlush) {
  Options options = CurrentOptions();
  // The following options are used to enforce several values that
  // may already exist as default values, to make this test resilient
  // to default value updates in the future.
  options.statistics = CreateDBStatistics();
  // Record all statistics.
  options.statistics->set_stats_level(StatsLevel::kAll);
  // Create the DB if it's not already present.
  options.create_if_missing = true;
  // Useful for now as we are trying to compare uncompressed data savings on
  // flush().
  options.compression = kNoCompression;
  // Prevent memtable in-place updates. Should already be disabled
  // (from the wiki: "In place updates can be enabled by toggling on the bool
  // inplace_update_support flag. However, this flag is by default set to
  // false because this thread-safe in-place update support is not compatible
  // with concurrent memtable writes. Note that the bool
  // allow_concurrent_memtable_write is set to true by default").
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  // Enforce the size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 153.245;
  // Activate atomic_flush.
  options.atomic_flush = true;

  const std::vector<std::string> new_cf_names = {"pikachu", "eevie"};
  CreateColumnFamilies(new_cf_names, options);
  Close();

  // 3 CFs: 'default' will be filled with overwrites (would normally trigger
  //        mempurge); new_cf_names[0] ("pikachu") will be filled with random
  //        values (would trigger flush); new_cf_names[1] ("eevie") is left
  //        nearly empty (a single KV pair).
  ReopenWithColumnFamilies(
      {kDefaultColumnFamilyName, new_cf_names[0], new_cf_names[1]}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(2, "bar", "baz"));

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  const size_t KVSIZE = 3;
  std::vector<std::string> KEYS(KVSIZE);
  for (size_t k = 0; k < KVSIZE; k++) {
    KEYS[k] = "IamKey" + std::to_string(k);
  }
  std::string RNDKEY;
  std::vector<std::string> RNDVALS(KVSIZE);
  const std::string NOT_FOUND = "NOT_FOUND";

  // Heavy overwrite workload,
  // more than would fit in the maximum allowed memtables.
  Random rnd(106);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_KEY_LENGTH = 128;
  const size_t RAND_VALUES_LENGTH = 10240;

  // Insert K-V pairs, multiple times (overwrites).
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      RNDKEY = rnd.RandomString(RAND_KEY_LENGTH);
      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
      ASSERT_OK(Put(1, RNDKEY, RNDVALS[j]));
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
      ASSERT_EQ(Get(1, RNDKEY), RNDVALS[j]);
    }
  }

  // Check that there was no mempurge because the atomic_flush option is true.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 0;
  // Check that at least one SST file was created during flush.
  const uint32_t EXPECTED_SST_COUNT = 1;
  EXPECT_EQ(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_GE(sst_count.exchange(0), EXPECTED_SST_COUNT);
  Close();
}
TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  // Enforce the size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 15.0;
  ASSERT_OK(TryReopen(options));

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::string KEY1 = "ThisIsKey1";
  std::string KEY2 = "ThisIsKey2";
  std::string KEY3 = "ThisIsKey3";
  std::string KEY4 = "ThisIsKey4";
  std::string KEY5 = "ThisIsKey5";
  const std::string NOT_FOUND = "NOT_FOUND";

  Random rnd(117);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_VALUES_LENGTH = 10240;
  std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5;
  int count = 0;
  const int EXPECTED_COUNT_FORLOOP = 3;
  const int EXPECTED_COUNT_END = 4;

  ReadOptions ropt;
  ropt.pin_data = true;
  ropt.total_order_seek = true;
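  // pin_data keeps the slices returned by the iterator valid for the
  // iterator's lifetime (rather than only until the next iteration), and
  // total_order_seek disables prefix-seek shortcuts, so the scans below see
  // every live key in order regardless of the memtable configuration.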
  Iterator* iter = nullptr;

  // Insert K-V pairs multiple times; also insert
  // Delete and DeleteRange entries.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of RAND_VALUES_LENGTH bytes.
    p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v3b = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    ASSERT_OK(Put(KEY4, p_v4));
    ASSERT_OK(Put(KEY5, p_v5));
    ASSERT_OK(Delete(KEY2));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               KEY2, KEY4));
    ASSERT_OK(Put(KEY3, p_v3b));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                               KEY1, KEY3));
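    // Note: DeleteRange upper bounds are exclusive, so the re-written KEY3
    // (p_v3b) survives DeleteRange(KEY1, KEY3), while KEY1 and KEY2 are
    // covered by the two range tombstones (and KEY1 is deleted again,
    // point-wise, just below).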
    ASSERT_OK(Delete(KEY1));
    ASSERT_EQ(Get(KEY1), NOT_FOUND);
    ASSERT_EQ(Get(KEY2), NOT_FOUND);
    ASSERT_EQ(Get(KEY3), p_v3b);
    ASSERT_EQ(Get(KEY4), p_v4);
    ASSERT_EQ(Get(KEY5), p_v5);

    iter = db_->NewIterator(ropt);
    iter->SeekToFirst();
    count = 0;
    for (; iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      key = (iter->key()).ToString(false);
      value = (iter->value()).ToString(false);
      if (key.compare(KEY3) == 0) {
        ASSERT_EQ(value, p_v3b);
      } else if (key.compare(KEY4) == 0) {
        ASSERT_EQ(value, p_v4);
      } else if (key.compare(KEY5) == 0) {
        ASSERT_EQ(value, p_v5);
      } else {
        ASSERT_EQ(value, NOT_FOUND);
      }
      count++;
    }
    ASSERT_OK(iter->status());
    // Expected count here is 3: KEY3, KEY4, KEY5.
    ASSERT_EQ(count, EXPECTED_COUNT_FORLOOP);
    if (iter) {
      delete iter;
    }
  }

  // Check that there was at least one mempurge.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  // Check that no SST files were created during flush.
  const uint32_t EXPECTED_SST_COUNT = 0;
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Additional test of the interaction between iterators and MemPurge.
  ASSERT_OK(Put(KEY2, p_v2));
  iter = db_->NewIterator(ropt);
  iter->SeekToFirst();
  ASSERT_OK(Put(KEY4, p_v4));
  count = 0;
  for (; iter->Valid(); iter->Next()) {
    ASSERT_OK(iter->status());
    key = (iter->key()).ToString(false);
    value = (iter->value()).ToString(false);
    if (key.compare(KEY2) == 0) {
      ASSERT_EQ(value, p_v2);
    } else if (key.compare(KEY3) == 0) {
      ASSERT_EQ(value, p_v3b);
    } else if (key.compare(KEY4) == 0) {
      ASSERT_EQ(value, p_v4);
    } else if (key.compare(KEY5) == 0) {
      ASSERT_EQ(value, p_v5);
    } else {
      ASSERT_EQ(value, NOT_FOUND);
    }
    count++;
  }
  // Expected count here is 4: KEY2, KEY3, KEY4, KEY5.
  ASSERT_EQ(count, EXPECTED_COUNT_END);
  if (iter) {
    delete iter;
  }
  Close();
}
// Create a CompactionFilter that will be invoked
// at flush time and will update the value of a KV pair
// if the key string is "lower" than the filtered_key_ string.
class ConditionalUpdateFilter : public CompactionFilter {
 public:
  explicit ConditionalUpdateFilter(const std::string* filtered_key)
      : filtered_key_(filtered_key) {}

  bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
              std::string* new_value, bool* value_changed) const override {
    // If key < filtered_key_, update the value of the KV-pair.
    if (key.compare(*filtered_key_) < 0) {
      assert(new_value != nullptr);
      *new_value = NEW_VALUE;
      *value_changed = true;
    }
    return false /*do not remove this KV-pair*/;
  }

  const char* Name() const override { return "ConditionalUpdateFilter"; }

 private:
  const std::string* filtered_key_;
};

class ConditionalUpdateFilterFactory : public CompactionFilterFactory {
 public:
  explicit ConditionalUpdateFilterFactory(const Slice& filtered_key)
      : filtered_key_(filtered_key.ToString()) {}

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& /*context*/) override {
    return std::unique_ptr<CompactionFilter>(
        new ConditionalUpdateFilter(&filtered_key_));
  }

  const char* Name() const override { return "ConditionalUpdateFilterFactory"; }

  bool ShouldFilterTableFileCreation(
      TableFileCreationReason reason) const override {
    // This compaction filter will be invoked
    // at flush time (and therefore at MemPurge time).
    return (reason == TableFileCreationReason::kFlush);
  }

 private:
  std::string filtered_key_;
};
TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
  Options options = CurrentOptions();

  std::string KEY1 = "ThisIsKey1";
  std::string KEY2 = "ThisIsKey2";
  std::string KEY3 = "ThisIsKey3";
  std::string KEY4 = "ThisIsKey4";
  std::string KEY5 = "ThisIsKey5";
  std::string KEY6 = "ThisIsKey6";
  std::string KEY7 = "ThisIsKey7";
  std::string KEY8 = "ThisIsKey8";
  std::string KEY9 = "ThisIsKey9";
  const std::string NOT_FOUND = "NOT_FOUND";

  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  // Create a ConditionalUpdate compaction filter
  // that will update all the values of the KV pairs
  // where the keys are "lower" than KEY4.
  options.compaction_filter_factory =
      std::make_shared<ConditionalUpdateFilterFactory>(KEY4);
  // Enforce the size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 26.55;
  ASSERT_OK(TryReopen(options));

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(53);
  const size_t NUM_REPEAT = 1000;
  const size_t RAND_VALUES_LENGTH = 10240;
  std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9;

  p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
  ASSERT_OK(Put(KEY1, p_v1));
  ASSERT_OK(Put(KEY2, p_v2));
  ASSERT_OK(Put(KEY3, p_v3));
  ASSERT_OK(Put(KEY4, p_v4));
  ASSERT_OK(Put(KEY5, p_v5));
  ASSERT_OK(Delete(KEY1));

  // Insert K-V pairs, multiple times.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of RAND_VALUES_LENGTH bytes.
    p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY6, p_v6));
    ASSERT_OK(Put(KEY7, p_v7));
    ASSERT_OK(Put(KEY8, p_v8));
    ASSERT_OK(Put(KEY9, p_v9));
    ASSERT_OK(Delete(KEY7));
  }

  // Check that there was at least one mempurge.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  // Check that no SST files were created during flush.
  const uint32_t EXPECTED_SST_COUNT = 0;
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Verify that the ConditionalUpdateFilter
  // updated the values of KEY2 and KEY3, but not those of KEY4 and KEY5.
  ASSERT_EQ(Get(KEY1), NOT_FOUND);
  ASSERT_EQ(Get(KEY2), NEW_VALUE);
  ASSERT_EQ(Get(KEY3), NEW_VALUE);
  ASSERT_EQ(Get(KEY4), p_v4);
  ASSERT_EQ(Get(KEY5), p_v5);
}
TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  // Enforce the size of a single MemTable to 128KB.
  options.write_buffer_size = 128 << 10;
  // Activate the MemPurge prototype
  // (values > 1.0 are equivalent to 1.0).
  options.experimental_mempurge_threshold = 2.5;
  ASSERT_OK(TryReopen(options));

  const size_t KVSIZE = 10;

  do {
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Put(1, "baz", "v5"));

    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v5", Get(1, "baz"));
    ASSERT_OK(Put(0, "bar", "v2"));
    ASSERT_OK(Put(1, "bar", "v2"));
    ASSERT_OK(Put(1, "foo", "v3"));

    std::atomic<uint32_t> mempurge_count{0};
    std::atomic<uint32_t> sst_count{0};
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::FlushJob:MemPurgeSuccessful",
        [&](void* /*arg*/) { mempurge_count++; });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::FlushJob:SSTFileCreated",
        [&](void* /*arg*/) { sst_count++; });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    std::vector<std::string> keys;
    for (size_t k = 0; k < KVSIZE; k++) {
      keys.push_back("IamKey" + std::to_string(k));
    }

    std::string RNDKEY, RNDVALUE;
    const std::string NOT_FOUND = "NOT_FOUND";

    // Heavy overwrite workload,
    // more than would fit in the maximum allowed memtables.
    Random rnd(719);
    const size_t NUM_REPEAT = 100;
    const size_t RAND_KEY_LENGTH = 4096;
    const size_t RAND_VALUES_LENGTH = 1024;
    std::vector<std::string> values_default(KVSIZE), values_pikachu(KVSIZE);

    // Insert an initial set of keys that will be
    // mempurged at least once.
    for (size_t k = 0; k < KVSIZE / 2; k++) {
      values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH);
      values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH);
    }

    // Insert keys[0:KVSIZE/2] into
    // both the 'default' and 'pikachu' CFs.
    for (size_t k = 0; k < KVSIZE / 2; k++) {
      ASSERT_OK(Put(0, keys[k], values_default[k]));
      ASSERT_OK(Put(1, keys[k], values_pikachu[k]));
    }

    // Check that the insertion was seamless.
    for (size_t k = 0; k < KVSIZE / 2; k++) {
      ASSERT_EQ(Get(0, keys[k]), values_default[k]);
      ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
    }

    // Insert K-V pairs, multiple times (overwrites),
    // into the 'default' CF. Will trigger mempurge.
    for (size_t j = 0; j < NUM_REPEAT; j++) {
      // Create value strings of RAND_VALUES_LENGTH bytes.
      for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
        values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH);
      }
      // Insert K-V into the default CF.
      for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
        ASSERT_OK(Put(0, keys[k], values_default[k]));
      }
      // Check key validity for all keys in the default CF.
      for (size_t k = 0; k < KVSIZE; k++) {
        ASSERT_EQ(Get(0, keys[k]), values_default[k]);
      }
      // Note that at this point, only keys[0:KVSIZE/2]
      // have been inserted into pikachu.
      for (size_t k = 0; k < KVSIZE / 2; k++) {
        ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
      }
    }

    // Insert K-V pairs, multiple times (overwrites),
    // into the 'pikachu' CF. Will trigger mempurge.
    // Check that we keep the older logs for the 'default' imm().
    for (size_t j = 0; j < NUM_REPEAT; j++) {
      // Create value strings of RAND_VALUES_LENGTH bytes.
      for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
        values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH);
      }
      // Insert K-V into the pikachu CF.
      for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
        ASSERT_OK(Put(1, keys[k], values_pikachu[k]));
      }
      // Check key validity, for all keys,
      // both in default and pikachu.
      for (size_t k = 0; k < KVSIZE; k++) {
        ASSERT_EQ(Get(0, keys[k]), values_default[k]);
        ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
      }
    }

    // Check that there was at least one mempurge.
    const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
    // Check that no SST files were created during flush.
    const uint32_t EXPECTED_SST_COUNT = 0;
    EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
    if (options.experimental_mempurge_threshold ==
        std::numeric_limits<double>::max()) {
      EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
    }

    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    // Check that there was no data corruption anywhere,
    // neither in the 'default' nor in the 'pikachu' CF.
    ASSERT_EQ("v3", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v4"));
    ASSERT_EQ("v4", Get(1, "foo"));
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ("v5", Get(1, "baz"));
    // Check keys in 'default' and 'pikachu'.
    // keys[0:KVSIZE/2] were certainly contained
    // in the imm() at Reopen/recovery time.
    for (size_t k = 0; k < KVSIZE; k++) {
      ASSERT_EQ(Get(0, keys[k]), values_default[k]);
      ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
    }

    // Insert random K-V pairs to trigger
    // a flush in the pikachu CF.
    for (size_t j = 0; j < NUM_REPEAT; j++) {
      RNDKEY = rnd.RandomString(RAND_KEY_LENGTH);
      RNDVALUE = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(1, RNDKEY, RNDVALUE));
    }
    // Assert that there was at least one flush to storage.
    EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);

    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_EQ("v4", Get(1, "foo"));
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ("v5", Get(1, "baz"));
    // Since values in default are held in the mutable mem()
    // and in imm(), check that the flush in pikachu didn't
    // affect these values.
    for (size_t k = 0; k < KVSIZE; k++) {
      ASSERT_EQ(Get(0, keys[k]), values_default[k]);
      ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
    }
    ASSERT_EQ(Get(1, RNDKEY), RNDVALUE);
  } while (ChangeWalOptions());
}

TEST_F(DBFlushTest, MemPurgeCorrectLogNumberAndSSTFileCreation) {
  // Before our bug fix, we noticed that when two memtables were
  // being flushed (one memtable being the output of a
  // previous MemPurge and one memtable being a newly-sealed memtable),
  // the SST file created was not properly added to the DB version
  // (via the VersionEdit obj), leading to data loss (the SST file
  // was later being purged as an obsolete file).
  // Therefore, we reproduce this scenario to test our fix.
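  // Timeline sketch of the scenario being reproduced (illustrative):
  //   1. memtable m1 fills up; its flush resolves as a MemPurge whose
  //      output is a new memtable m1'.
  //   2. memtable m2 fills up with unique keys and is sealed.
  //   3. a real flush picks {m1', m2} and writes a single SST file.
  // Step 3 is where the SST file previously failed to be recorded in the
  // VersionEdit.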
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  // Enforce the size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 1.0;
  // Force more than one memtable to accumulate before a flush is triggered.
  // For some reason this option does not seem to be enforced,
  // so the following test is designed to make sure that we
  // are testing the correct test case.
  options.min_write_buffer_number_to_merge = 3;
  options.max_write_buffer_number = 5;
  options.max_write_buffer_size_to_maintain = 2 * (options.write_buffer_size);
  options.disable_auto_compactions = true;
  ASSERT_OK(TryReopen(options));

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Dummy variable used for the following callback function.
  uint64_t ZERO = 0;
  // We will first execute mempurge operations exclusively.
  // Therefore, when the first flush is triggered, we want to make
  // sure there are at least 2 memtables being flushed: one output
  // from a previous mempurge, and one newly sealed memtable.
  // This is when we observed in the past that some SST files created
  // were not properly added to the DB version (via the VersionEdit obj).
  std::atomic<uint64_t> num_memtable_at_first_flush(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:num_memtables", [&](void* arg) {
        uint64_t* mems_size = static_cast<uint64_t*>(arg);
        // atomic_compare_exchange_strong sometimes updates the value
        // of ZERO (the "expected" object), so we make sure ZERO is
        // indeed... zero.
        ZERO = 0;
        std::atomic_compare_exchange_strong(&num_memtable_at_first_flush,
                                            &ZERO, *mems_size);
      });
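  // The compare-exchange above only succeeds while
  // num_memtable_at_first_flush is still 0, so the callback records the
  // number of memtables picked by the *first* flush and ignores all
  // subsequent flushes.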
  const std::vector<std::string> KEYS = {
      "ThisIsKey1", "ThisIsKey2", "ThisIsKey3", "ThisIsKey4", "ThisIsKey5",
      "ThisIsKey6", "ThisIsKey7", "ThisIsKey8", "ThisIsKey9"};
  const std::string NOT_FOUND = "NOT_FOUND";

  Random rnd(117);
  const uint64_t NUM_REPEAT_OVERWRITES = 100;
  const uint64_t NUM_RAND_INSERTS = 500;
  const uint64_t RAND_VALUES_LENGTH = 10240;

  std::string key, value;
  std::vector<std::string> values(9, "");

  // Keys used to check that no SST file disappeared.
  for (uint64_t k = 0; k < 5; k++) {
    values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEYS[k], values[k]));
  }

  // Insert K-V pairs, multiple times.
  // Trigger at least one mempurge and no SST file creation.
  for (size_t i = 0; i < NUM_REPEAT_OVERWRITES; i++) {
    // Create value strings of RAND_VALUES_LENGTH bytes.
    for (uint64_t k = 5; k < values.size(); k++) {
      values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[k], values[k]));
    }
    // Check database consistency.
    for (uint64_t k = 0; k < values.size(); k++) {
      ASSERT_EQ(Get(KEYS[k]), values[k]);
    }
  }

  // Check that there was at least one mempurge.
  uint32_t expected_min_mempurge_count = 1;
  // Check that no SST files were created during flush.
  uint32_t expected_sst_count = 0;
  EXPECT_GE(mempurge_count.load(), expected_min_mempurge_count);
  EXPECT_EQ(sst_count.load(), expected_sst_count);

  // Trigger an SST file creation and no mempurge.
  for (size_t i = 0; i < NUM_RAND_INSERTS; i++) {
    key = rnd.RandomString(RAND_VALUES_LENGTH);
    // Create value strings of RAND_VALUES_LENGTH bytes.
    value = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(key, value));
    // Check database consistency.
    for (uint64_t k = 0; k < values.size(); k++) {
      ASSERT_EQ(Get(KEYS[k]), values[k]);
    }
    ASSERT_EQ(Get(key), value);
  }

  // Check that at least one SST file was created during flush.
  expected_sst_count = 1;
  EXPECT_GE(sst_count.load(), expected_sst_count);

  // Oddly enough, num_memtable_at_first_flush is not enforced to be
  // equal to min_write_buffer_number_to_merge. So instead we assert that
  // the first SST file creation was fed by at least two memtables: one
  // output from a previous mempurge and one newly sealed memtable. This
  // is the scenario where we observed that some SST files created
  // were not properly added to the DB version before our bug fix.
  ASSERT_GE(num_memtable_at_first_flush.load(), 2);

  // Check that no data was lost after SST file creation.
  for (uint64_t k = 0; k < values.size(); k++) {
    ASSERT_EQ(Get(KEYS[k]), values[k]);
  }
  // Extra check of database consistency.
  ASSERT_EQ(Get(key), value);

  Close();
}
TEST_P(DBFlushDirectIOTest, DirectIO) {
  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.use_direct_io_for_flush_and_compaction = GetParam();
  options.env = MockEnv::Create(Env::Default());
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:create_file", [&](void* arg) {
        bool* use_direct_writes = static_cast<bool*>(arg);
        ASSERT_EQ(*use_direct_writes,
                  options.use_direct_io_for_flush_and_compaction);
      });
  SyncPoint::GetInstance()->EnableProcessing();
  Reopen(options);
  ASSERT_OK(Put("foo", "v"));
  FlushOptions flush_options;
  flush_options.wait = true;
  ASSERT_OK(dbfull()->Flush(flush_options));
  Destroy(options);
  delete options.env;
}

TEST_F(DBFlushTest, FlushError) {
  Options options;
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_injection_env.get();
  Reopen(options);

  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  fault_injection_env->SetFilesystemActive(false);
  Status s = dbfull()->TEST_SwitchMemtable();
  fault_injection_env->SetFilesystemActive(true);
  Destroy(options);
  ASSERT_NE(s, Status::OK());
}

TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
  // Regression test for bug where manual flush hangs forever when the DB
  // is in read-only mode. Verify it now at least returns, despite failing.
  Options options;
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_injection_env.get();
  options.max_write_buffer_number = 2;
  Reopen(options);

  // Trigger a first flush but don't let it run.
  ASSERT_OK(db_->PauseBackgroundWork());
  ASSERT_OK(Put("key1", "value1"));
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(db_->Flush(flush_opts));

  // Write a key to the second memtable so we have something to flush later
  // after the DB is in read-only mode.
  ASSERT_OK(Put("key2", "value2"));

  // Let the first flush continue, hit an error, and put the DB in read-only
  // mode.
  fault_injection_env->SetFilesystemActive(false);
  ASSERT_OK(db_->ContinueBackgroundWork());
  // We injected the error into env, so the returned status is not OK.
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
  uint64_t num_bg_errors;
  ASSERT_TRUE(
      db_->GetIntProperty(DB::Properties::kBackgroundErrors, &num_bg_errors));
  ASSERT_GT(num_bg_errors, 0);

  // In the bug scenario, triggering another flush would cause the second
  // flush to hang forever. After the fix we expect it to return an error.
  ASSERT_NOK(db_->Flush(FlushOptions()));

  Close();
}

TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:AfterScheduleFlush",
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
       {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBImpl::BackgroundCallFlush:start",
        "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_EQ(2, handles_.size());
  ASSERT_OK(Put(1, "key", "value"));
  auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  port::Thread drop_cf_thr([&]() {
    TEST_SYNC_POINT(
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
    handles_.resize(1);
    TEST_SYNC_POINT(
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
  });
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
  drop_cf_thr.join();
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
  class TestListener : public EventListener {
   public:
    void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
      // There's only one key in each flush.
      ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
      ASSERT_NE(0, info.smallest_seqno);
      if (info.smallest_seqno == seq1) {
        // First flush completed.
        ASSERT_FALSE(completed1);
        completed1 = true;
        CheckFlushResultCommitted(db, seq1);
      } else {
        // Second flush completed.
        ASSERT_FALSE(completed2);
        completed2 = true;
        ASSERT_EQ(info.smallest_seqno, seq2);
        CheckFlushResultCommitted(db, seq2);
      }
    }

    void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
      DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
      InstrumentedMutex* mutex = db_impl->mutex();
      mutex->Lock();
      auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
                      db->DefaultColumnFamily())
                      ->cfd();
      ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
      mutex->Unlock();
    }

    std::atomic<SequenceNumber> seq1{0};
    std::atomic<SequenceNumber> seq2{0};
    std::atomic<bool> completed1{false};
    std::atomic<bool> completed2{false};
  };
  std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
        "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
       {"DBImpl::FlushMemTableToOutputFile:Finish",
        "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table", [&listener](void* arg) {
        // Wait for the second flush to finish, outside of the mutex.
        auto* mems = static_cast<autovector<MemTable*>*>(arg);
        if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
          TEST_SYNC_POINT(
              "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
              "WaitSecond");
        }
      });

  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  // Setting max_flush_jobs = max_background_jobs / 4 = 2.
  options.max_background_jobs = 8;
  // Allow 2 immutable memtables.
  options.max_write_buffer_number = 3;
  Reopen(options);
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put("foo", "v"));
  listener->seq1 = db_->GetLatestSequenceNumber();
  // t1 will wait for the second flush to complete before committing the
  // flush result.
  auto t1 = port::Thread([&]() {
    // flush_opts.wait = true
    ASSERT_OK(db_->Flush(FlushOptions()));
  });
  // Wait for the first flush to start.
  TEST_SYNC_POINT(
      "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
  // The second flush will exit early without committing its result. The work
  // is delegated to the first flush.
  ASSERT_OK(Put("bar", "v"));
  listener->seq2 = db_->GetLatestSequenceNumber();
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(db_->Flush(flush_opts));
  t1.join();
  // Ensure background work is fully finished, including listener callbacks,
  // before accessing listener state.
  ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  ASSERT_TRUE(listener->completed1);
  ASSERT_TRUE(listener->completed2);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(DBFlushTest, FlushWithBlob) {
  constexpr uint64_t min_blob_size = 10;

  Options options;
  options.enable_blob_files = true;
  options.min_blob_size = min_blob_size;
  options.disable_auto_compactions = true;
  options.env = env_;
  Reopen(options);

  constexpr char short_value[] = "short";
  static_assert(sizeof(short_value) - 1 < min_blob_size,
                "short_value too long");

  constexpr char long_value[] = "long_value";
  static_assert(sizeof(long_value) - 1 >= min_blob_size,
                "long_value too short");
  ASSERT_OK(Put("key1", short_value));
  ASSERT_OK(Put("key2", long_value));
  ASSERT_OK(Flush());

  ASSERT_EQ(Get("key1"), short_value);
  ASSERT_EQ(Get("key2"), long_value);

  VersionSet* const versions = dbfull()->GetVersionSet();
  assert(versions);

  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  assert(cfd);

  Version* const current = cfd->current();
  assert(current);

  const VersionStorageInfo* const storage_info = current->storage_info();
  assert(storage_info);

  const auto& l0_files = storage_info->LevelFiles(0);
  ASSERT_EQ(l0_files.size(), 1);

  const FileMetaData* const table_file = l0_files[0];
  assert(table_file);

  const auto& blob_files = storage_info->GetBlobFiles();
  ASSERT_EQ(blob_files.size(), 1);

  const auto& blob_file = blob_files.front();
  assert(blob_file);

  ASSERT_EQ(table_file->smallest.user_key(), "key1");
  ASSERT_EQ(table_file->largest.user_key(), "key2");
  ASSERT_EQ(table_file->fd.smallest_seqno, 1);
  ASSERT_EQ(table_file->fd.largest_seqno, 2);
  ASSERT_EQ(table_file->oldest_blob_file_number,
            blob_file->GetBlobFileNumber());
  ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);

  const InternalStats* const internal_stats = cfd->internal_stats();
  assert(internal_stats);

  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
  ASSERT_FALSE(compaction_stats.empty());
  ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());
  ASSERT_EQ(compaction_stats[0].bytes_written_blob,
            blob_file->GetTotalBlobBytes());
  ASSERT_EQ(compaction_stats[0].num_output_files, 1);
  ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);

  const uint64_t* const cf_stats_value =
      internal_stats->TEST_GetCFStatsValue();
  ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
            compaction_stats[0].bytes_written +
                compaction_stats[0].bytes_written_blob);
}

TEST_F(DBFlushTest, FlushWithChecksumHandoff1) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_fs_env.get();
  options.checksum_handoff_file_types.Add(FileType::kTableFile);
  Reopen(options);
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
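  // A rough sketch of the mechanism as exercised here (not a full spec):
  // because checksum_handoff_file_types includes kTableFile, RocksDB
  // computes a checksum for each table-file write and hands it down to the
  // file system, which re-verifies the buffer on write. The
  // FaultInjectionTestFS is told to expect kCRC32c; switching it to a
  // different function type below makes that verification fail.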
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());

  // The hash does not match, so the write fails:
  // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
  // Since the file system returns IOStatus::Corruption, it is an
  // unrecoverable error.
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
  });
  ASSERT_OK(Put("key3", "value3"));
  ASSERT_OK(Put("key4", "value4"));
  SyncPoint::GetInstance()->EnableProcessing();
  Status s = Flush();
  ASSERT_EQ(s.severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
  SyncPoint::GetInstance()->DisableProcessing();
  Destroy(options);
  Reopen(options);

  // The file system does not support checksum handoff. The check
  // will be ignored.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
  ASSERT_OK(Put("key5", "value5"));
  ASSERT_OK(Put("key6", "value6"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());

  // Each write will be simulated as corrupted.
  // Since the file system returns IOStatus::Corruption, it is an
  // unrecoverable error.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs->IngestDataCorruptionBeforeWrite();
  });
  ASSERT_OK(Put("key7", "value7"));
  ASSERT_OK(Put("key8", "value8"));
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
  SyncPoint::GetInstance()->DisableProcessing();
  Destroy(options);
}

TEST_F(DBFlushTest, FlushWithChecksumHandoff2) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_fs_env.get();
  Reopen(options);
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);

  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(Flush());

  // checksum_handoff_file_types is not set in the options, so checksum
  // handoff will not be triggered and the flush succeeds despite the
  // mismatching hash function.
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
  });
  ASSERT_OK(Put("key3", "value3"));
  ASSERT_OK(Put("key4", "value4"));
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  Destroy(options);
  Reopen(options);

  // The file system does not support checksum handoff. The check
  // will be ignored.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
  ASSERT_OK(Put("key5", "value5"));
  ASSERT_OK(Put("key6", "value6"));
  ASSERT_OK(Flush());

  // checksum_handoff_file_types is not set in the options, so checksum
  // handoff will not be triggered and the corrupted writes go unnoticed.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs->IngestDataCorruptionBeforeWrite();
  });
  ASSERT_OK(Put("key7", "value7"));
  ASSERT_OK(Put("key8", "value8"));
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  Destroy(options);
}
TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest1) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_fs_env.get();
  options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  Reopen(options);

  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(Flush());

  // The hash does not match, so the write fails:
  // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
  // Since the file system returns IOStatus::Corruption, it is mapped to
  // a kFatalError error.
  ASSERT_OK(Put("key3", "value3"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
      });
  ASSERT_OK(Put("key3", "value3"));
  ASSERT_OK(Put("key4", "value4"));
  SyncPoint::GetInstance()->EnableProcessing();
  Status s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
  SyncPoint::GetInstance()->DisableProcessing();
  Destroy(options);
}

TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest2) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }
  std::shared_ptr<FaultInjectionTestFS> fault_fs(
      new FaultInjectionTestFS(FileSystem::Default()));
  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  options.disable_auto_compactions = true;
  options.env = fault_fs_env.get();
  options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
  Reopen(options);

  // The file system does not support checksum handoff. The check
  // will be ignored.
  ASSERT_OK(Put("key5", "value5"));
  ASSERT_OK(Put("key6", "value6"));
  ASSERT_OK(Flush());

  // Each write will be simulated as corrupted.
  // Since the file system returns IOStatus::Corruption, it is mapped to
  // a kFatalError error.
  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
  ASSERT_OK(Put("key7", "value7"));
  ASSERT_OK(Put("key8", "value8"));
  SyncPoint::GetInstance()->EnableProcessing();
  Status s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
  SyncPoint::GetInstance()->DisableProcessing();
  Destroy(options);
}
TEST_F(DBFlushTest, PickRightMemtables) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  options.create_if_missing = true;

  const std::string test_cf_name = "test_cf";
  options.max_write_buffer_number = 128;
  CreateColumnFamilies({test_cf_name}, options);

  Close();
  ReopenWithColumnFamilies({kDefaultColumnFamilyName, test_cf_name}, options);

  ASSERT_OK(db_->Put(WriteOptions(), "key", "value"));
  ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "key", "value"));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncClosedWals:BeforeReLock", [&](void* /*arg*/) {
        ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v"));
        auto* cfhi =
            static_cast_with_check<ColumnFamilyHandleImpl>(handles_[1]);
        assert(cfhi);
        ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfhi->cfd()));
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg) {
        auto* job = static_cast<FlushJob*>(arg);
        assert(job);
        const auto& mems = job->GetMemTables();
        assert(mems.size() == 1);
        assert(mems[0]);
        ASSERT_EQ(1, mems[0]->GetID());
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->Flush(FlushOptions(), handles_[1]));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

class DBFlushTestBlobError : public DBFlushTest,
                             public testing::WithParamInterface<std::string> {
 public:
  DBFlushTestBlobError() : sync_point_(GetParam()) {}

  std::string sync_point_;
};

INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError,
                        ::testing::ValuesIn(std::vector<std::string>{
                            "BlobFileBuilder::WriteBlobToFile:AddRecord",
                            "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
TEST_P(DBFlushTestBlobError, FlushError) {
  Options options;
  options.enable_blob_files = true;
  options.disable_auto_compactions = true;
  options.env = env_;

  Reopen(options);

  ASSERT_OK(Put("key", "blob"));

  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
    Status* const s = static_cast<Status*>(arg);
    assert(s);

    (*s) = Status::IOError(sync_point_);
  });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_NOK(Flush());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
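
  // The failed flush must not have installed any SST or blob files in the
  // current version.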
  VersionSet* const versions = dbfull()->GetVersionSet();
  assert(versions);

  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  assert(cfd);

  Version* const current = cfd->current();
  assert(current);

  const VersionStorageInfo* const storage_info = current->storage_info();
  assert(storage_info);

  const auto& l0_files = storage_info->LevelFiles(0);
  ASSERT_TRUE(l0_files.empty());

  const auto& blob_files = storage_info->GetBlobFiles();
  ASSERT_TRUE(blob_files.empty());

  // Make sure the files generated by the failed job have been deleted
  std::vector<std::string> files;
  ASSERT_OK(env_->GetChildren(dbname_, &files));
  for (const auto& file : files) {
    uint64_t number = 0;
    FileType type = kTableFile;

    if (!ParseFileName(file, &number, &type)) {
      continue;
    }

    ASSERT_NE(type, kTableFile);
    ASSERT_NE(type, kBlobFile);
  }

  const InternalStats* const internal_stats = cfd->internal_stats();
  assert(internal_stats);

  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
  ASSERT_FALSE(compaction_stats.empty());

  if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
    ASSERT_EQ(compaction_stats[0].bytes_written, 0);
    ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0);
    ASSERT_EQ(compaction_stats[0].num_output_files, 0);
    ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0);
  } else {
    // SST file writing succeeded; blob file writing failed (during Finish)
    ASSERT_GT(compaction_stats[0].bytes_written, 0);
    ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0);
    ASSERT_EQ(compaction_stats[0].num_output_files, 1);
    ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0);
  }

  const uint64_t* const cf_stats_value =
      internal_stats->TEST_GetCFStatsValue();
  ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
            compaction_stats[0].bytes_written +
                compaction_stats[0].bytes_written_blob);
}

TEST_F(DBFlushTest, TombstoneVisibleInSnapshot) {
  class SimpleTestFlushListener : public EventListener {
   public:
    explicit SimpleTestFlushListener(DBFlushTest* _test) : test_(_test) {}
    ~SimpleTestFlushListener() override = default;

    void OnFlushBegin(DB* db, const FlushJobInfo& info) override {
      ASSERT_EQ(static_cast<uint32_t>(0), info.cf_id);

      ASSERT_OK(db->Delete(WriteOptions(), "foo"));
      snapshot_ = db->GetSnapshot();
      ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));

      auto* dbimpl = static_cast_with_check<DBImpl>(db);
      assert(dbimpl);

      ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
      auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(cfh);
      assert(cfhi);
      ASSERT_OK(dbimpl->TEST_SwitchMemtable(cfhi->cfd()));
    }

    DBFlushTest* test_ = nullptr;
    const Snapshot* snapshot_ = nullptr;
  };
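
  // OnFlushBegin writes a tombstone for "foo", takes a snapshot right after
  // the tombstone, then overwrites "foo" and switches the memtable; the
  // tombstone must stay visible to readers of that snapshot across the flush.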
  Options options = CurrentOptions();
  options.create_if_missing = true;
  auto* listener = new SimpleTestFlushListener(this);
  options.listeners.emplace_back(listener);

  DestroyAndReopen(options);

  ASSERT_OK(db_->Put(WriteOptions(), "foo", "value0"));

  ManagedSnapshot snapshot_guard(db_);

  ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
  ASSERT_OK(db_->Flush(FlushOptions(), default_cf));

  const Snapshot* snapshot = listener->snapshot_;
  assert(snapshot);

  ReadOptions read_opts;
  read_opts.snapshot = snapshot;

  // Using snapshot should not see "foo".
  {
    std::string value;
    Status s = db_->Get(read_opts, "foo", &value);
    ASSERT_TRUE(s.IsNotFound());
  }

  db_->ReleaseSnapshot(snapshot);
}

TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.allow_2pc = true;
  options.atomic_flush = GetParam();
  // 64MB so that memtable flush won't be triggered by the small writes.
  options.write_buffer_size = (static_cast<size_t>(64) << 20);
  auto flush_listener = std::make_shared<FlushCounterListener>();
  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
  options.listeners.push_back(flush_listener);

  // Destroy the DB to recreate as a TransactionDB.
  Close();
  Destroy(options, true);

  // Create a TransactionDB.
  TransactionDB* txn_db = nullptr;
  TransactionDBOptions txn_db_opts;
  txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
  ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db));
  ASSERT_NE(txn_db, nullptr);
  db_ = txn_db;

  // Create two more column families besides the default CF.
  std::vector<std::string> cfs = {"puppy", "kitty"};
  CreateColumnFamilies(cfs, options);
  ASSERT_EQ(handles_.size(), 2);
  ASSERT_EQ(handles_[0]->GetName(), cfs[0]);
  ASSERT_EQ(handles_[1]->GetName(), cfs[1]);

  const size_t kNumCfToFlush = options.atomic_flush ? 2 : 1;

  WriteOptions wopts;
  TransactionOptions txn_opts;
  // txn1 only prepares but does not commit.
  // The WAL containing the prepared but uncommitted data must be kept.
  Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
  // txn2 both prepares and commits.
  Transaction* txn2 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
  ASSERT_NE(txn1, nullptr);
  ASSERT_NE(txn2, nullptr);

  for (size_t i = 0; i < kNumCfToFlush; i++) {
    ASSERT_OK(txn1->Put(handles_[i], "k1", "v1"));
    ASSERT_OK(txn2->Put(handles_[i], "k2", "v2"));
  }
  // A txn must be named before Prepare().
  ASSERT_OK(txn1->SetName("txn1"));
  ASSERT_OK(txn2->SetName("txn2"));
  // Prepare writes to the WAL, but not to the memtable. (WriteCommitted)
  ASSERT_OK(txn1->Prepare());
  ASSERT_OK(txn2->Prepare());
  // Commit writes to the memtable.
  ASSERT_OK(txn2->Commit());
  delete txn1;
  delete txn2;

  // There is still unflushed data in the memtables.
  // But since the data is small enough to reside in the active memtable,
  // there are no immutable memtables.
  for (size_t i = 0; i < kNumCfToFlush; i++) {
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
    ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
    ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
  }

  // Atomically flush the memtables; the min log number with prepared data
  // should be written to the MANIFEST.
  std::vector<ColumnFamilyHandle*> cfs_to_flush(kNumCfToFlush);
  for (size_t i = 0; i < kNumCfToFlush; i++) {
    cfs_to_flush[i] = handles_[i];
  }
  ASSERT_OK(txn_db->Flush(FlushOptions(), cfs_to_flush));

  // There is no remaining data in the memtables after the flush.
  for (size_t i = 0; i < kNumCfToFlush; i++) {
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
    ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
    ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
  }

  // The recovered min log number with prepared data should be non-zero.
  // In 2pc mode, MinLogNumberToKeep returns the
  // VersionSet::min_log_number_to_keep recovered from MANIFEST. If it's 0,
  // it means atomic flush didn't write the min_log_number_to_keep to MANIFEST.
  cfs.push_back(kDefaultColumnFamilyName);
  ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  DBImpl* db_impl = static_cast<DBImpl*>(db_);
  ASSERT_TRUE(db_impl->allow_2pc());
  ASSERT_NE(db_impl->MinLogNumberToKeep(), 0);
}

TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  options.write_buffer_size = (static_cast<size_t>(64) << 20);
  auto flush_listener = std::make_shared<FlushCounterListener>();
  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
  options.listeners.push_back(flush_listener);

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
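  // With the WAL disabled, the writes below live only in the memtables, so
  // they can only become durable through the manual flush.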
  for (size_t i = 0; i != num_cfs; ++i) {
    ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
  }
  for (size_t i = 0; i != num_cfs; ++i) {
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
    ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
    ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
  }
  std::vector<int> cf_ids;
  for (size_t i = 0; i != num_cfs; ++i) {
    cf_ids.emplace_back(static_cast<int>(i));
  }
  ASSERT_OK(Flush(cf_ids));
  for (size_t i = 0; i != num_cfs; ++i) {
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
    ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
    ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
  }
}

TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  options.write_buffer_size = (static_cast<size_t>(64) << 20);
  CreateAndReopenWithCF({"pikachu"}, options);

  const size_t num_cfs = handles_.size();
  ASSERT_EQ(num_cfs, 2);
  WriteOptions wopts;
  for (size_t i = 0; i != num_cfs; ++i) {
    ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
  }

  {
    // Flush the default CF only.
    std::vector<int> cf_ids{0};
    ASSERT_OK(Flush(cf_ids));

    autovector<ColumnFamilyData*> flushed_cfds;
    autovector<autovector<VersionEdit*>> flush_edits;
    auto flushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[0]);
    flushed_cfds.push_back(flushed_cfh->cfd());
    flush_edits.push_back({});
    auto unflushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[1]);

    ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
                                                 flushed_cfds, flush_edits),
              unflushed_cfh->cfd()->GetLogNumber());
  }

  {
    // Flush all CFs.
    std::vector<int> cf_ids;
    for (size_t i = 0; i != num_cfs; ++i) {
      cf_ids.emplace_back(static_cast<int>(i));
    }
    ASSERT_OK(Flush(cf_ids));
    uint64_t log_num_after_flush = dbfull()->TEST_GetCurrentLogNumber();
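    // After flushing all CFs, each CF's log number should have advanced to
    // the current WAL, so their minimum is expected to equal
    // log_num_after_flush (asserted below).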
    uint64_t min_log_number_to_keep = std::numeric_limits<uint64_t>::max();
    autovector<ColumnFamilyData*> flushed_cfds;
    autovector<autovector<VersionEdit*>> flush_edits;
    for (size_t i = 0; i != num_cfs; ++i) {
      auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
      flushed_cfds.push_back(cfh->cfd());
      flush_edits.push_back({});
      min_log_number_to_keep =
          std::min(min_log_number_to_keep, cfh->cfd()->GetLogNumber());
    }
    ASSERT_EQ(min_log_number_to_keep, log_num_after_flush);
    ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
                                                 flushed_cfds, flush_edits),
              min_log_number_to_keep);
  }
}

TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  // 4KB so that we can easily trigger auto flush.
  options.write_buffer_size = 4096;

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
        "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t i = 0; i != num_cfs; ++i) {
    ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
  }
  // Keep writing to one of the column families to trigger auto flush.
  for (int i = 0; i != 4000; ++i) {
    ASSERT_OK(Put(static_cast<int>(num_cfs) - 1 /*cf*/,
                  "key" + std::to_string(i), "value" + std::to_string(i),
                  wopts));
  }

  TEST_SYNC_POINT(
      "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
  if (options.atomic_flush) {
    for (size_t i = 0; i + 1 != num_cfs; ++i) {
      auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
      ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
      ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
    }
  } else {
    for (size_t i = 0; i + 1 != num_cfs; ++i) {
      auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
      ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
      ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
    }
  }
  SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.env = fault_injection_env.get();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
        "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
       {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
        "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
  SyncPoint::GetInstance()->EnableProcessing();
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
  }
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
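  // Let some flush jobs complete, then deactivate the filesystem so the
  // remaining jobs fail and the already-completed ones must be rolled back.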
  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
  fault_injection_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
  for (auto* cfh : handles_) {
    // Returns the IO error that happened during flush.
    ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(cfh));
  }
  for (size_t i = 0; i != num_cfs; ++i) {
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
    ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed());
    ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
  }
  fault_injection_env->SetFilesystemActive(true);
  Destroy(options);
}

TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  std::vector<int> cf_ids;
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
    cf_ids.push_back(cf_id);
  }
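  // Drop one of the CFs before requesting the flush; the atomic flush
  // request should then fail with ColumnFamilyDropped.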
  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped());
  Destroy(options);
}

TEST_P(DBAtomicFlushTest,
       FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
        "DBAtomicFlushTest::BeforeDropCF"},
       {"DBAtomicFlushTest::AfterDropCF",
        "DBImpl::BackgroundCallFlush:start"}});
  SyncPoint::GetInstance()->EnableProcessing();

  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
  }
  port::Thread user_thread([&]() {
    TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
  });
  FlushOptions flush_opts;
  flush_opts.wait = true;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  user_thread.join();
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_EQ("value", Get(cf_id, "key"));
  }

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
  num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_EQ("value", Get(cf_id, "key"));
  }
  Destroy(options);
}

TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  const int kNumKeysTriggerFlush = 4;
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysTriggerFlush));
  CreateAndReopenWithCF({"pikachu"}, options);

  for (int i = 0; i != kNumKeysTriggerFlush; ++i) {
    ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(0, "key", "value"));
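  // The Put above fills the memtable and triggers a flush; closing right
  // afterwards must not lose the write, as verified after reopening below.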
  Close();

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ("value", Get(0, "key"));
}

TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
  bool atomic_flush = GetParam();
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.max_write_buffer_number = 4;
  // Set min_write_buffer_number_to_merge to be greater than 1, so that
  // a column family with one memtable in the imm will not cause IsFlushPending
  // to return true when flush_requested_ is false.
  options.min_write_buffer_number_to_merge = 2;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  ASSERT_OK(dbfull()->PauseBackgroundWork());
  ASSERT_OK(Put(0, "key00", "value00"));
  ASSERT_OK(Put(1, "key10", "value10"));
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  ASSERT_OK(Put(0, "key01", "value01"));
  // Since max_write_buffer_number is 4, the following flush won't cause write
  // stall.
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
  handles_[1] = nullptr;
  ASSERT_OK(dbfull()->ContinueBackgroundWork());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
  delete handles_[0];
  handles_.clear();
}

TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  CreateAndReopenWithCF({"pikachu"}, options);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
       {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBImpl::BackgroundCallFlush:start",
        "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_EQ(2, handles_.size());
  ASSERT_OK(Put(0, "key", "value"));
  ASSERT_OK(Put(1, "key", "value"));
  auto* cfd_default =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
          ->cfd();
  auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  port::Thread drop_cf_thr([&]() {
    TEST_SYNC_POINT(
        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    delete handles_[1];
    handles_.resize(1);
    TEST_SYNC_POINT(
        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
  });
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
                                                flush_opts));
  drop_cf_thr.join();
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
  Options options = CurrentOptions();
  options.env = fault_injection_env.get();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  for (size_t cf = 0; cf < handles_.size(); ++cf) {
    ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
      [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
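  // Deactivating the filesystem right before the last version edit is
  // written makes the MANIFEST write fail after the memtables were already
  // flushed, exercising the rollback of result installation.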
  SyncPoint::GetInstance()->EnableProcessing();
  FlushOptions flush_opts;
  Status s = db_->Flush(flush_opts, handles_);
  ASSERT_NOK(s);
  fault_injection_env->SetFilesystemActive(true);
  Close();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBAtomicFlushTest, FailureInMultiCfAutomaticFlush) {
  bool atomic_flush = GetParam();
  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
  Options options = CurrentOptions();
  options.env = fault_injection_env.get();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  const int kNumKeysTriggerFlush = 4;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysTriggerFlush));
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  for (size_t cf = 0; cf < handles_.size(); ++cf) {
    ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::ScheduleFlushes:PreSwitchMemtable",
      [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
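  // Deactivate the filesystem right before the memtable switch so the
  // automatic flush triggered by the writes below fails.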
  SyncPoint::GetInstance()->EnableProcessing();

  for (int i = 1; i < kNumKeysTriggerFlush; ++i) {
    ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
  }
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  // The next write after the failed flush should fail.
  ASSERT_NOK(Put(0, "x", "y"));
  fault_injection_env->SetFilesystemActive(true);
  Close();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

// In atomic flush, concurrent bg flush threads commit to the MANIFEST in
// serial, in the order of their picked memtables for each column family.
// Only when a bg flush thread finds out that its memtables are the earliest
// unflushed ones for all the included column families will this bg flush
// thread continue to commit to MANIFEST.
// This unit test uses sync points to coordinate the execution of two bg
// threads executing the same sequence of functions. The interleaving is as
// follows.
// time  bg1                                bg2
//  |    pick memtables to flush
//  |    flush memtables cf1_m1, cf2_m1
//  |    join MANIFEST write queue
//  |                                       pick memtables to flush
//  |                                       flush memtables cf1_(m1+1)
//  |                                       join MANIFEST write queue
//  |                                       wait to write MANIFEST
//  |    write MANIFEST
//  |    IO error
//  |                                       detect IO error and stop waiting
//  V
TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.atomic_flush = true;
  options.env = fault_injection_env.get();
  // Set a larger value than default so that RocksDB can schedule concurrent
  // background flush threads.
  options.max_background_jobs = 8;
  options.max_write_buffer_number = 8;
  CreateAndReopenWithCF({"pikachu"}, options);

  assert(2 == handles_.size());

  WriteOptions write_opts;
  write_opts.disableWAL = true;

  ASSERT_OK(Put(0, "a", "v_0_a", write_opts));
  ASSERT_OK(Put(1, "a", "v_1_a", write_opts));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  SyncPoint::GetInstance()->LoadDependency({
      {"BgFlushThr2:WaitToCommit", "BgFlushThr1:BeforeWriteManifest"},
  });

  std::thread::id bg_flush_thr1, bg_flush_thr2;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCallFlush:start", [&](void*) {
        if (bg_flush_thr1 == std::thread::id()) {
          bg_flush_thr1 = std::this_thread::get_id();
        } else if (bg_flush_thr2 == std::thread::id()) {
          bg_flush_thr2 = std::this_thread::get_id();
        }
      });

  int called = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit",
      [&](void* arg) {
        if (std::this_thread::get_id() == bg_flush_thr2) {
          const auto* ptr = reinterpret_cast<std::pair<Status, bool>*>(arg);
          assert(ptr);
          if (0 == called) {
            // When bg flush thread 2 reaches here for the first time.
            ASSERT_OK(ptr->first);
            ASSERT_TRUE(ptr->second);
          } else if (1 == called) {
            // When bg flush thread 2 reaches here for the second time.
            ASSERT_TRUE(ptr->first.IsIOError());
            ASSERT_FALSE(ptr->second);
          }
          ++called;
          TEST_SYNC_POINT("BgFlushThr2:WaitToCommit");
        }
      });

  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
      [&](void*) {
        if (std::this_thread::get_id() == bg_flush_thr1) {
          TEST_SYNC_POINT("BgFlushThr1:BeforeWriteManifest");
        }
      });

  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        if (std::this_thread::get_id() != bg_flush_thr1) {
          return;
        }
        ASSERT_OK(db_->Put(write_opts, "b", "v_1_b"));
        FlushOptions flush_opts;
        flush_opts.wait = false;
        std::vector<ColumnFamilyHandle*> cfhs(1, db_->DefaultColumnFamily());
        ASSERT_OK(dbfull()->Flush(flush_opts, cfhs));
      });

  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
        auto* ptr = static_cast<IOStatus*>(arg);
        assert(ptr);
        *ptr = IOStatus::IOError("Injected failure");
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError());

  Close();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  options.max_write_buffer_number = 2;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));

  Reopen(options);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::DelayWrite:Start",
        "DBAtomicFlushTest::NoWaitWhenWritesStopped:0"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(dbfull()->PauseBackgroundWork());
  for (int i = 0; i < options.max_write_buffer_number; ++i) {
    ASSERT_OK(Put("k" + std::to_string(i), "v" + std::to_string(i)));
  }
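  // All write buffers are now full and background work is paused, so the
  // next write stalls; a non-waiting flush must return TryAgain rather than
  // block behind the stalled write.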
  std::thread stalled_writer([&]() { ASSERT_OK(Put("k", "v")); });
  TEST_SYNC_POINT("DBAtomicFlushTest::NoWaitWhenWritesStopped:0");

  {
    FlushOptions flush_opts;
    flush_opts.wait = false;
    flush_opts.allow_write_stall = true;
    ASSERT_TRUE(db_->Flush(flush_opts).IsTryAgain());
  }

  ASSERT_OK(dbfull()->ContinueBackgroundWork());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  stalled_writer.join();

  SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
                        testing::Bool());

INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());

TEST_F(DBFlushTest, NonAtomicFlushRollbackPendingFlushes) {
  // Regression test for a bug when atomic_flush=false.
  // The bug could happen as follows:
  // Start Flush0 for memtable M0 to SST0
  // Start Flush1 for memtable M1 to SST1
  // Flush1 returns OK, but doesn't install to MANIFEST and lets whoever
  // flushes M0 take care of it
  // Flush0 finishes with a retryable IOError
  //   - It rolls back M0, but (incorrectly) not M1
  //   - Deletes SST1 and SST2
  //
  // Auto-recovery will start Flush2 for M0; it does not pick up M1 since it
  // thinks that M1 is flushed
  // Flush2 writes SST3 and finishes OK, tries to install SST3 and SST2
  // Error opening SST2 since it's already deleted
  //
  // The fix is to let Flush0 also roll back M1.
  Options opts = CurrentOptions();
  opts.atomic_flush = false;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 4;
  env_->SetBackgroundThreads(4, Env::HIGH);
  DestroyAndReopen(opts);
  std::atomic_int flush_count = 0;

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        int c = flush_count.fetch_add(1);
        if (c == 0) {
          Status* s = static_cast<Status*>(s_ptr);
          IOStatus io_error = IOStatus::IOError("injected foobar");
          io_error.SetRetryable(true);
          *s = io_error;
          TEST_SYNC_POINT("Let mem1 flush start");
          TEST_SYNC_POINT("Wait for mem1 flush to finish");
        }
      });
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let mem1 flush start", "Mem1 flush starts"},
       {"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});
  // Need first flush to wait for the second flush to finish
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val1"));
  // trigger bg flush mem0
  ASSERT_OK(Put(Key(2), "val2"));
  TEST_SYNC_POINT("Mem1 flush starts");
  // trigger bg flush mem1
  ASSERT_OK(Put(Key(3), "val3"));

  TEST_SYNC_POINT("Wait for error recover");
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBFlushTest, AbortNonAtomicFlushWhenBGError) {
  // Regression test for a bug when atomic_flush=false.
  // The bug could happen as follows:
  // Start Flush0 for memtable M0 to SST0
  // Start Flush1 for memtable M1 to SST1
  // Flush1 returns OK, but doesn't install its output to MANIFEST and lets
  // whoever flushes M0 take care of it
  // Start Flush2 for memtable M2 to SST2
  // Flush0 finishes with a retryable IOError
  //   - It rolls back M0 AND M1
  //   - Deletes SST1 and SST2
  // Flush2 finishes, does not roll back M2,
  //   - releases the pending file number that keeps SST2 alive
  //   - deletes SST2
  //
  // Then auto-recovery starts, and hits an error opening SST2 when trying to
  // install the flush result
  //
  // The fix is to let Flush2 roll back M2 if it finds that
  // there is a background error.
  Options opts = CurrentOptions();
  opts.atomic_flush = false;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 4;
  env_->SetBackgroundThreads(4, Env::HIGH);
  DestroyAndReopen(opts);
  std::atomic_int flush_count = 0;
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        int c = flush_count.fetch_add(1);
        if (c == 0) {
          Status* s = static_cast<Status*>(s_ptr);
          IOStatus io_error = IOStatus::IOError("injected foobar");
          io_error.SetRetryable(true);
          *s = io_error;
          TEST_SYNC_POINT("Let mem1 flush start");
          TEST_SYNC_POINT("Wait for mem1 flush to finish");

          TEST_SYNC_POINT("Let mem2 flush start");
          TEST_SYNC_POINT("Wait for mem2 to start writing table");
        }
      });

  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table", [&](void* mems) {
        auto* mems_ptr = static_cast<autovector<MemTable*>*>(mems);
        if ((*mems_ptr)[0]->GetID() == 3) {
          TEST_SYNC_POINT("Mem2 flush starts writing table");
          TEST_SYNC_POINT("Mem2 flush waits until rollback");
        }
      });
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let mem1 flush start", "Mem1 flush starts"},
       {"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"},
       {"Let mem2 flush start", "Mem2 flush starts"},
       {"Mem2 flush starts writing table",
        "Wait for mem2 to start writing table"},
       {"RollbackMemtableFlush", "Mem2 flush waits until rollback"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val1"));
  // trigger bg flush mem0
  ASSERT_OK(Put(Key(2), "val2"));
  TEST_SYNC_POINT("Mem1 flush starts");
  // trigger bg flush mem1
  ASSERT_OK(Put(Key(3), "val3"));

  TEST_SYNC_POINT("Mem2 flush starts");
  ASSERT_OK(Put(Key(4), "val4"));

  TEST_SYNC_POINT("Wait for error recover");
  // Recovery flush writes 3 memtables together into 1 file.
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBFlushTest, NonAtomicNormalFlushAbortWhenBGError) {
  Options opts = CurrentOptions();
  opts.atomic_flush = false;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 1;
  env_->SetBackgroundThreads(2, Env::HIGH);
  DestroyAndReopen(opts);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  std::atomic_int flush_write_table_count = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        int c = flush_write_table_count.fetch_add(1);
        if (c == 0) {
          Status* s = static_cast<Status*>(s_ptr);
          IOStatus io_error = IOStatus::IOError("injected foobar");
          io_error.SetRetryable(true);
          *s = io_error;
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let error recovery start",
        "RecoverFromRetryableBGIOError:BeforeStart"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});

  ASSERT_OK(Put(Key(1), "val1"));
  // trigger bg flush0 for mem0
  ASSERT_OK(Put(Key(2), "val2"));
  // Not checking status since this wait can finish before flush starts.
  dbfull()->TEST_WaitForFlushMemTable().PermitUncheckedError();

  // trigger bg flush1 for mem1, should see bg error and abort
  // before picking a memtable to flush
  ASSERT_OK(Put(Key(3), "val3"));
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  TEST_SYNC_POINT("Let error recovery start");
  TEST_SYNC_POINT("Wait for error recover");
  // Recovery flush writes 2 memtables together into 1 file.
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  // 1 for flush 0 and 1 for recovery flush
  ASSERT_EQ(2, flush_write_table_count);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBFlushTest, DBStuckAfterAtomicFlushError) {
  // Test for a bug with atomic flush where the DB could become stuck
  // after a flush error. A repro timeline:
  //
  // Start Flush0 for mem0
  // Start Flush1 for mem1
  // Now Flush1 will wait for Flush0 to install mem0
  // Flush0 finishes with retryable IOError, rolls back mem0
  // Resume starts and waits for background job to finish, i.e., Flush1
  // Fill memtable again, trigger Flush2 for mem0
  // Flush2 will get error status, and not roll back mem0, see code in
  // https://github.com/facebook/rocksdb/blob/b927ba5936216861c2c35ab68f50ba4a78e65747/db/db_impl/db_impl_compaction_flush.cc#L725
  //
  // DB is stuck since mem0 can never be picked now
  //
  // The fix is to roll back mem0 in Flush2, and let Flush1 also abort upon
  // background error besides waiting for older memtables to be installed.
  // The recovery flush in this case should pick up all memtables
  // and write them to a single L0 file.
  Options opts = CurrentOptions();
  opts.atomic_flush = true;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 4;
  env_->SetBackgroundThreads(4, Env::HIGH);
  DestroyAndReopen(opts);

  std::atomic_int flush_count = 0;
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        int c = flush_count.fetch_add(1);
        if (c == 0) {
          Status* s = static_cast<Status*>(s_ptr);
          IOStatus io_error = IOStatus::IOError("injected foobar");
          io_error.SetRetryable(true);
          *s = io_error;
          TEST_SYNC_POINT("Let flush for mem1 start");
          // Wait for Flush1 to start waiting to install flush result
          TEST_SYNC_POINT("Wait for flush for mem1");
        }
      });
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let flush for mem1 start", "Flush for mem1"},
       {"DBImpl::AtomicFlushMemTablesToOutputFiles:WaitCV",
        "Wait for flush for mem1"},
       {"RecoverFromRetryableBGIOError:BeforeStart",
        "Wait for resume to start"},
       {"Recovery should continue here",
        "RecoverFromRetryableBGIOError:BeforeStart2"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val1"));
  // trigger Flush0 for mem0
  ASSERT_OK(Put(Key(2), "val2"));
  // trigger Flush1 for mem1
  TEST_SYNC_POINT("Flush for mem1");
  ASSERT_OK(Put(Key(3), "val3"));

  // Wait until resume has started to schedule another flush
  TEST_SYNC_POINT("Wait for resume to start");
  // This flush should not be scheduled due to bg error
  ASSERT_OK(Put(Key(4), "val4"));

  // TEST_WaitForBackgroundWork() returns background error
  // after all background work is done.
  ASSERT_NOK(dbfull()->TEST_WaitForBackgroundWork());
  // Flush should abort without writing any table
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  // Wait until this flush is done.
  TEST_SYNC_POINT("Recovery should continue here");
  TEST_SYNC_POINT("Wait for error recover");
  // Error recovery can schedule new flushes, but should not
  // encounter errors
  ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
}

TEST_F(DBFlushTest, VerifyOutputRecordCount) {
  for (bool use_plain_table : {false, true}) {
    Options options = CurrentOptions();
    options.flush_verify_memtable_count = true;
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
    // Verify flush output record count verification in different table
    // formats
    if (use_plain_table) {
      options.table_factory.reset(NewPlainTableFactory());
    }
    DestroyAndReopen(options);

    // Verify that flush output record count verification does not produce
    // false positives.
    ASSERT_OK(Merge("k0", "v1"));
    ASSERT_OK(Put("k1", "v1"));
    ASSERT_OK(Put("k2", "v1"));
    ASSERT_OK(SingleDelete("k2"));
    ASSERT_OK(Delete("k2"));
    ASSERT_OK(Delete("k3"));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), "k1", "k3"));
    ASSERT_OK(Flush());

    // Verify that flush output record count verification catches corruption,
    // injected here by skipping a key during table building.
    DestroyAndReopen(options);
    if (use_plain_table) {
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "PlainTableBuilder::Add::skip",
          [&](void* skip) { *static_cast<bool*>(skip) = true; });
    } else {
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "BlockBasedTableBuilder::Add::skip",
          [&](void* skip) { *static_cast<bool*>(skip) = true; });
    }
    SyncPoint::GetInstance()->EnableProcessing();

    const char* expect =
        "Number of keys in flush output SST files does not match";
    // 1. During DB open flush
    ASSERT_OK(Put("k1", "v1"));
    ASSERT_OK(Put("k2", "v1"));
    Status s = TryReopen(options);
    ASSERT_TRUE(s.IsCorruption());
    ASSERT_TRUE(std::strstr(s.getState(), expect));

    // 2. During regular flush
    DestroyAndReopen(options);
    ASSERT_OK(Put("k1", "v1"));
    ASSERT_OK(Put("k2", "v1"));
    s = Flush();
    ASSERT_TRUE(s.IsCorruption());
    ASSERT_TRUE(std::strstr(s.getState(), expect));
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}

class DBFlushSuperBlockTest
    : public DBFlushTest,
      public ::testing::WithParamInterface<std::tuple<bool, size_t, size_t>> {
 public:
  DBFlushSuperBlockTest() : DBFlushTest() {}
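
  // Zero-pads keys to a fixed width so that lexicographic order matches
  // numeric order.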
  std::string formatKey(int i) {
    int desired_length = 10;
    char buffer[64];
    snprintf(buffer, 64, "%0*d", desired_length, i);
    return buffer;
  }

  void VerifyReadWithGet(int key_count) {
    for (int i = 0; i < key_count; ++i) {
      PinnableSlice value;
      ASSERT_OK(Get(formatKey(i), &value));
      ASSERT_EQ(value.ToString(), added_data[formatKey(i)]);
    }
  }

  void VerifyReadWithIterator(int key_count) {
    {
      std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
      int i = 0;
      for (it->SeekToFirst(); it->Valid(); it->Next()) {
        ASSERT_OK(it->status());
        ASSERT_EQ((it->key()).ToString(), formatKey(i));
        ASSERT_EQ((it->value()).ToString(), added_data[formatKey(i)]);
        i++;
      }
      ASSERT_OK(it->status());
      ASSERT_EQ(i, key_count);
    }
  }

 protected:
  Random rnd{123};
  std::unordered_map<std::string, std::string> added_data;
};

constexpr size_t kLowSpaceOverheadRatio = 256;

TEST_P(DBFlushSuperBlockTest, SuperBlock) {
  constexpr int key_count = 12345;
  Options options;
  options.env = env_;
  options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  options.paranoid_file_checks = true;
  options.write_buffer_size = 1024 * 1024;
  BlockBasedTableOptions block_options;
  block_options.block_align = std::get<0>(GetParam());
  block_options.index_block_restart_interval = 3;
  block_options.super_block_alignment_size = std::get<1>(GetParam());
  block_options.super_block_alignment_space_overhead_ratio =
      std::get<2>(GetParam());
  options.table_factory.reset(NewBlockBasedTableFactory(block_options));
  if (block_options.block_align) {
    // When block align is enabled, disable compression.
    options.compression = kNoCompression;
  }
  ASSERT_OK(options.table_factory->ValidateOptions(
      DBOptions(options), ColumnFamilyOptions(options)));
  Reopen(options);

  int super_block_pad_count = 0;
  int super_block_pad_exceed_limit_count = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
      "SuperBlockAlignment",
      [&super_block_pad_count](void* /*arg*/) { super_block_pad_count++; });
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
      "SuperBlockAlignmentPaddingBytesExceedLimit",
      [&super_block_pad_exceed_limit_count](void* /*arg*/) {
        super_block_pad_exceed_limit_count++;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Add lots of keys.
  for (int i = 0; i < key_count; ++i) {
    added_data[formatKey(i)] =
        std::string(rnd.RandomString(rnd.Next() % 1000));
    ASSERT_OK(Put(formatKey(i), added_data[formatKey(i)]));
  }

  // Flush the in-memory data to disk to verify that, with super block
  // alignment, the data can be read back properly.
  Reopen(options);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // When block_align is enabled, super blocks are always aligned, so there
  // should be no padding for super block alignment.
  if (block_options.super_block_alignment_size != 0 &&
      !block_options.block_align) {
    ASSERT_GT(super_block_pad_count, 0);
  } else {
    ASSERT_EQ(super_block_pad_count, 0);
  }
  if (!block_options.block_align &&
      block_options.super_block_alignment_size != 0 &&
      block_options.super_block_alignment_space_overhead_ratio ==
          kLowSpaceOverheadRatio) {
    ASSERT_GT(super_block_pad_exceed_limit_count, 0);
  }

  // Verify the values are correct.
  VerifyReadWithGet(key_count);
  Reopen(options);
  VerifyReadWithIterator(key_count);
  // Verify checksums.
  ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));

  // Flip the super block configuration option and reopen; reads should still
  // work. This verifies forward/backward compatibility.
  if (block_options.super_block_alignment_size == 0) {
    block_options.super_block_alignment_size = 16 * 1024;
  } else {
    block_options.super_block_alignment_size = 0;
  }
  options.table_factory.reset(NewBlockBasedTableFactory(block_options));
  Reopen(options);
  // Verify the values are correct.
  VerifyReadWithGet(key_count);
  Reopen(options);
  VerifyReadWithIterator(key_count);
  // Verify checksums.
  ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
}

INSTANTIATE_TEST_CASE_P(
    SuperBlockTests, DBFlushSuperBlockTest,
    testing::Combine(testing::Bool(), testing::Values(0, 32 * 1024, 16 * 1024),
                     // Use a very low space overhead ratio to test
                     // the case where the required padding bytes are
                     // larger than the max allowed padding size.
                     testing::Values(4, kLowSpaceOverheadRatio)));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}