// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <memory>

#include "db/db_test_util.h"
#include "file/sst_file_manager_impl.h"
#include "port/stack_trace.h"
#include "rocksdb/io_status.h"
#include "rocksdb/sst_file_manager.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {

class DBErrorHandlingFSTest : public DBTestBase {
 public:
  DBErrorHandlingFSTest()
      : DBTestBase("db_error_handling_fs_test", /*env_do_fsync=*/true) {
    fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
    fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
  }

  ~DBErrorHandlingFSTest() {
    // Disable and clear all sync points before destroying fault_env_
    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->LoadDependency({});
    SyncPoint::GetInstance()->ClearAllCallBacks();
    Close();
  }

  std::string GetManifestNameFromLiveFiles() {
    std::vector<std::string> live_files;
    uint64_t manifest_size;

    Status s = dbfull()->GetLiveFiles(live_files, &manifest_size, false);
    if (!s.ok()) {
      return "";
    }
    for (auto& file : live_files) {
      uint64_t num = 0;
      FileType type;
      if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
        return file;
      }
    }
    return "";
  }

  std::shared_ptr<FaultInjectionTestFS> fault_fs_;
  std::unique_ptr<Env> fault_env_;
};

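// EventListener that records background error and recovery callbacks. Tests
// use it to suppress automatic recovery, override the reported background
// error, and arm the fault-injection filesystem after a given number of
// table files have been created.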
class ErrorHandlerFSListener : public EventListener {
 public:
  ErrorHandlerFSListener()
      : mutex_(),
        cv_(&mutex_),
        no_auto_recovery_(false),
        recovery_complete_(false),
        file_creation_started_(false),
        override_bg_error_(false),
        file_count_(0),
        fault_fs_(nullptr) {}
  ~ErrorHandlerFSListener() {
    file_creation_error_.PermitUncheckedError();
    bg_error_.PermitUncheckedError();
    new_bg_error_.PermitUncheckedError();
  }

  void OnTableFileCreationStarted(
      const TableFileCreationBriefInfo& /*ti*/) override {
    InstrumentedMutexLock l(&mutex_);
    file_creation_started_ = true;
    if (file_count_ > 0) {
      if (--file_count_ == 0) {
        fault_fs_->SetFilesystemActive(false, file_creation_error_);
        file_creation_error_ = IOStatus::OK();
      }
    }
    cv_.SignalAll();
  }

  void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, Status bg_error,
                            bool* auto_recovery) override {
    bg_error.PermitUncheckedError();
    if (*auto_recovery && no_auto_recovery_) {
      *auto_recovery = false;
    }
  }

  void OnErrorRecoveryEnd(const BackgroundErrorRecoveryInfo& info) override {
    InstrumentedMutexLock l(&mutex_);
    recovery_complete_ = true;
    cv_.SignalAll();
    new_bg_error_ = info.new_bg_error;
  }

  bool WaitForRecovery(uint64_t /*abs_time_us*/) {
    InstrumentedMutexLock l(&mutex_);
    while (!recovery_complete_) {
      cv_.Wait(/*abs_time_us*/);
    }
    if (recovery_complete_) {
      recovery_complete_ = false;
      return true;
    }
    return false;
  }

  void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
    InstrumentedMutexLock l(&mutex_);
    while (!file_creation_started_) {
      cv_.Wait(/*abs_time_us*/);
    }
    file_creation_started_ = false;
  }

  void OnBackgroundError(BackgroundErrorReason /*reason*/,
                         Status* bg_error) override {
    if (override_bg_error_) {
      *bg_error = bg_error_;
      override_bg_error_ = false;
    }
  }

  void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }

  void OverrideBGError(Status bg_err) {
    bg_error_ = bg_err;
    override_bg_error_ = true;
  }

  void InjectFileCreationError(FaultInjectionTestFS* fs, int file_count,
                               IOStatus io_s) {
    fault_fs_ = fs;
    file_count_ = file_count;
    file_creation_error_ = io_s;
  }

  Status new_bg_error() { return new_bg_error_; }

 private:
  InstrumentedMutex mutex_;
  InstrumentedCondVar cv_;
  bool no_auto_recovery_;
  bool recovery_complete_;
  bool file_creation_started_;
  bool override_bg_error_;
  int file_count_;
  IOStatus file_creation_error_;
  Status bg_error_;
  Status new_bg_error_;
  FaultInjectionTestFS* fault_fs_;
};

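// Inject a NoSpace error at the start of a flush. It should surface as a
// hard background error that is cleared by an explicit Resume().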
TEST_F(DBErrorHandlingFSTest, FlushWriteError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.statistics = CreateDBStatistics();
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  ASSERT_OK(Put(Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  Destroy(options);
}

// Every NoSpace IOError is handled as a regular background error regardless
// of whether the retryable flag is set, so auto resume for retryable IO
// errors is not triggered. It is also mapped to a hard error.
TEST_F(DBErrorHandlingFSTest, FlushWriteNoSpaceError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  options.statistics = CreateDBStatistics();
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::NoSpace("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(1), "val1"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
  Destroy(options);
}

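// With auto resume disabled (max_bgerror_resume_count = 0), a retryable
// IOError injected at each stage of the table build surfaces as a soft
// background error that a manual Resume() clears.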
TEST_F(DBErrorHandlingFSTest, FlushWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  options.statistics = CreateDBStatistics();
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(1), "val1"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
  Reopen(options);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_OK(Put(Key(2), "val2"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeSyncTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val2", Get(Key(2)));
  ASSERT_OK(Put(Key(3), "val3"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeCloseTableFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val3", Get(Key(3)));
  Destroy(options);
}

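// A non-retryable, file-scoped data loss error during flush is likewise
// handled as a soft background error recoverable via Resume().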
TEST_F(DBErrorHandlingFSTest, FlushWriteFileScopeError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
  error_msg.SetDataLoss(true);
  error_msg.SetScope(
      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
  error_msg.SetRetryable(false);
  ASSERT_OK(Put(Key(1), "val1"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_OK(Put(Key(2), "val2"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeSyncTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val2", Get(Key(2)));
  ASSERT_OK(Put(Key(3), "val3"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeCloseTableFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val3", Get(Key(3)));
  // Not file scope, but the retryable flag is set
  error_msg.SetDataLoss(false);
  error_msg.SetScope(
      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFileSystem);
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(3), "val3"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeCloseTableFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Reopen(options);
  ASSERT_EQ("val3", Get(Key(3)));
  Destroy(options);
}

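// A retryable IOError while syncing the closed WALs surfaces as a hard
// background error; the test then drops the extra column family and calls
// Resume() to recover.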
TEST_F(DBErrorHandlingFSTest, FlushWALWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  listener->EnableAutoRecovery(false);
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncClosedWals:Start",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = false;
  ASSERT_OK(Put(Key(1), "val1", wo));
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  auto cfh = dbfull()->GetColumnFamilyHandle(1);
  s = dbfull()->DropColumnFamily(cfh);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));
  Destroy(options);
}

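// Same as FlushWALWriteRetryableError, but with atomic_flush enabled.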
TEST_F(DBErrorHandlingFSTest, FlushWALAtomicWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  options.atomic_flush = true;
  Status s;
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  listener->EnableAutoRecovery(false);
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncClosedWals:Start",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = false;
  ASSERT_OK(Put(Key(1), "val1", wo));
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  auto cfh = dbfull()->GetColumnFamilyHandle(1);
  s = dbfull()->DropColumnFamily(cfh);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));
  Destroy(options);
}

// The flush error is injected before we finish the table build
TEST_F(DBErrorHandlingFSTest, FlushWriteNoWALRetryableError1) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  options.statistics = CreateDBStatistics();
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(1), "val1", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_OK(Put(Key(2), "val2", wo));
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_EQ("val2", Get(Key(2)));
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ("val2", Get(Key(2)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
  Destroy(options);
}

// The retryable IO error is injected before we sync the table
TEST_F(DBErrorHandlingFSTest, FlushWriteNoWALRetryableError2) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(1), "val1", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeSyncTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_OK(Put(Key(2), "val2", wo));
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_EQ("val2", Get(Key(2)));
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ("val2", Get(Key(2)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));
  Destroy(options);
}

// The retryable IO error is injected before we close the table file
TEST_F(DBErrorHandlingFSTest, FlushWriteNoWALRetryableError3) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(1), "val1", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeCloseTableFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_OK(Put(Key(2), "val2", wo));
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_EQ("val2", Get(Key(2)));
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ("val2", Get(Key(2)));
  ASSERT_OK(Put(Key(3), "val3", wo));
  ASSERT_EQ("val3", Get(Key(3)));
  s = Flush();
  ASSERT_OK(s);
  ASSERT_EQ("val3", Get(Key(3)));
  Destroy(options);
}

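// A NoSpace error while writing the MANIFEST quarantines the affected files
// until a successful Resume() installs a fresh manifest.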
TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_fs_->SetFilesystemActive(false,
                                       IOStatus::NoSpace("Out of space"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  ASSERT_FALSE(dbfull()->TEST_GetFilesToQuarantine().empty());
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  ASSERT_TRUE(dbfull()->TEST_GetFilesToQuarantine().empty());
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_FALSE(dbfull()->TEST_GetFilesToQuarantine().empty());
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  ASSERT_TRUE(dbfull()->TEST_GetFilesToQuarantine().empty());
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

TEST_F(DBErrorHandlingFSTest, ManifestWriteFileScopeError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
  error_msg.SetDataLoss(true);
  error_msg.SetScope(
      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
  error_msg.SetRetryable(false);
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_FALSE(dbfull()->TEST_GetFilesToQuarantine().empty());
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  ASSERT_TRUE(dbfull()->TEST_GetFilesToQuarantine().empty());
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

TEST_F(DBErrorHandlingFSTest, ManifestWriteNoWALRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(0), "val", wo));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_FALSE(dbfull()->TEST_GetFilesToQuarantine().empty());
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  ASSERT_TRUE(dbfull()->TEST_GetFilesToQuarantine().empty());
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

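// The first Resume() runs into the still-armed sync point and fails again;
// only after the callbacks are cleared can a second Resume() succeed.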
TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_fs_->SetFilesystemActive(false,
                                       IOStatus::NoSpace("Out of space"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_TRUE(s.IsNoSpace());
  ASSERT_EQ(dbfull()->TEST_GetBGError().severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  ASSERT_FALSE(dbfull()->TEST_GetFilesToQuarantine().empty());
  fault_fs_->SetFilesystemActive(true);
  // This Resume() will attempt to create a new manifest file and fail again
  s = dbfull()->Resume();
  ASSERT_TRUE(s.IsNoSpace());
  ASSERT_EQ(dbfull()->TEST_GetBGError().severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  ASSERT_FALSE(dbfull()->TEST_GetFilesToQuarantine().empty());
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  // A successful Resume() will create a new manifest file
  s = dbfull()->Resume();
  ASSERT_OK(s);
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  ASSERT_TRUE(dbfull()->TEST_GetFilesToQuarantine().empty());
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  std::atomic<bool> fail_manifest(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Put(Key(2), "val"));
  s = Flush();
  ASSERT_OK(s);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      // Wait for flush of 2nd L0 file before starting compaction
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"},
       // Wait for compaction to detect manifest write error
       {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
       // Make compaction thread wait for error to be cleared
       {"CompactionManifestWriteError:1",
        "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
       // Wait for DB instance to clear bg_error before calling
       // TEST_WaitForCompact
       {"SstFileManagerImpl::ErrorCleared", "CompactionManifestWriteError:2"}});
  // trigger manifest write failure in compaction thread
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        if (fail_manifest.load()) {
          fault_fs_->SetFilesystemActive(false,
                                         IOStatus::NoSpace("Out of space"));
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val"));
  // This Flush will trigger a compaction, which will fail when appending to
  // the manifest
  s = Flush();
  ASSERT_OK(s);
  TEST_SYNC_POINT("CompactionManifestWriteError:0");
  // Clear all errors so when the compaction is retried, it will succeed
  fault_fs_->SetFilesystemActive(true);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  TEST_SYNC_POINT("CompactionManifestWriteError:1");
  ASSERT_FALSE(dbfull()->TEST_GetFilesToQuarantine().empty());
  TEST_SYNC_POINT("CompactionManifestWriteError:2");
  s = dbfull()->TEST_WaitForCompact();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_OK(s);
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  ASSERT_TRUE(dbfull()->TEST_GetFilesToQuarantine().empty());
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  ASSERT_EQ("val", Get(Key(2)));
  Close();
}

TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  std::atomic<bool> fail_manifest(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Put(Key(2), "val"));
  s = Flush();
  ASSERT_OK(s);
  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      // Wait for flush of 2nd L0 file before starting compaction
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"},
       // Wait for compaction to detect manifest write error
       {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
       // Make compaction thread wait for error to be cleared
       {"CompactionManifestWriteError:1",
        "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"}});
  // trigger manifest write failure in compaction thread
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        if (fail_manifest.load()) {
          fault_fs_->SetFilesystemActive(false, error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);
  TEST_SYNC_POINT("CompactionManifestWriteError:0");
  ASSERT_FALSE(dbfull()->TEST_GetFilesToQuarantine().empty());
  TEST_SYNC_POINT("CompactionManifestWriteError:1");
  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  s = dbfull()->Resume();
  ASSERT_OK(s);
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  ASSERT_TRUE(dbfull()->TEST_GetFilesToQuarantine().empty());
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  ASSERT_EQ("val", Get(Key(2)));
  Close();
}

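// A NoSpace error in the compaction path, overridden by the listener to a
// hard background error, must be cleared with a manual Resume().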
TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  Status s;
  DestroyAndReopen(options);
  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);
  listener->OverrideBGError(
      Status(Status::NoSpace(), Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) {
        fault_fs_->SetFilesystemActive(false,
                                       IOStatus::NoSpace("Out of space"));
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);
  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Destroy(options);
}

TEST_F(DBErrorHandlingFSTest, DISABLED_CompactionWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);
  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:Finish",
      [&](void*) { CancelAllBackgroundWork(dbfull()); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);
  s = dbfull()->TEST_GetBGError();
  ASSERT_OK(s);
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Destroy(options);
}

TEST_F(DBErrorHandlingFSTest, DISABLED_CompactionWriteFileScopeError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 0;
  Status s;
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
  error_msg.SetDataLoss(true);
  error_msg.SetScope(
      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
  error_msg.SetRetryable(false);
  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);
  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:Finish",
      [&](void*) { CancelAllBackgroundWork(dbfull()); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);
  s = dbfull()->TEST_GetBGError();
  ASSERT_OK(s);
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  s = dbfull()->Resume();
  ASSERT_OK(s);
  Destroy(options);
}

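// A Corruption error during compaction is unrecoverable; Resume() must fail.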
TEST_F(DBErrorHandlingFSTest, CorruptionError) {
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  Status s;
  DestroyAndReopen(options);
  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) {
        fault_fs_->SetFilesystemActive(false,
                                       IOStatus::Corruption("Corruption"));
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);
  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(),
            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_NOK(s);
  Destroy(options);
}

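// With auto recovery enabled, clearing the injected fault lets the error
// handler recover in the background, without an explicit Resume() call.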
TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.statistics = CreateDBStatistics();
  Status s;
  listener->EnableAutoRecovery();
  DestroyAndReopen(options);
  ASSERT_OK(Put(Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  s = Put(Key(1), "val");
  ASSERT_OK(s);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
  ASSERT_OK(dbfull()->SyncWAL());
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Destroy(options);
}

TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  Status s;
  listener->EnableAutoRecovery();
  DestroyAndReopen(options);
  ASSERT_OK(Put(Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  // We should be able to shut down the database while auto recovery is going
  // on in the background.
  Close();
  DestroyDB(dbname_, options).PermitUncheckedError();
}

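// Out-of-space error on a synced WAL write: the keys of the failed batch
// must not be visible, neither after auto recovery nor after a reopen.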
TEST_F(DBErrorHandlingFSTest, WALWriteError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  Status s;
  Random rnd(301);
  listener->EnableAutoRecovery();
  DestroyAndReopen(options);
  {
    WriteBatch batch;
    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };
  {
    WriteBatch batch;
    int write_error = 0;
    for (auto i = 100; i < 199; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false,
                                           IOStatus::NoSpace("Out of space"));
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    s = dbfull()->Write(wopts, &batch);
    ASSERT_EQ(s, s.NoSpace());
  }
  SyncPoint::GetInstance()->DisableProcessing();
  // `ClearAllCallBacks()` is needed in addition to `DisableProcessing()` to
  // drain all callbacks. Otherwise, a pending callback in the background
  // could re-disable `fault_fs_` after we enable it below.
  SyncPoint::GetInstance()->ClearAllCallBacks();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Reopen(options);
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Close();
}

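// Retryable error on a synced WAL write with auto resume disabled
// (max_bgerror_resume_count = 0): the user must call Resume() manually,
// after which new batches are persisted to the WAL again.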
TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  options.paranoid_checks = true;
  options.max_bgerror_resume_count = 0;
  Random rnd(301);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  // For the first batch, the write is successful; sync is required.
  {
    WriteBatch batch;
    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };
  // For the second batch, the first two file Appends are successful, then the
  // following Appends fail due to a retryable file system IOError.
  {
    WriteBatch batch;
    int write_error = 0;
    for (auto i = 100; i < 200; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false, error_msg);
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    Status s = dbfull()->Write(wopts, &batch);
    ASSERT_TRUE(s.IsIOError());
  }
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  // Data in the corrupted WAL is not stored.
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  // Resume and write a new batch; it should end up in the WAL.
  ASSERT_OK(dbfull()->Resume());
  {
    WriteBatch batch;
    for (auto i = 200; i < 300; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };
  Reopen(options);
  for (auto i = 0; i < 300; ++i) {
    if (i < 100 || i >= 200) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Close();
}

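// The same WAL write error scenario, but with multiple column families:
// recovery flushes every column family so that no WAL data is lost.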
TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  Random rnd(301);
  listener->EnableAutoRecovery();
  CreateAndReopenWithCF({"one", "two", "three"}, options);
  {
    WriteBatch batch;
    for (auto i = 1; i < 4; ++i) {
      for (auto j = 0; j < 100; ++j) {
        ASSERT_OK(batch.Put(handles_[i], Key(j), rnd.RandomString(1024)));
      }
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };
  {
    WriteBatch batch;
    int write_error = 0;
    // Write to one CF
    for (auto i = 100; i < 199; ++i) {
      ASSERT_OK(batch.Put(handles_[2], Key(i), rnd.RandomString(1024)));
    }
    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false,
                                           IOStatus::NoSpace("Out of space"));
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    Status s = dbfull()->Write(wopts, &batch);
    ASSERT_TRUE(s.IsNoSpace());
  }
  SyncPoint::GetInstance()->DisableProcessing();
  // `ClearAllCallBacks()` is needed in addition to `DisableProcessing()` to
  // drain all callbacks. Otherwise, a pending callback in the background
  // could re-disable `fault_fs_` after we enable it below.
  SyncPoint::GetInstance()->ClearAllCallBacks();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  for (auto i = 1; i < 4; ++i) {
    // Every CF should have been flushed
    ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
  }
  for (auto i = 1; i < 4; ++i) {
    for (auto j = 0; j < 199; ++j) {
      if (j < 100) {
        ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
      } else {
        ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
      }
    }
  }
  ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
  for (auto i = 1; i < 4; ++i) {
    for (auto j = 0; j < 199; ++j) {
      if (j < 100) {
        ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
      } else {
        ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
      }
    }
  }
  Close();
}

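// Several DB instances sharing one SstFileManager hit background compaction
// errors at the same time; each instance should recover independently and
// complete its compaction into level 1.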
TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(env_);
  std::vector<std::unique_ptr<Env>> fault_envs;
  std::vector<FaultInjectionTestFS*> fault_fs;
  std::vector<Options> options;
  std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
  std::vector<DB*> db;
  std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
  int kNumDbInstances = 3;
  Random rnd(301);
  for (auto i = 0; i < kNumDbInstances; ++i) {
    listener.emplace_back(new ErrorHandlerFSListener());
    options.emplace_back(GetDefaultOptions());
    fault_fs.emplace_back(new FaultInjectionTestFS(env_->GetFileSystem()));
    std::shared_ptr<FileSystem> fs(fault_fs.back());
    fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
    options[i].env = fault_envs.back().get();
    options[i].create_if_missing = true;
    options[i].level0_file_num_compaction_trigger = 2;
    options[i].writable_file_max_buffer_size = 32768;
    options[i].listeners.emplace_back(listener[i]);
    options[i].sst_file_manager = sfm;
    DB* dbptr;
    char buf[16];
    listener[i]->EnableAutoRecovery();
    // Setup for returning error for the 3rd SST, which would be level 1
    listener[i]->InjectFileCreationError(fault_fs[i], 3,
                                         IOStatus::NoSpace("Out of space"));
    snprintf(buf, sizeof(buf), "_%d", i);
    ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
    ASSERT_OK(DB::Open(options[i], dbname_ + std::string(buf), &dbptr));
    db.emplace_back(dbptr);
  }
  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;
    for (auto j = 0; j <= 100; ++j) {
      ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(db[i]->Write(wopts, &batch));
    ASSERT_OK(db[i]->Flush(FlushOptions()));
  }
  def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;
    // Write a second batch; flushing it creates the second L0 file, which
    // triggers the compaction that hits the injected error.
    for (auto j = 100; j < 199; ++j) {
      ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(db[i]->Write(wopts, &batch));
    ASSERT_OK(db[i]->Flush(FlushOptions()));
  }
  for (auto i = 0; i < kNumDbInstances; ++i) {
    Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact();
    ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
    fault_fs[i]->SetFilesystemActive(true);
  }
  def_env->SetFilesystemActive(true);
  for (auto i = 0; i < kNumDbInstances; ++i) {
    std::string prop;
    ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
    ASSERT_OK(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact());
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + std::to_string(0), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 0);
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + std::to_string(1), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 1);
  }
  SstFileManagerImpl* sfmImpl =
      static_cast_with_check<SstFileManagerImpl>(sfm.get());
  sfmImpl->Close();
  for (auto i = 0; i < kNumDbInstances; ++i) {
    char buf[16];
    snprintf(buf, sizeof(buf), "_%d", i);
    delete db[i];
    fault_fs[i]->SetFilesystemActive(true);
    if (getenv("KEEP_DB")) {
      printf("DB is still at %s%s\n", dbname_.c_str(), buf);
    } else {
      ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
    }
  }
  options.clear();
  sfm.reset();
  delete def_env;
}

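// Like MultiDBCompactionError, but each instance is set up to fail
// differently (soft error, hard error, no error); all of them should end up
// fully compacted after recovery.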
TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
  if (mem_env_ != nullptr) {
    ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
    return;
  }
  FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(env_);
  std::vector<std::unique_ptr<Env>> fault_envs;
  std::vector<FaultInjectionTestFS*> fault_fs;
  std::vector<Options> options;
  std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
  std::vector<DB*> db;
  std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
  int kNumDbInstances = 3;
  Random rnd(301);
  for (auto i = 0; i < kNumDbInstances; ++i) {
    listener.emplace_back(new ErrorHandlerFSListener());
    options.emplace_back(GetDefaultOptions());
    fault_fs.emplace_back(new FaultInjectionTestFS(env_->GetFileSystem()));
    std::shared_ptr<FileSystem> fs(fault_fs.back());
    fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
    options[i].env = fault_envs.back().get();
    options[i].create_if_missing = true;
    options[i].level0_file_num_compaction_trigger = 2;
    options[i].writable_file_max_buffer_size = 32768;
    options[i].listeners.emplace_back(listener[i]);
    options[i].sst_file_manager = sfm;
    DB* dbptr;
    char buf[16];
    listener[i]->EnableAutoRecovery();
    switch (i) {
      case 0:
        // Setup for returning error for the 3rd SST, which would be level 1
        listener[i]->InjectFileCreationError(
            fault_fs[i], 3, IOStatus::NoSpace("Out of space"));
        break;
      case 1:
        // Setup for returning error after the 1st SST, which would result
        // in a hard error
        listener[i]->InjectFileCreationError(
            fault_fs[i], 2, IOStatus::NoSpace("Out of space"));
        break;
      default:
        break;
    }
    snprintf(buf, sizeof(buf), "_%d", i);
    ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
    ASSERT_OK(DB::Open(options[i], dbname_ + std::string(buf), &dbptr));
    db.emplace_back(dbptr);
  }
  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;
    for (auto j = 0; j <= 100; ++j) {
      ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(db[i]->Write(wopts, &batch));
    ASSERT_OK(db[i]->Flush(FlushOptions()));
  }
  def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
  for (auto i = 0; i < kNumDbInstances; ++i) {
    WriteBatch batch;
    // Write a second batch; flushing it creates the second L0 file, which
    // triggers the compaction that hits the injected error.
    for (auto j = 100; j < 199; ++j) {
      ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(db[i]->Write(wopts, &batch));
    if (i != 1) {
      ASSERT_OK(db[i]->Flush(FlushOptions()));
    } else {
      ASSERT_TRUE(db[i]->Flush(FlushOptions()).IsNoSpace());
    }
  }
  for (auto i = 0; i < kNumDbInstances; ++i) {
    Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact();
    switch (i) {
      case 0:
        ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
        break;
      case 1:
        ASSERT_EQ(s.severity(), Status::Severity::kHardError);
        break;
      case 2:
        ASSERT_OK(s);
        break;
    }
    fault_fs[i]->SetFilesystemActive(true);
  }
  def_env->SetFilesystemActive(true);
  for (auto i = 0; i < kNumDbInstances; ++i) {
    std::string prop;
    if (i < 2) {
      ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
    }
    if (i == 1) {
      ASSERT_OK(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact());
    }
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + std::to_string(0), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 0);
    EXPECT_TRUE(db[i]->GetProperty(
        "rocksdb.num-files-at-level" + std::to_string(1), &prop));
    EXPECT_EQ(atoi(prop.c_str()), 1);
  }
  SstFileManagerImpl* sfmImpl =
      static_cast_with_check<SstFileManagerImpl>(sfm.get());
  sfmImpl->Close();
  for (auto i = 0; i < kNumDbInstances; ++i) {
    char buf[16];
    snprintf(buf, sizeof(buf), "_%d", i);
    fault_fs[i]->SetFilesystemActive(true);
    delete db[i];
    if (getenv("KEEP_DB")) {
      printf("DB is still at %s%s\n", dbname_.c_str(), buf);
    } else {
      EXPECT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
    }
  }
  options.clear();
  delete def_env;
}

// When a KV pair is put with the WAL disabled in its write options and a
// retryable error happens afterwards, the bg error is mapped to a soft error
// and auto resume is triggered. During auto resume, SwitchMemtable is
// disabled to avoid creating small SST files. Writes can still be applied
// before the bg error is cleaned, unless the memtable is full.
TEST_F(DBErrorHandlingFSTest, FlushWritNoWALRetryableErrorAutoRecover1) {
  // Activate the FS before the first resume
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  options.statistics = CreateDBStatistics();
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(1), "val1", wo));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"RecoverFromRetryableBGIOError:LoopOut",
        "FlushWritNoWALRetryableeErrorAutoRecover1:1"}});
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  TEST_SYNC_POINT("FlushWritNoWALRetryableeErrorAutoRecover1:1");
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ("val1", Get(Key(1)));
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
  HistogramData autoresume_retry;
  options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
                                    &autoresume_retry);
  ASSERT_GE(autoresume_retry.max, 0);
  ASSERT_OK(Put(Key(2), "val2", wo));
  s = Flush();
  // Since auto resume fails, the bg error is not cleaned; flush returns the
  // bg_error set before.
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_EQ("val2", Get(Key(2)));
  // Call Resume() manually.
  ASSERT_OK(dbfull()->Resume());
  ASSERT_OK(Put(Key(3), "val3", wo));
  // After resume is successful, the flush should be OK.
  ASSERT_OK(Flush());
  ASSERT_EQ("val3", Get(Key(3)));
  Destroy(options);
}

TEST_F(DBErrorHandlingFSTest, MultipleRecoveryThreads) {
  // This test creates a scenario where the second write's recovery can get
  // started while the mutex is released for a short period during
  // NotifyOnErrorRecoveryEnd() from the first write's recovery. This is to
  // make sure RecoverFromRetryableBGIOError() from the second write's
  // recovery thread does not start with recovery_in_prog_ = false.
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 100;
  options.bgerror_resume_retry_interval = 1000000;  // 1 second
  options.statistics = CreateDBStatistics();
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  fault_fs_->SetFilesystemActive(false, error_msg);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"NotifyOnErrorRecoveryEnd:MutexUnlocked:1",
        "MultipleRecoveryThreads:1"},
       {"MultipleRecoveryThreads:2",
        "NotifyOnErrorRecoveryEnd:MutexUnlocked:2"},
       {"StartRecoverFromRetryableBGIOError:BeforeWaitingForOtherThread",
        "MultipleRecoveryThreads:3"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "MultipleRecoveryThreads:4"},
       {"MultipleRecoveryThreads:4",
        "StartRecoverFromRetryableBGIOError:AfterWaitingForOtherThread"}});
  SyncPoint::GetInstance()->EnableProcessing();
  // First write with a fault injected; recovery will start
  {
    ASSERT_OK(Put(Key(1), "val1", wo));
    Status s = Flush();
    ASSERT_NOK(s);
  }
  // Remove the fault injection so that the first recovery can go through
  fault_fs_->SetFilesystemActive(true);
  // At this point, the first recovery is at NotifyOnErrorRecoveryEnd and the
  // mutex is released.
  TEST_SYNC_POINT("MultipleRecoveryThreads:1");
  ROCKSDB_NAMESPACE::port::Thread second_write([&] {
    // Second write with a fault injected
    fault_fs_->SetFilesystemActive(false, error_msg);
    ASSERT_OK(Put(Key(2), "val2", wo));
    Status s = Flush();
    ASSERT_NOK(s);
  });
  // Second bg thread before waiting for the first thread's recovery thread
  TEST_SYNC_POINT("MultipleRecoveryThreads:3");
  // First thread's recovery thread continues
  TEST_SYNC_POINT("MultipleRecoveryThreads:2");
  // Wait for the first thread's recovery to finish (this sets
  // recovery_in_prog_ = false) and for the second thread to start its
  // recovery thread.
  TEST_SYNC_POINT("MultipleRecoveryThreads:4");
  second_write.join();
  // Remove the error injection so that the second recovery can go through
  fault_fs_->SetFilesystemActive(true);
  // Set up a sync point so that we can wait for the recovery thread to finish
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"RecoverFromRetryableBGIOError:RecoverSuccess",
        "MultipleRecoveryThreads:6"}});
  // Wait for the second thread's recovery to be done
  TEST_SYNC_POINT("MultipleRecoveryThreads:6");
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Destroy(options);
}

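// Same no-WAL flush scenario as AutoRecover1 above, except the filesystem is
// reactivated in time, so the auto resume succeeds and the next flush goes
// through without a manual Resume().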
TEST_F(DBErrorHandlingFSTest, FlushWritNoWALRetryableErrorAutoRecover2) {
  // Activate the FS before the first resume
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  options.statistics = CreateDBStatistics();
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(1), "val1", wo));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
  HistogramData autoresume_retry;
  options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
                                    &autoresume_retry);
  ASSERT_GE(autoresume_retry.max, 0);
  ASSERT_OK(Put(Key(2), "val2", wo));
  s = Flush();
  // Since auto resume is successful, the bg error is cleaned and the flush
  // succeeds.
  ASSERT_OK(s);
  ASSERT_EQ("val2", Get(Key(2)));
  Destroy(options);
}

// Auto resume from the flush retryable IO error. Activate the FS before the
// first resume; the resume is successful.
TEST_F(DBErrorHandlingFSTest, FlushWritRetryableErrorAutoRecover1) {
  // Activate the FS before the first resume
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(1), "val1"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  ASSERT_EQ("val1", Get(Key(1)));
  Reopen(options);
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_OK(Put(Key(2), "val2"));
  ASSERT_OK(Flush());
  ASSERT_EQ("val2", Get(Key(2)));
  Destroy(options);
}

// Auto resume from the flush retryable IO error with a retry limit. The FS is
// never reactivated during the retries, so auto resume should fail at the end.
TEST_F(DBErrorHandlingFSTest, FlushWritRetryableErrorAutoRecover2) {
  // Fail all the resume attempts and let the user resume
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(1), "val1"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"FlushWritRetryableeErrorAutoRecover2:0",
        "RecoverFromRetryableBGIOError:BeforeStart"},
       {"RecoverFromRetryableBGIOError:LoopOut",
        "FlushWritRetryableeErrorAutoRecover2:1"}});
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  TEST_SYNC_POINT("FlushWritRetryableeErrorAutoRecover2:0");
  TEST_SYNC_POINT("FlushWritRetryableeErrorAutoRecover2:1");
  fault_fs_->SetFilesystemActive(true);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ("val1", Get(Key(1)));
  // Auto resume failed because the FS did not recover during the resume
  // retries; the user calls Resume() manually here.
  s = dbfull()->Resume();
  ASSERT_EQ("val1", Get(Key(1)));
  ASSERT_OK(s);
  ASSERT_OK(Put(Key(2), "val2"));
  ASSERT_OK(Flush());
  ASSERT_EQ("val2", Get(Key(2)));
  Destroy(options);
}

// Auto resume from the manifest write retryable IO error with a retry limit.
// Fail the first resume and let the second resume succeed.
TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
  // Fail the first resume and let the second resume be successful
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"RecoverFromRetryableBGIOError:BeforeStart",
        "ManifestWriteRetryableErrorAutoRecover:0"},
       {"ManifestWriteRetryableErrorAutoRecover:1",
        "RecoverFromRetryableBGIOError:BeforeWait1"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "ManifestWriteRetryableErrorAutoRecover:2"}});
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:0");
  fault_fs_->SetFilesystemActive(true);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:1");
  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:2");
  SyncPoint::GetInstance()->DisableProcessing();
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

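// Same manifest write failure and recovery sequence as the previous test,
// but with WAL-disabled writes; recovery still produces a new manifest and
// the data survives a reopen.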
TEST_F(DBErrorHandlingFSTest, ManifestWriteNoWALRetryableErrorAutoRecover) {
  // Fail the first resume and let the second resume be successful
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  WriteOptions wo = WriteOptions();
  wo.disableWAL = true;
  ASSERT_OK(Put(Key(0), "val", wo));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val", wo));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"RecoverFromRetryableBGIOError:BeforeStart",
        "ManifestWriteNoWALRetryableErrorAutoRecover:0"},
       {"ManifestWriteNoWALRetryableErrorAutoRecover:1",
        "RecoverFromRetryableBGIOError:BeforeWait1"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "ManifestWriteNoWALRetryableErrorAutoRecover:2"}});
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:0");
  fault_fs_->SetFilesystemActive(true);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:1");
  TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:2");
  SyncPoint::GetInstance()->DisableProcessing();
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  Close();
}

TEST_F(DBErrorHandlingFSTest,
       CompactionManifestWriteRetryableErrorAutoRecover) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  Status s;
  std::string old_manifest;
  std::string new_manifest;
  std::atomic<bool> fail_manifest(false);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Put(Key(2), "val"));
  ASSERT_OK(Flush());
  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      // Wait for the flush of the 2nd L0 file before starting the compaction
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"},
       // Wait for the compaction to detect the manifest write error
       {"BackgroundCallCompaction:1", "CompactionManifestWriteErrorAR:0"},
       // Make the compaction thread wait for the error to be cleared
       {"CompactionManifestWriteErrorAR:1",
        "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
       {"CompactionManifestWriteErrorAR:2",
        "RecoverFromRetryableBGIOError:BeforeStart"},
       // Fail the first resume, before the wait in resume
       {"RecoverFromRetryableBGIOError:BeforeResume0",
        "CompactionManifestWriteErrorAR:3"},
       // Activate the FS before the second resume
       {"CompactionManifestWriteErrorAR:4",
        "RecoverFromRetryableBGIOError:BeforeResume1"},
       // Wait for the auto resume to be successful
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "CompactionManifestWriteErrorAR:5"}});
  // Trigger a manifest write failure in the compaction thread
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        if (fail_manifest.load()) {
          fault_fs_->SetFilesystemActive(false, error_msg);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);
  TEST_SYNC_POINT("CompactionManifestWriteErrorAR:0");
  TEST_SYNC_POINT("CompactionManifestWriteErrorAR:1");
  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  TEST_SYNC_POINT("CompactionManifestWriteErrorAR:2");
  TEST_SYNC_POINT("CompactionManifestWriteErrorAR:3");
  fault_fs_->SetFilesystemActive(true);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  TEST_SYNC_POINT("CompactionManifestWriteErrorAR:4");
  TEST_SYNC_POINT("CompactionManifestWriteErrorAR:5");
  SyncPoint::GetInstance()->DisableProcessing();
  new_manifest = GetManifestNameFromLiveFiles();
  ASSERT_NE(new_manifest, old_manifest);
  Reopen(options);
  ASSERT_EQ("val", Get(Key(0)));
  ASSERT_EQ("val", Get(Key(1)));
  ASSERT_EQ("val", Get(Key(2)));
  Close();
}

TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
  // In this test, during the first round of compaction the FS is set to
  // return an error, so the compaction fails with a retryable IO error that
  // is mapped to a soft error. Compaction is then rescheduled; in the second
  // round the FS is active again and the compaction succeeds, so the test
  // hits the CompactionJob::FinishCompactionOutputFile1 sync point.
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  Status s;
  std::atomic<bool> fail_first(false);
  std::atomic<bool> fail_second(true);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(0), "va;"));
  ASSERT_OK(Put(Key(2), "va;"));
  s = Flush();
  ASSERT_OK(s);
  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"},
       {"CompactionJob::FinishCompactionOutputFile1",
        "CompactionWriteRetryableErrorAutoRecover0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:Start",
      [&](void*) { fault_fs_->SetFilesystemActive(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile", [&](void*) {
        if (fail_first.load() && fail_second.load()) {
          fault_fs_->SetFilesystemActive(false, error_msg);
          fail_second.store(false);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);
  s = dbfull()->TEST_WaitForCompact();
  ASSERT_OK(s);
  TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  Destroy(options);
}

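// Retryable error on a synced WAL write with auto resume enabled. The first
// resume attempt fails while the filesystem is still inactive; the FS is
// reactivated before the second attempt, which succeeds.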
TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  options.paranoid_checks = true;
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  Status s;
  Random rnd(301);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  // For the first batch, the write is successful; sync is required.
  {
    WriteBatch batch;
    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };
  // For the second batch, the first two file Appends are successful, then the
  // following Appends fail due to a retryable file system IOError.
  {
    WriteBatch batch;
    int write_error = 0;
    for (auto i = 100; i < 200; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
        {{"WALWriteErrorDone", "RecoverFromRetryableBGIOError:BeforeStart"},
         {"RecoverFromRetryableBGIOError:BeforeResume0", "WALWriteError1:0"},
         {"WALWriteError1:1", "RecoverFromRetryableBGIOError:BeforeResume1"},
         {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError1:2"}});
    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false, error_msg);
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    s = dbfull()->Write(wopts, &batch);
    ASSERT_TRUE(s.IsIOError());
    TEST_SYNC_POINT("WALWriteErrorDone");
    TEST_SYNC_POINT("WALWriteError1:0");
    fault_fs_->SetFilesystemActive(true);
    SyncPoint::GetInstance()->ClearAllCallBacks();
    TEST_SYNC_POINT("WALWriteError1:1");
    TEST_SYNC_POINT("WALWriteError1:2");
  }
  SyncPoint::GetInstance()->DisableProcessing();
  // Data in the corrupted WAL is not stored.
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  // After recovery, write a new batch; it should end up in the WAL.
  {
    WriteBatch batch;
    for (auto i = 200; i < 300; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };
  Reopen(options);
  for (auto i = 0; i < 300; ++i) {
    if (i < 100 || i >= 200) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Close();
}

TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
  // Fail the first recovery attempt and succeed on the second try.
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  options.paranoid_checks = true;
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  Status s;
  Random rnd(301);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  // For the first batch, the write is successful; sync is required.
  {
    WriteBatch batch;
    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };
  // For the second batch, the first two file Appends are successful, then the
  // following Appends fail due to a retryable file system IOError.
  {
    WriteBatch batch;
    int write_error = 0;
    for (auto i = 100; i < 200; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
        {{"RecoverFromRetryableBGIOError:BeforeWait0", "WALWriteError2:0"},
         {"WALWriteError2:1", "RecoverFromRetryableBGIOError:BeforeWait1"},
         {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError2:2"}});
    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false, error_msg);
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    s = dbfull()->Write(wopts, &batch);
    ASSERT_TRUE(s.IsIOError());
    TEST_SYNC_POINT("WALWriteError2:0");
    fault_fs_->SetFilesystemActive(true);
    SyncPoint::GetInstance()->ClearAllCallBacks();
    TEST_SYNC_POINT("WALWriteError2:1");
    TEST_SYNC_POINT("WALWriteError2:2");
  }
  SyncPoint::GetInstance()->DisableProcessing();
  // Data in the corrupted WAL is not stored.
  for (auto i = 0; i < 199; ++i) {
    if (i < 100) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  // After recovery, write a new batch; it should end up in the WAL.
  {
    WriteBatch batch;
    for (auto i = 200; i < 300; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }
    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  };
  Reopen(options);
  for (auto i = 0; i < 300; ++i) {
    if (i < 100 || i >= 200) {
      ASSERT_NE(Get(Key(i)), "NOT_FOUND");
    } else {
      ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
    }
  }
  Close();
}

// Fail auto resume from a flush retryable error and verify that the
// OnErrorRecoveryEnd listener callback is called.
TEST_F(DBErrorHandlingFSTest, FlushWritRetryableErrorAbortRecovery) {
  // The FS is never reactivated, so all resume attempts fail and the
  // recovery is aborted.
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.max_bgerror_resume_count = 2;
  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
  Status s;
  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);
  ASSERT_OK(Put(Key(1), "val1"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeFinishBuildTable",
      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  ASSERT_EQ(listener->new_bg_error(), Status::Aborted());
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  Destroy(options);
}

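// Regression test: closing the DB while flush error recovery is still in
// flight must join the recovery thread instead of racing with it.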
TEST_F(DBErrorHandlingFSTest, FlushErrorRecoveryRaceWithDBDestruction) {
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  options.listeners.emplace_back(listener);
  DestroyAndReopen(options);
  ASSERT_OK(Put("k1", "val"));
  // Inject a retryable flush error
  bool error_set = false;
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeOutputValidation", [&](void*) {
        if (error_set) {
          return;
        }
        IOStatus st = IOStatus::IOError("Injected");
        st.SetRetryable(true);
        fault_fs_->SetFilesystemActive(false, st);
        error_set = true;
      });
  port::Thread db_close_thread;
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeDeleteFile", [&](void*) {
        // Clear the retryable flush error injection
        fault_fs_->SetFilesystemActive(true);
        // Coerce a race between ending auto recovery in the DB destruction
        // and the flush error recovery
        ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
            {{"PostEndAutoRecovery", "FlushJob::WriteLevel0Table"}});
        db_close_thread = port::Thread([&] { Close(); });
      });
  SyncPoint::GetInstance()->EnableProcessing();
  Status s = Flush();
  ASSERT_NOK(s);
  int placeholder = 1;
  listener->WaitForRecovery(placeholder);
  ASSERT_TRUE(listener->new_bg_error().IsShutdownInProgress());
  // Prior to the fix, the DB close would crash because the recovery thread
  // for the flush error was not joined by the time of destruction.
  db_close_thread.join();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Destroy(options);
}

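// A retryable, file-scoped read error during flush output validation is
// mapped to a soft error and auto-recovered; the flushed key is readable
// after a reopen.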
TEST_F(DBErrorHandlingFSTest, FlushReadError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.statistics = CreateDBStatistics();
  Status s;

  listener->EnableAutoRecovery(false);
  DestroyAndReopen(options);
  ASSERT_OK(Put(Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeOutputValidation", [&](void*) {
        IOStatus st = IOStatus::IOError();
        st.SetRetryable(true);
        st.SetScope(IOStatus::IOErrorScope::kIOErrorScopeFile);
        fault_fs_->SetFilesystemActive(false, st);
      });
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeDeleteFile",
      [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_LE(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  s = dbfull()->TEST_GetBGError();
  ASSERT_OK(s);

  Reopen(GetDefaultOptions());
  ASSERT_EQ("val", Get(Key(0)));
}
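
// The same retryable read-error scenario as FlushReadError, but with
// atomic_flush enabled and two column families flushed together.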
TEST_F(DBErrorHandlingFSTest, AtomicFlushReadError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.statistics = CreateDBStatistics();
  Status s;

  listener->EnableAutoRecovery(false);
  options.atomic_flush = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_OK(Put(0, Key(0), "val"));
  ASSERT_OK(Put(1, Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeOutputValidation", [&](void*) {
        IOStatus st = IOStatus::IOError();
        st.SetRetryable(true);
        st.SetScope(IOStatus::IOErrorScope::kIOErrorScopeFile);
        fault_fs_->SetFilesystemActive(false, st);
      });
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeDeleteFile",
      [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush({0, 1});
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
  ASSERT_LE(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_COUNT));
  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
  s = dbfull()->TEST_GetBGError();
  ASSERT_OK(s);

  ASSERT_OK(TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
                                        GetDefaultOptions()));
  ASSERT_EQ("val", Get(Key(0)));
}
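
// Inject a NoSpace error when the atomic flush creates its SST files. ENOSPC
// surfaces as a hard error, and auto recovery clears it once the filesystem
// is reactivated.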
TEST_F(DBErrorHandlingFSTest, AtomicFlushNoSpaceError) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.statistics = CreateDBStatistics();
  Status s;

  listener->EnableAutoRecovery(true);
  options.atomic_flush = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_OK(Put(0, Key(0), "val"));
  ASSERT_OK(Put(1, Key(0), "val"));
  SyncPoint::GetInstance()->SetCallBack("BuildTable:create_file", [&](void*) {
    IOStatus st = IOStatus::NoSpace();
    fault_fs_->SetFilesystemActive(false, st);
  });
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeDeleteFile",
      [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush({0, 1});
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
  ASSERT_LE(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_ERROR_COUNT));
  ASSERT_LE(1, options.statistics->getAndResetTickerCount(
                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
  s = dbfull()->TEST_GetBGError();
  ASSERT_OK(s);

  ASSERT_OK(TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
                                        GetDefaultOptions()));
  ASSERT_EQ("val", Get(Key(0)));
}

TEST_F(DBErrorHandlingFSTest, CompactionReadRetryableErrorAutoRecover) {
  // In this test, in the first round of compaction the FS is set to return
  // errors. The first compaction therefore fails with a retryable IO error,
  // which is mapped to a soft error. The compaction is then rescheduled; in
  // the second round the FS is active again and the compaction succeeds, so
  // the test hits the CompactionJob::FinishCompactionOutputFile1 sync point.
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Status s;
  std::atomic<bool> fail_first(false);
  std::atomic<bool> fail_second(true);
  Random rnd(301);
  DestroyAndReopen(options);

  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
  error_msg.SetRetryable(true);

  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(1024)));
  }
  s = Flush();
  ASSERT_OK(s);

  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
  listener->EnableAutoRecovery(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"},
       {"CompactionJob::FinishCompactionOutputFile1",
        "CompactionWriteRetryableErrorAutoRecover0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:Start",
      [&](void*) { fault_fs_->SetFilesystemActive(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():PausingManualCompaction:2", [&](void*) {
        if (fail_first.load() && fail_second.load()) {
          fault_fs_->SetFilesystemActive(false, error_msg);
          fail_second.store(false);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_OK(s);
  TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  Reopen(GetDefaultOptions());
}
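
// The fencing tests run with paranoid_checks both on and off. In every case
// below, an IOFenced error from the filesystem is treated as fatal and is not
// cleared by Resume(), regardless of that setting.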
class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
                                   public testing::WithParamInterface<bool> {};
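
// Fence the filesystem at the start of the flush job; the flush fails with a
// fatal IOFenced error that Resume() does not clear.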
TEST_P(DBErrorHandlingFencingTest, FlushWriteFenced) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.paranoid_checks = GetParam();
  Status s;

  listener->EnableAutoRecovery(true);
  DestroyAndReopen(options);
  ASSERT_OK(Put(Key(0), "val"));

  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
    fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
  });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
  ASSERT_TRUE(s.IsIOFenced());
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_TRUE(s.IsIOFenced());
  Destroy(options);
}
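
// Fence the filesystem while the flush writes the MANIFEST; the resulting
// IOFenced error is fatal and Resume() keeps returning it.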
TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.listeners.emplace_back(listener);
  options.paranoid_checks = GetParam();
  Status s;
  std::string old_manifest;
  std::string new_manifest;

  listener->EnableAutoRecovery(true);
  DestroyAndReopen(options);
  old_manifest = GetManifestNameFromLiveFiles();

  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(1), "val"));
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  s = Flush();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
  ASSERT_TRUE(s.IsIOFenced());
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_TRUE(s.IsIOFenced());
  Close();
}
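
// Fence the filesystem when the background compaction starts; the compaction
// fails with a fatal IOFenced error that Resume() cannot clear.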
TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(listener);
  options.paranoid_checks = GetParam();
  Status s;
  DestroyAndReopen(options);

  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_OK(Put(Key(2), "val"));
  s = Flush();
  ASSERT_OK(s);

  listener->EnableAutoRecovery(true);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BackgroundCallCompaction:0", [&](void*) {
        fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val"));
  s = Flush();
  ASSERT_OK(s);

  s = dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
  ASSERT_TRUE(s.IsIOFenced());

  fault_fs_->SetFilesystemActive(true);
  s = dbfull()->Resume();
  ASSERT_TRUE(s.IsIOFenced());
  Destroy(options);
}
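
// Fence the filesystem partway through a synced WAL write. The write fails
// with IOFenced, and even after the filesystem is reactivated a subsequent
// write still returns IOFenced, since the fatal error persists.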
TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
  std::shared_ptr<ErrorHandlerFSListener> listener =
      std::make_shared<ErrorHandlerFSListener>();
  Options options = GetDefaultOptions();
  options.env = fault_env_.get();
  options.create_if_missing = true;
  options.writable_file_max_buffer_size = 32768;
  options.listeners.emplace_back(listener);
  options.paranoid_checks = GetParam();
  Status s;
  Random rnd(301);

  listener->EnableAutoRecovery(true);
  DestroyAndReopen(options);

  {
    WriteBatch batch;

    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    ASSERT_OK(dbfull()->Write(wopts, &batch));
  }

  {
    WriteBatch batch;
    int write_error = 0;

    for (auto i = 100; i < 199; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }

    SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
          write_error++;
          if (write_error > 2) {
            fault_fs_->SetFilesystemActive(false,
                                           IOStatus::IOFenced("IO fenced"));
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();
    WriteOptions wopts;
    wopts.sync = true;
    s = dbfull()->Write(wopts, &batch);
    ASSERT_TRUE(s.IsIOFenced());
  }
  SyncPoint::GetInstance()->DisableProcessing();
  fault_fs_->SetFilesystemActive(true);
  {
    WriteBatch batch;

    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
    }

    WriteOptions wopts;
    wopts.sync = true;
    s = dbfull()->Write(wopts, &batch);
    ASSERT_TRUE(s.IsIOFenced());
  }
  Close();
}

INSTANTIATE_TEST_CASE_P(DBErrorHandlingFSTest, DBErrorHandlingFencingTest,
                        ::testing::Bool());

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}