db_wal_test.cc 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_test_util.h"
  10. #include "env/composite_env_wrapper.h"
  11. #include "options/options_helper.h"
  12. #include "port/port.h"
  13. #include "port/stack_trace.h"
  14. #include "test_util/fault_injection_test_env.h"
  15. #include "test_util/sync_point.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. class DBWALTest : public DBTestBase {
  18. public:
  19. DBWALTest() : DBTestBase("/db_wal_test") {}
  20. #if defined(ROCKSDB_PLATFORM_POSIX)
  21. uint64_t GetAllocatedFileSize(std::string file_name) {
  22. struct stat sbuf;
  23. int err = stat(file_name.c_str(), &sbuf);
  24. assert(err == 0);
  25. return sbuf.st_blocks * 512;
  26. }
  27. #endif
  28. };
  29. // A SpecialEnv enriched to give more insight about deleted files
  30. class EnrichedSpecialEnv : public SpecialEnv {
  31. public:
  32. explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
  33. Status NewSequentialFile(const std::string& f,
  34. std::unique_ptr<SequentialFile>* r,
  35. const EnvOptions& soptions) override {
  36. InstrumentedMutexLock l(&env_mutex_);
  37. if (f == skipped_wal) {
  38. deleted_wal_reopened = true;
  39. if (IsWAL(f) && largetest_deleted_wal.size() != 0 &&
  40. f.compare(largetest_deleted_wal) <= 0) {
  41. gap_in_wals = true;
  42. }
  43. }
  44. return SpecialEnv::NewSequentialFile(f, r, soptions);
  45. }
  46. Status DeleteFile(const std::string& fname) override {
  47. if (IsWAL(fname)) {
  48. deleted_wal_cnt++;
  49. InstrumentedMutexLock l(&env_mutex_);
  50. // If this is the first WAL, remember its name and skip deleting it. We
  51. // remember its name partly because the application might attempt to
  52. // delete the file again.
  53. if (skipped_wal.size() != 0 && skipped_wal != fname) {
  54. if (largetest_deleted_wal.size() == 0 ||
  55. largetest_deleted_wal.compare(fname) < 0) {
  56. largetest_deleted_wal = fname;
  57. }
  58. } else {
  59. skipped_wal = fname;
  60. return Status::OK();
  61. }
  62. }
  63. return SpecialEnv::DeleteFile(fname);
  64. }
  65. bool IsWAL(const std::string& fname) {
  66. // printf("iswal %s\n", fname.c_str());
  67. return fname.compare(fname.size() - 3, 3, "log") == 0;
  68. }
  69. InstrumentedMutex env_mutex_;
  70. // the wal whose actual delete was skipped by the env
  71. std::string skipped_wal = "";
  72. // the largest WAL that was requested to be deleted
  73. std::string largetest_deleted_wal = "";
  74. // number of WALs that were successfully deleted
  75. std::atomic<size_t> deleted_wal_cnt = {0};
  76. // the WAL whose delete from fs was skipped is reopened during recovery
  77. std::atomic<bool> deleted_wal_reopened = {false};
  78. // whether a gap in the WALs was detected during recovery
  79. std::atomic<bool> gap_in_wals = {false};
  80. };
  81. class DBWALTestWithEnrichedEnv : public DBTestBase {
  82. public:
  83. DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
  84. enriched_env_ = new EnrichedSpecialEnv(env_->target());
  85. auto options = CurrentOptions();
  86. options.env = enriched_env_;
  87. options.allow_2pc = true;
  88. Reopen(options);
  89. delete env_;
  90. // to be deleted by the parent class
  91. env_ = enriched_env_;
  92. }
  93. protected:
  94. EnrichedSpecialEnv* enriched_env_;
  95. };
  96. // Test that the recovery would successfully avoid the gaps between the logs.
  97. // One known scenario that could cause this is that the application issue the
  98. // WAL deletion out of order. For the sake of simplicity in the test, here we
  99. // create the gap by manipulating the env to skip deletion of the first WAL but
  100. // not the ones after it.
  101. TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
  102. auto options = last_options_;
  103. // To cause frequent WAL deletion
  104. options.write_buffer_size = 128;
  105. Reopen(options);
  106. WriteOptions writeOpt = WriteOptions();
  107. for (int i = 0; i < 128 * 5; i++) {
  108. ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
  109. }
  110. FlushOptions fo;
  111. fo.wait = true;
  112. ASSERT_OK(db_->Flush(fo));
  113. // some wals are deleted
  114. ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
  115. // but not the first one
  116. ASSERT_NE(0, enriched_env_->skipped_wal.size());
  117. // Test that the WAL that was not deleted will be skipped during recovery
  118. options = last_options_;
  119. Reopen(options);
  120. ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
  121. ASSERT_FALSE(enriched_env_->gap_in_wals);
  122. }
  123. TEST_F(DBWALTest, WAL) {
  124. do {
  125. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  126. WriteOptions writeOpt = WriteOptions();
  127. writeOpt.disableWAL = true;
  128. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
  129. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
  130. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  131. ASSERT_EQ("v1", Get(1, "foo"));
  132. ASSERT_EQ("v1", Get(1, "bar"));
  133. writeOpt.disableWAL = false;
  134. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
  135. writeOpt.disableWAL = true;
  136. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
  137. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  138. // Both value's should be present.
  139. ASSERT_EQ("v2", Get(1, "bar"));
  140. ASSERT_EQ("v2", Get(1, "foo"));
  141. writeOpt.disableWAL = true;
  142. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
  143. writeOpt.disableWAL = false;
  144. ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
  145. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  146. // again both values should be present.
  147. ASSERT_EQ("v3", Get(1, "foo"));
  148. ASSERT_EQ("v3", Get(1, "bar"));
  149. } while (ChangeWalOptions());
  150. }
  151. TEST_F(DBWALTest, RollLog) {
  152. do {
  153. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  154. ASSERT_OK(Put(1, "foo", "v1"));
  155. ASSERT_OK(Put(1, "baz", "v5"));
  156. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  157. for (int i = 0; i < 10; i++) {
  158. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  159. }
  160. ASSERT_OK(Put(1, "foo", "v4"));
  161. for (int i = 0; i < 10; i++) {
  162. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  163. }
  164. } while (ChangeWalOptions());
  165. }
  166. TEST_F(DBWALTest, SyncWALNotBlockWrite) {
  167. Options options = CurrentOptions();
  168. options.max_write_buffer_number = 4;
  169. DestroyAndReopen(options);
  170. ASSERT_OK(Put("foo1", "bar1"));
  171. ASSERT_OK(Put("foo5", "bar5"));
  172. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  173. {"WritableFileWriter::SyncWithoutFlush:1",
  174. "DBWALTest::SyncWALNotBlockWrite:1"},
  175. {"DBWALTest::SyncWALNotBlockWrite:2",
  176. "WritableFileWriter::SyncWithoutFlush:2"},
  177. });
  178. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  179. ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
  180. TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
  181. ASSERT_OK(Put("foo2", "bar2"));
  182. ASSERT_OK(Put("foo3", "bar3"));
  183. FlushOptions fo;
  184. fo.wait = false;
  185. ASSERT_OK(db_->Flush(fo));
  186. ASSERT_OK(Put("foo4", "bar4"));
  187. TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
  188. thread.join();
  189. ASSERT_EQ(Get("foo1"), "bar1");
  190. ASSERT_EQ(Get("foo2"), "bar2");
  191. ASSERT_EQ(Get("foo3"), "bar3");
  192. ASSERT_EQ(Get("foo4"), "bar4");
  193. ASSERT_EQ(Get("foo5"), "bar5");
  194. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  195. }
  196. TEST_F(DBWALTest, SyncWALNotWaitWrite) {
  197. ASSERT_OK(Put("foo1", "bar1"));
  198. ASSERT_OK(Put("foo3", "bar3"));
  199. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  200. {"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"},
  201. {"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"},
  202. });
  203. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  204. ROCKSDB_NAMESPACE::port::Thread thread(
  205. [&]() { ASSERT_OK(Put("foo2", "bar2")); });
  206. // Moving this to SyncWAL before the actual fsync
  207. // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  208. ASSERT_OK(db_->SyncWAL());
  209. // Moving this to SyncWAL after actual fsync
  210. // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
  211. thread.join();
  212. ASSERT_EQ(Get("foo1"), "bar1");
  213. ASSERT_EQ(Get("foo2"), "bar2");
  214. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  215. }
  216. TEST_F(DBWALTest, Recover) {
  217. do {
  218. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  219. ASSERT_OK(Put(1, "foo", "v1"));
  220. ASSERT_OK(Put(1, "baz", "v5"));
  221. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  222. ASSERT_EQ("v1", Get(1, "foo"));
  223. ASSERT_EQ("v1", Get(1, "foo"));
  224. ASSERT_EQ("v5", Get(1, "baz"));
  225. ASSERT_OK(Put(1, "bar", "v2"));
  226. ASSERT_OK(Put(1, "foo", "v3"));
  227. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  228. ASSERT_EQ("v3", Get(1, "foo"));
  229. ASSERT_OK(Put(1, "foo", "v4"));
  230. ASSERT_EQ("v4", Get(1, "foo"));
  231. ASSERT_EQ("v2", Get(1, "bar"));
  232. ASSERT_EQ("v5", Get(1, "baz"));
  233. } while (ChangeWalOptions());
  234. }
  235. TEST_F(DBWALTest, RecoverWithTableHandle) {
  236. do {
  237. Options options = CurrentOptions();
  238. options.create_if_missing = true;
  239. options.disable_auto_compactions = true;
  240. options.avoid_flush_during_recovery = false;
  241. DestroyAndReopen(options);
  242. CreateAndReopenWithCF({"pikachu"}, options);
  243. ASSERT_OK(Put(1, "foo", "v1"));
  244. ASSERT_OK(Put(1, "bar", "v2"));
  245. ASSERT_OK(Flush(1));
  246. ASSERT_OK(Put(1, "foo", "v3"));
  247. ASSERT_OK(Put(1, "bar", "v4"));
  248. ASSERT_OK(Flush(1));
  249. ASSERT_OK(Put(1, "big", std::string(100, 'a')));
  250. options = CurrentOptions();
  251. const int kSmallMaxOpenFiles = 13;
  252. if (option_config_ == kDBLogDir) {
  253. // Use this option to check not preloading files
  254. // Set the max open files to be small enough so no preload will
  255. // happen.
  256. options.max_open_files = kSmallMaxOpenFiles;
  257. // RocksDB sanitize max open files to at least 20. Modify it back.
  258. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  259. "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
  260. int* max_open_files = static_cast<int*>(arg);
  261. *max_open_files = kSmallMaxOpenFiles;
  262. });
  263. } else if (option_config_ == kWalDirAndMmapReads) {
  264. // Use this option to check always loading all files.
  265. options.max_open_files = 100;
  266. } else {
  267. options.max_open_files = -1;
  268. }
  269. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  270. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  271. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  272. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  273. std::vector<std::vector<FileMetaData>> files;
  274. dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
  275. size_t total_files = 0;
  276. for (const auto& level : files) {
  277. total_files += level.size();
  278. }
  279. ASSERT_EQ(total_files, 3);
  280. for (const auto& level : files) {
  281. for (const auto& file : level) {
  282. if (options.max_open_files == kSmallMaxOpenFiles) {
  283. ASSERT_TRUE(file.table_reader_handle == nullptr);
  284. } else {
  285. ASSERT_TRUE(file.table_reader_handle != nullptr);
  286. }
  287. }
  288. }
  289. } while (ChangeWalOptions());
  290. }
  291. TEST_F(DBWALTest, IgnoreRecoveredLog) {
  292. std::string backup_logs = dbname_ + "/backup_logs";
  293. do {
  294. // delete old files in backup_logs directory
  295. env_->CreateDirIfMissing(backup_logs);
  296. std::vector<std::string> old_files;
  297. env_->GetChildren(backup_logs, &old_files);
  298. for (auto& file : old_files) {
  299. if (file != "." && file != "..") {
  300. env_->DeleteFile(backup_logs + "/" + file);
  301. }
  302. }
  303. Options options = CurrentOptions();
  304. options.create_if_missing = true;
  305. options.merge_operator = MergeOperators::CreateUInt64AddOperator();
  306. options.wal_dir = dbname_ + "/logs";
  307. DestroyAndReopen(options);
  308. // fill up the DB
  309. std::string one, two;
  310. PutFixed64(&one, 1);
  311. PutFixed64(&two, 2);
  312. ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
  313. ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
  314. ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
  315. // copy the logs to backup
  316. std::vector<std::string> logs;
  317. env_->GetChildren(options.wal_dir, &logs);
  318. for (auto& log : logs) {
  319. if (log != ".." && log != ".") {
  320. CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
  321. }
  322. }
  323. // recover the DB
  324. Reopen(options);
  325. ASSERT_EQ(two, Get("foo"));
  326. ASSERT_EQ(one, Get("bar"));
  327. Close();
  328. // copy the logs from backup back to wal dir
  329. for (auto& log : logs) {
  330. if (log != ".." && log != ".") {
  331. CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
  332. }
  333. }
  334. // this should ignore the log files, recovery should not happen again
  335. // if the recovery happens, the same merge operator would be called twice,
  336. // leading to incorrect results
  337. Reopen(options);
  338. ASSERT_EQ(two, Get("foo"));
  339. ASSERT_EQ(one, Get("bar"));
  340. Close();
  341. Destroy(options);
  342. Reopen(options);
  343. Close();
  344. // copy the logs from backup back to wal dir
  345. env_->CreateDirIfMissing(options.wal_dir);
  346. for (auto& log : logs) {
  347. if (log != ".." && log != ".") {
  348. CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
  349. }
  350. }
  351. // assert that we successfully recovered only from logs, even though we
  352. // destroyed the DB
  353. Reopen(options);
  354. ASSERT_EQ(two, Get("foo"));
  355. ASSERT_EQ(one, Get("bar"));
  356. // Recovery will fail if DB directory doesn't exist.
  357. Destroy(options);
  358. // copy the logs from backup back to wal dir
  359. env_->CreateDirIfMissing(options.wal_dir);
  360. for (auto& log : logs) {
  361. if (log != ".." && log != ".") {
  362. CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
  363. // we won't be needing this file no more
  364. env_->DeleteFile(backup_logs + "/" + log);
  365. }
  366. }
  367. Status s = TryReopen(options);
  368. ASSERT_TRUE(!s.ok());
  369. Destroy(options);
  370. } while (ChangeWalOptions());
  371. }
  372. TEST_F(DBWALTest, RecoveryWithEmptyLog) {
  373. do {
  374. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  375. ASSERT_OK(Put(1, "foo", "v1"));
  376. ASSERT_OK(Put(1, "foo", "v2"));
  377. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  378. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  379. ASSERT_OK(Put(1, "foo", "v3"));
  380. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  381. ASSERT_EQ("v3", Get(1, "foo"));
  382. } while (ChangeWalOptions());
  383. }
  384. #if !(defined NDEBUG) || !defined(OS_WIN)
  385. TEST_F(DBWALTest, PreallocateBlock) {
  386. Options options = CurrentOptions();
  387. options.write_buffer_size = 10 * 1000 * 1000;
  388. options.max_total_wal_size = 0;
  389. size_t expected_preallocation_size = static_cast<size_t>(
  390. options.write_buffer_size + options.write_buffer_size / 10);
  391. DestroyAndReopen(options);
  392. std::atomic<int> called(0);
  393. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  394. "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
  395. ASSERT_TRUE(arg != nullptr);
  396. size_t preallocation_size = *(static_cast<size_t*>(arg));
  397. ASSERT_EQ(expected_preallocation_size, preallocation_size);
  398. called.fetch_add(1);
  399. });
  400. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  401. Put("", "");
  402. Flush();
  403. Put("", "");
  404. Close();
  405. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  406. ASSERT_EQ(2, called.load());
  407. options.max_total_wal_size = 1000 * 1000;
  408. expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size);
  409. Reopen(options);
  410. called.store(0);
  411. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  412. "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
  413. ASSERT_TRUE(arg != nullptr);
  414. size_t preallocation_size = *(static_cast<size_t*>(arg));
  415. ASSERT_EQ(expected_preallocation_size, preallocation_size);
  416. called.fetch_add(1);
  417. });
  418. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  419. Put("", "");
  420. Flush();
  421. Put("", "");
  422. Close();
  423. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  424. ASSERT_EQ(2, called.load());
  425. options.db_write_buffer_size = 800 * 1000;
  426. expected_preallocation_size =
  427. static_cast<size_t>(options.db_write_buffer_size);
  428. Reopen(options);
  429. called.store(0);
  430. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  431. "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
  432. ASSERT_TRUE(arg != nullptr);
  433. size_t preallocation_size = *(static_cast<size_t*>(arg));
  434. ASSERT_EQ(expected_preallocation_size, preallocation_size);
  435. called.fetch_add(1);
  436. });
  437. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  438. Put("", "");
  439. Flush();
  440. Put("", "");
  441. Close();
  442. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  443. ASSERT_EQ(2, called.load());
  444. expected_preallocation_size = 700 * 1000;
  445. std::shared_ptr<WriteBufferManager> write_buffer_manager =
  446. std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
  447. options.write_buffer_manager = write_buffer_manager;
  448. Reopen(options);
  449. called.store(0);
  450. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  451. "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
  452. ASSERT_TRUE(arg != nullptr);
  453. size_t preallocation_size = *(static_cast<size_t*>(arg));
  454. ASSERT_EQ(expected_preallocation_size, preallocation_size);
  455. called.fetch_add(1);
  456. });
  457. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  458. Put("", "");
  459. Flush();
  460. Put("", "");
  461. Close();
  462. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  463. ASSERT_EQ(2, called.load());
  464. }
  465. #endif // !(defined NDEBUG) || !defined(OS_WIN)
  466. #ifndef ROCKSDB_LITE
  467. TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
  468. // For github issue #1303
  469. for (int i = 0; i < 2; ++i) {
  470. Options options = CurrentOptions();
  471. options.create_if_missing = true;
  472. options.recycle_log_file_num = 2;
  473. if (i != 0) {
  474. options.wal_dir = alternative_wal_dir_;
  475. }
  476. DestroyAndReopen(options);
  477. ASSERT_OK(Put("foo", "v1"));
  478. VectorLogPtr log_files;
  479. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  480. ASSERT_GT(log_files.size(), 0);
  481. ASSERT_OK(Flush());
  482. // Now the original WAL is in log_files[0] and should be marked for
  483. // recycling.
  484. // Verify full purge cannot remove this file.
  485. JobContext job_context(0);
  486. dbfull()->TEST_LockMutex();
  487. dbfull()->FindObsoleteFiles(&job_context, true /* force */);
  488. dbfull()->TEST_UnlockMutex();
  489. dbfull()->PurgeObsoleteFiles(job_context);
  490. if (i == 0) {
  491. ASSERT_OK(
  492. env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
  493. } else {
  494. ASSERT_OK(env_->FileExists(
  495. LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
  496. }
  497. }
  498. }
  499. TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
  500. // Ensures full purge cannot delete a WAL while it's in the process of being
  501. // recycled. In particular, we force the full purge after a file has been
  502. // chosen for reuse, but before it has been renamed.
  503. for (int i = 0; i < 2; ++i) {
  504. Options options = CurrentOptions();
  505. options.recycle_log_file_num = 1;
  506. if (i != 0) {
  507. options.wal_dir = alternative_wal_dir_;
  508. }
  509. DestroyAndReopen(options);
  510. // The first flush creates a second log so writes can continue before the
  511. // flush finishes.
  512. ASSERT_OK(Put("foo", "bar"));
  513. ASSERT_OK(Flush());
  514. // The second flush can recycle the first log. Sync points enforce the
  515. // full purge happens after choosing the log to recycle and before it is
  516. // renamed.
  517. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
  518. {"DBImpl::CreateWAL:BeforeReuseWritableFile1",
  519. "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
  520. {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
  521. "DBImpl::CreateWAL:BeforeReuseWritableFile2"},
  522. });
  523. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  524. ROCKSDB_NAMESPACE::port::Thread thread([&]() {
  525. TEST_SYNC_POINT(
  526. "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
  527. ASSERT_OK(db_->EnableFileDeletions(true));
  528. TEST_SYNC_POINT(
  529. "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
  530. });
  531. ASSERT_OK(Put("foo", "bar"));
  532. ASSERT_OK(Flush());
  533. thread.join();
  534. }
  535. }
  536. TEST_F(DBWALTest, GetSortedWalFiles) {
  537. do {
  538. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  539. VectorLogPtr log_files;
  540. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  541. ASSERT_EQ(0, log_files.size());
  542. ASSERT_OK(Put(1, "foo", "v1"));
  543. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  544. ASSERT_EQ(1, log_files.size());
  545. } while (ChangeWalOptions());
  546. }
  547. TEST_F(DBWALTest, GetCurrentWalFile) {
  548. do {
  549. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  550. std::unique_ptr<LogFile>* bad_log_file = nullptr;
  551. ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file));
  552. std::unique_ptr<LogFile> log_file;
  553. ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
  554. // nothing has been written to the log yet
  555. ASSERT_EQ(log_file->StartSequence(), 0);
  556. ASSERT_EQ(log_file->SizeFileBytes(), 0);
  557. ASSERT_EQ(log_file->Type(), kAliveLogFile);
  558. ASSERT_GT(log_file->LogNumber(), 0);
  559. // add some data and verify that the file size actually moves foward
  560. ASSERT_OK(Put(0, "foo", "v1"));
  561. ASSERT_OK(Put(0, "foo2", "v2"));
  562. ASSERT_OK(Put(0, "foo3", "v3"));
  563. ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
  564. ASSERT_EQ(log_file->StartSequence(), 0);
  565. ASSERT_GT(log_file->SizeFileBytes(), 0);
  566. ASSERT_EQ(log_file->Type(), kAliveLogFile);
  567. ASSERT_GT(log_file->LogNumber(), 0);
  568. // force log files to cycle and add some more data, then check if
  569. // log number moves forward
  570. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  571. for (int i = 0; i < 10; i++) {
  572. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  573. }
  574. ASSERT_OK(Put(0, "foo4", "v4"));
  575. ASSERT_OK(Put(0, "foo5", "v5"));
  576. ASSERT_OK(Put(0, "foo6", "v6"));
  577. ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
  578. ASSERT_EQ(log_file->StartSequence(), 0);
  579. ASSERT_GT(log_file->SizeFileBytes(), 0);
  580. ASSERT_EQ(log_file->Type(), kAliveLogFile);
  581. ASSERT_GT(log_file->LogNumber(), 0);
  582. } while (ChangeWalOptions());
  583. }
  584. TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
  585. // Test for regression of WAL cleanup missing files that don't contain data
  586. // for every column family.
  587. do {
  588. CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  589. ASSERT_OK(Put(1, "foo", "v1"));
  590. ASSERT_OK(Put(1, "foo", "v2"));
  591. uint64_t earliest_log_nums[2];
  592. for (int i = 0; i < 2; ++i) {
  593. if (i > 0) {
  594. ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
  595. }
  596. VectorLogPtr log_files;
  597. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  598. if (log_files.size() > 0) {
  599. earliest_log_nums[i] = log_files[0]->LogNumber();
  600. } else {
  601. earliest_log_nums[i] = port::kMaxUint64;
  602. }
  603. }
  604. // Check at least the first WAL was cleaned up during the recovery.
  605. ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
  606. } while (ChangeWalOptions());
  607. }
  608. TEST_F(DBWALTest, RecoverWithLargeLog) {
  609. do {
  610. {
  611. Options options = CurrentOptions();
  612. CreateAndReopenWithCF({"pikachu"}, options);
  613. ASSERT_OK(Put(1, "big1", std::string(200000, '1')));
  614. ASSERT_OK(Put(1, "big2", std::string(200000, '2')));
  615. ASSERT_OK(Put(1, "small3", std::string(10, '3')));
  616. ASSERT_OK(Put(1, "small4", std::string(10, '4')));
  617. ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  618. }
  619. // Make sure that if we re-open with a small write buffer size that
  620. // we flush table files in the middle of a large log file.
  621. Options options;
  622. options.write_buffer_size = 100000;
  623. options = CurrentOptions(options);
  624. ReopenWithColumnFamilies({"default", "pikachu"}, options);
  625. ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
  626. ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
  627. ASSERT_EQ(std::string(200000, '2'), Get(1, "big2"));
  628. ASSERT_EQ(std::string(10, '3'), Get(1, "small3"));
  629. ASSERT_EQ(std::string(10, '4'), Get(1, "small4"));
  630. ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);
  631. } while (ChangeWalOptions());
  632. }
  633. // In https://reviews.facebook.net/D20661 we change
  634. // recovery behavior: previously for each log file each column family
  635. // memtable was flushed, even it was empty. Now it's changed:
  636. // we try to create the smallest number of table files by merging
  637. // updates from multiple logs
  638. TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
  639. Options options = CurrentOptions();
  640. options.write_buffer_size = 5000000;
  641. CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
  642. // Since we will reopen DB with smaller write_buffer_size,
  643. // each key will go to new SST file
  644. ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  645. ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  646. ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  647. ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
  648. ASSERT_OK(Put(3, Key(10), DummyString(1)));
  649. // Make 'dobrynia' to be flushed and new WAL file to be created
  650. ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
  651. ASSERT_OK(Put(2, Key(1), DummyString(1)));
  652. dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  653. {
  654. auto tables = ListTableFiles(env_, dbname_);
  655. ASSERT_EQ(tables.size(), static_cast<size_t>(1));
  656. // Make sure 'dobrynia' was flushed: check sst files amount
  657. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
  658. static_cast<uint64_t>(1));
  659. }
  660. // New WAL file
  661. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  662. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  663. ASSERT_OK(Put(3, Key(10), DummyString(1)));
  664. ASSERT_OK(Put(3, Key(10), DummyString(1)));
  665. ASSERT_OK(Put(3, Key(10), DummyString(1)));
  666. options.write_buffer_size = 4096;
  667. options.arena_block_size = 4096;
  668. ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
  669. options);
  670. {
  671. // No inserts => default is empty
  672. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
  673. static_cast<uint64_t>(0));
  674. // First 4 keys goes to separate SSTs + 1 more SST for 2 smaller keys
  675. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
  676. static_cast<uint64_t>(5));
  677. // 1 SST for big key + 1 SST for small one
  678. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
  679. static_cast<uint64_t>(2));
  680. // 1 SST for all keys
  681. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
  682. static_cast<uint64_t>(1));
  683. }
  684. }
  685. // In https://reviews.facebook.net/D20661 we change
  686. // recovery behavior: previously for each log file each column family
  687. // memtable was flushed, even it wasn't empty. Now it's changed:
  688. // we try to create the smallest number of table files by merging
  689. // updates from multiple logs
  690. TEST_F(DBWALTest, RecoverCheckFileAmount) {
  691. Options options = CurrentOptions();
  692. options.write_buffer_size = 100000;
  693. options.arena_block_size = 4 * 1024;
  694. options.avoid_flush_during_recovery = false;
  695. CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
  696. ASSERT_OK(Put(0, Key(1), DummyString(1)));
  697. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  698. ASSERT_OK(Put(2, Key(1), DummyString(1)));
  699. // Make 'nikitich' memtable to be flushed
  700. ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
  701. ASSERT_OK(Put(3, Key(1), DummyString(1)));
  702. dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  703. // 4 memtable are not flushed, 1 sst file
  704. {
  705. auto tables = ListTableFiles(env_, dbname_);
  706. ASSERT_EQ(tables.size(), static_cast<size_t>(1));
  707. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
  708. static_cast<uint64_t>(1));
  709. }
  710. // Memtable for 'nikitich' has flushed, new WAL file has opened
  711. // 4 memtable still not flushed
  712. // Write to new WAL file
  713. ASSERT_OK(Put(0, Key(1), DummyString(1)));
  714. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  715. ASSERT_OK(Put(2, Key(1), DummyString(1)));
  716. // Fill up 'nikitich' one more time
  717. ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
  718. // make it flush
  719. ASSERT_OK(Put(3, Key(1), DummyString(1)));
  720. dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  721. // There are still 4 memtable not flushed, and 2 sst tables
  722. ASSERT_OK(Put(0, Key(1), DummyString(1)));
  723. ASSERT_OK(Put(1, Key(1), DummyString(1)));
  724. ASSERT_OK(Put(2, Key(1), DummyString(1)));
  725. {
  726. auto tables = ListTableFiles(env_, dbname_);
  727. ASSERT_EQ(tables.size(), static_cast<size_t>(2));
  728. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
  729. static_cast<uint64_t>(2));
  730. }
  731. ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
  732. options);
  733. {
  734. std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
  735. // Check, that records for 'default', 'dobrynia' and 'pikachu' from
  736. // first, second and third WALs went to the same SST.
  737. // So, there is 6 SSTs: three for 'nikitich', one for 'default', one for
  738. // 'dobrynia', one for 'pikachu'
  739. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
  740. static_cast<uint64_t>(1));
  741. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
  742. static_cast<uint64_t>(3));
  743. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
  744. static_cast<uint64_t>(1));
  745. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
  746. static_cast<uint64_t>(1));
  747. }
  748. }
  749. TEST_F(DBWALTest, SyncMultipleLogs) {
  750. const uint64_t kNumBatches = 2;
  751. const int kBatchSize = 1000;
  752. Options options = CurrentOptions();
  753. options.create_if_missing = true;
  754. options.write_buffer_size = 4096;
  755. Reopen(options);
  756. WriteBatch batch;
  757. WriteOptions wo;
  758. wo.sync = true;
  759. for (uint64_t b = 0; b < kNumBatches; b++) {
  760. batch.Clear();
  761. for (int i = 0; i < kBatchSize; i++) {
  762. batch.Put(Key(i), DummyString(128));
  763. }
  764. dbfull()->Write(wo, &batch);
  765. }
  766. ASSERT_OK(dbfull()->SyncWAL());
  767. }
  768. // Github issue 1339. Prior the fix we read sequence id from the first log to
  769. // a local variable, then keep increase the variable as we replay logs,
  770. // ignoring actual sequence id of the records. This is incorrect if some writes
  771. // come with WAL disabled.
  772. TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
  773. std::unique_ptr<FaultInjectionTestEnv> fault_env(
  774. new FaultInjectionTestEnv(env_));
  775. Options options = CurrentOptions();
  776. options.env = fault_env.get();
  777. options.disable_auto_compactions = true;
  778. WriteOptions wal_on, wal_off;
  779. wal_on.sync = true;
  780. wal_on.disableWAL = false;
  781. wal_off.disableWAL = true;
  782. CreateAndReopenWithCF({"dummy"}, options);
  783. ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
  784. ASSERT_OK(Put(1, "dummy", "d2", wal_off));
  785. ASSERT_OK(Put(1, "dummy", "d3", wal_off));
  786. ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
  787. ASSERT_OK(Flush(0));
  788. ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
  789. ASSERT_EQ("v5", Get(0, "key"));
  790. dbfull()->FlushWAL(false);
  791. // Simulate a crash.
  792. fault_env->SetFilesystemActive(false);
  793. Close();
  794. fault_env->ResetState();
  795. ReopenWithColumnFamilies({"default", "dummy"}, options);
  796. // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
  797. ASSERT_EQ("v5", Get(0, "key"));
  798. // Destroy DB before destruct fault_env.
  799. Destroy(options);
  800. }
  801. //
  802. // Test WAL recovery for the various modes available
  803. //
  804. class RecoveryTestHelper {
  805. public:
  806. // Number of WAL files to generate
  807. static const int kWALFilesCount = 10;
  808. // Starting number for the WAL file name like 00010.log
  809. static const int kWALFileOffset = 10;
  810. // Keys to be written per WAL file
  811. static const int kKeysPerWALFile = 133;
  812. // Size of the value
  813. static const int kValueSize = 96;
  814. // Create WAL files with values filled in
  815. static void FillData(DBWALTest* test, const Options& options,
  816. const size_t wal_count, size_t* count) {
  817. // Calling internal functions requires sanitized options.
  818. Options sanitized_options = SanitizeOptions(test->dbname_, options);
  819. const ImmutableDBOptions db_options(sanitized_options);
  820. *count = 0;
  821. std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
  822. EnvOptions env_options;
  823. WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
  824. std::unique_ptr<VersionSet> versions;
  825. std::unique_ptr<WalManager> wal_manager;
  826. WriteController write_controller;
  827. versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
  828. table_cache.get(), &write_buffer_manager,
  829. &write_controller,
  830. /*block_cache_tracer=*/nullptr));
  831. wal_manager.reset(new WalManager(db_options, env_options));
  832. std::unique_ptr<log::Writer> current_log_writer;
  833. for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
  834. uint64_t current_log_number = j;
  835. std::string fname = LogFileName(test->dbname_, current_log_number);
  836. std::unique_ptr<WritableFile> file;
  837. ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
  838. std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
  839. NewLegacyWritableFileWrapper(std::move(file)), fname, env_options));
  840. current_log_writer.reset(
  841. new log::Writer(std::move(file_writer), current_log_number,
  842. db_options.recycle_log_file_num > 0));
  843. WriteBatch batch;
  844. for (int i = 0; i < kKeysPerWALFile; i++) {
  845. std::string key = "key" + ToString((*count)++);
  846. std::string value = test->DummyString(kValueSize);
  847. assert(current_log_writer.get() != nullptr);
  848. uint64_t seq = versions->LastSequence() + 1;
  849. batch.Clear();
  850. batch.Put(key, value);
  851. WriteBatchInternal::SetSequence(&batch, seq);
  852. current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
  853. versions->SetLastAllocatedSequence(seq);
  854. versions->SetLastPublishedSequence(seq);
  855. versions->SetLastSequence(seq);
  856. }
  857. }
  858. }
  859. // Recreate and fill the store with some data
  860. static size_t FillData(DBWALTest* test, Options* options) {
  861. options->create_if_missing = true;
  862. test->DestroyAndReopen(*options);
  863. test->Close();
  864. size_t count = 0;
  865. FillData(test, *options, kWALFilesCount, &count);
  866. return count;
  867. }
  868. // Read back all the keys we wrote and return the number of keys found
  869. static size_t GetData(DBWALTest* test) {
  870. size_t count = 0;
  871. for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
  872. if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
  873. ++count;
  874. }
  875. }
  876. return count;
  877. }
  878. // Manuall corrupt the specified WAL
  879. static void CorruptWAL(DBWALTest* test, const Options& options,
  880. const double off, const double len,
  881. const int wal_file_id, const bool trunc = false) {
  882. Env* env = options.env;
  883. std::string fname = LogFileName(test->dbname_, wal_file_id);
  884. uint64_t size;
  885. ASSERT_OK(env->GetFileSize(fname, &size));
  886. ASSERT_GT(size, 0);
  887. #ifdef OS_WIN
  888. // Windows disk cache behaves differently. When we truncate
  889. // the original content is still in the cache due to the original
  890. // handle is still open. Generally, in Windows, one prohibits
  891. // shared access to files and it is not needed for WAL but we allow
  892. // it to induce corruption at various tests.
  893. test->Close();
  894. #endif
  895. if (trunc) {
  896. ASSERT_EQ(0, truncate(fname.c_str(), static_cast<int64_t>(size * off)));
  897. } else {
  898. InduceCorruption(fname, static_cast<size_t>(size * off + 8),
  899. static_cast<size_t>(size * len));
  900. }
  901. }
  902. // Overwrite data with 'a' from offset for length len
  903. static void InduceCorruption(const std::string& filename, size_t offset,
  904. size_t len) {
  905. ASSERT_GT(len, 0U);
  906. int fd = open(filename.c_str(), O_RDWR);
  907. // On windows long is 32-bit
  908. ASSERT_LE(offset, std::numeric_limits<long>::max());
  909. ASSERT_GT(fd, 0);
  910. ASSERT_EQ(offset, lseek(fd, static_cast<long>(offset), SEEK_SET));
  911. void* buf = alloca(len);
  912. memset(buf, 'b', len);
  913. ASSERT_EQ(len, write(fd, buf, static_cast<unsigned int>(len)));
  914. close(fd);
  915. }
  916. };
  917. // Test scope:
  918. // - We expect to open the data store when there is incomplete trailing writes
  919. // at the end of any of the logs
  920. // - We do not expect to open the data store for corruption
  921. TEST_F(DBWALTest, kTolerateCorruptedTailRecords) {
  922. const int jstart = RecoveryTestHelper::kWALFileOffset;
  923. const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
  924. for (auto trunc : {true, false}) { /* Corruption style */
  925. for (int i = 0; i < 3; i++) { /* Corruption offset position */
  926. for (int j = jstart; j < jend; j++) { /* WAL file */
  927. // Fill data for testing
  928. Options options = CurrentOptions();
  929. const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  930. // test checksum failure or parsing
  931. RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
  932. /*len%=*/.1, /*wal=*/j, trunc);
  933. if (trunc) {
  934. options.wal_recovery_mode =
  935. WALRecoveryMode::kTolerateCorruptedTailRecords;
  936. options.create_if_missing = false;
  937. ASSERT_OK(TryReopen(options));
  938. const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
  939. ASSERT_TRUE(i == 0 || recovered_row_count > 0);
  940. ASSERT_LT(recovered_row_count, row_count);
  941. } else {
  942. options.wal_recovery_mode =
  943. WALRecoveryMode::kTolerateCorruptedTailRecords;
  944. ASSERT_NOK(TryReopen(options));
  945. }
  946. }
  947. }
  948. }
  949. }
  950. // Test scope:
  951. // We don't expect the data store to be opened if there is any corruption
  952. // (leading, middle or trailing -- incomplete writes or corruption)
  953. TEST_F(DBWALTest, kAbsoluteConsistency) {
  954. const int jstart = RecoveryTestHelper::kWALFileOffset;
  955. const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
  956. // Verify clean slate behavior
  957. Options options = CurrentOptions();
  958. const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  959. options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  960. options.create_if_missing = false;
  961. ASSERT_OK(TryReopen(options));
  962. ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
  963. for (auto trunc : {true, false}) { /* Corruption style */
  964. for (int i = 0; i < 4; i++) { /* Corruption offset position */
  965. if (trunc && i == 0) {
  966. continue;
  967. }
  968. for (int j = jstart; j < jend; j++) { /* wal files */
  969. // fill with new date
  970. RecoveryTestHelper::FillData(this, &options);
  971. // corrupt the wal
  972. RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
  973. /*len%=*/.1, j, trunc);
  974. // verify
  975. options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  976. options.create_if_missing = false;
  977. ASSERT_NOK(TryReopen(options));
  978. }
  979. }
  980. }
  981. }
  982. // Test scope:
  983. // We don't expect the data store to be opened if there is any inconsistency
  984. // between WAL and SST files
  985. TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
  986. Options options = CurrentOptions();
  987. options.avoid_flush_during_recovery = true;
  988. // Create DB with multiple column families.
  989. CreateAndReopenWithCF({"one", "two"}, options);
  990. ASSERT_OK(Put(1, "key1", "val1"));
  991. ASSERT_OK(Put(2, "key2", "val2"));
  992. // Record the offset at this point
  993. Env* env = options.env;
  994. uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
  995. std::string fname = LogFileName(dbname_, wal_file_id);
  996. uint64_t offset_to_corrupt;
  997. ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
  998. ASSERT_GT(offset_to_corrupt, 0);
  999. ASSERT_OK(Put(1, "key3", "val3"));
  1000. // Corrupt WAL at location of key3
  1001. RecoveryTestHelper::InduceCorruption(
  1002. fname, static_cast<size_t>(offset_to_corrupt), static_cast<size_t>(4));
  1003. ASSERT_OK(Put(2, "key4", "val4"));
  1004. ASSERT_OK(Put(1, "key5", "val5"));
  1005. Flush(2);
  1006. // PIT recovery & verify
  1007. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1008. ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
  1009. }
  1010. // Test scope:
  1011. // - We expect to open data store under all circumstances
  1012. // - We expect only data upto the point where the first error was encountered
  1013. TEST_F(DBWALTest, kPointInTimeRecovery) {
  1014. const int jstart = RecoveryTestHelper::kWALFileOffset;
  1015. const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
  1016. const int maxkeys =
  1017. RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;
  1018. for (auto trunc : {true, false}) { /* Corruption style */
  1019. for (int i = 0; i < 4; i++) { /* Offset of corruption */
  1020. for (int j = jstart; j < jend; j++) { /* WAL file */
  1021. // Fill data for testing
  1022. Options options = CurrentOptions();
  1023. const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  1024. // Corrupt the wal
  1025. RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
  1026. /*len%=*/.1, j, trunc);
  1027. // Verify
  1028. options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
  1029. options.create_if_missing = false;
  1030. ASSERT_OK(TryReopen(options));
  1031. // Probe data for invariants
  1032. size_t recovered_row_count = RecoveryTestHelper::GetData(this);
  1033. ASSERT_LT(recovered_row_count, row_count);
  1034. bool expect_data = true;
  1035. for (size_t k = 0; k < maxkeys; ++k) {
  1036. bool found = Get("key" + ToString(i)) != "NOT_FOUND";
  1037. if (expect_data && !found) {
  1038. expect_data = false;
  1039. }
  1040. ASSERT_EQ(found, expect_data);
  1041. }
  1042. const size_t min = RecoveryTestHelper::kKeysPerWALFile *
  1043. (j - RecoveryTestHelper::kWALFileOffset);
  1044. ASSERT_GE(recovered_row_count, min);
  1045. if (!trunc && i != 0) {
  1046. const size_t max = RecoveryTestHelper::kKeysPerWALFile *
  1047. (j - RecoveryTestHelper::kWALFileOffset + 1);
  1048. ASSERT_LE(recovered_row_count, max);
  1049. }
  1050. }
  1051. }
  1052. }
  1053. }
  1054. // Test scope:
  1055. // - We expect to open the data store under all scenarios
  1056. // - We expect to have recovered records past the corruption zone
  1057. TEST_F(DBWALTest, kSkipAnyCorruptedRecords) {
  1058. const int jstart = RecoveryTestHelper::kWALFileOffset;
  1059. const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
  1060. for (auto trunc : {true, false}) { /* Corruption style */
  1061. for (int i = 0; i < 4; i++) { /* Corruption offset */
  1062. for (int j = jstart; j < jend; j++) { /* wal files */
  1063. // Fill data for testing
  1064. Options options = CurrentOptions();
  1065. const size_t row_count = RecoveryTestHelper::FillData(this, &options);
  1066. // Corrupt the WAL
  1067. RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
  1068. /*len%=*/.1, j, trunc);
  1069. // Verify behavior
  1070. options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
  1071. options.create_if_missing = false;
  1072. ASSERT_OK(TryReopen(options));
  1073. // Probe data for invariants
  1074. size_t recovered_row_count = RecoveryTestHelper::GetData(this);
  1075. ASSERT_LT(recovered_row_count, row_count);
  1076. if (!trunc) {
  1077. ASSERT_TRUE(i != 0 || recovered_row_count > 0);
  1078. }
  1079. }
  1080. }
  1081. }
  1082. }
  1083. TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
  1084. Options options = CurrentOptions();
  1085. options.disable_auto_compactions = true;
  1086. options.avoid_flush_during_recovery = false;
  1087. // Test with flush after recovery.
  1088. Reopen(options);
  1089. ASSERT_OK(Put("foo", "v1"));
  1090. ASSERT_OK(Put("bar", "v2"));
  1091. ASSERT_OK(Flush());
  1092. ASSERT_OK(Put("foo", "v3"));
  1093. ASSERT_OK(Put("bar", "v4"));
  1094. ASSERT_EQ(1, TotalTableFiles());
  1095. // Reopen DB. Check if WAL logs flushed.
  1096. Reopen(options);
  1097. ASSERT_EQ("v3", Get("foo"));
  1098. ASSERT_EQ("v4", Get("bar"));
  1099. ASSERT_EQ(2, TotalTableFiles());
  1100. // Test without flush after recovery.
  1101. options.avoid_flush_during_recovery = true;
  1102. DestroyAndReopen(options);
  1103. ASSERT_OK(Put("foo", "v5"));
  1104. ASSERT_OK(Put("bar", "v6"));
  1105. ASSERT_OK(Flush());
  1106. ASSERT_OK(Put("foo", "v7"));
  1107. ASSERT_OK(Put("bar", "v8"));
  1108. ASSERT_EQ(1, TotalTableFiles());
  1109. // Reopen DB. WAL logs should not be flushed this time.
  1110. Reopen(options);
  1111. ASSERT_EQ("v7", Get("foo"));
  1112. ASSERT_EQ("v8", Get("bar"));
  1113. ASSERT_EQ(1, TotalTableFiles());
  1114. // Force flush with allow_2pc.
  1115. options.avoid_flush_during_recovery = true;
  1116. options.allow_2pc = true;
  1117. ASSERT_OK(Put("foo", "v9"));
  1118. ASSERT_OK(Put("bar", "v10"));
  1119. ASSERT_OK(Flush());
  1120. ASSERT_OK(Put("foo", "v11"));
  1121. ASSERT_OK(Put("bar", "v12"));
  1122. Reopen(options);
  1123. ASSERT_EQ("v11", Get("foo"));
  1124. ASSERT_EQ("v12", Get("bar"));
  1125. ASSERT_EQ(3, TotalTableFiles());
  1126. }
  1127. TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
  1128. // Verifies WAL files that were present during recovery, but not flushed due
  1129. // to avoid_flush_during_recovery, will be considered for deletion at a later
  1130. // stage. We check at least one such file is deleted during Flush().
  1131. Options options = CurrentOptions();
  1132. options.disable_auto_compactions = true;
  1133. options.avoid_flush_during_recovery = true;
  1134. Reopen(options);
  1135. ASSERT_OK(Put("foo", "v1"));
  1136. Reopen(options);
  1137. for (int i = 0; i < 2; ++i) {
  1138. if (i > 0) {
  1139. // Flush() triggers deletion of obsolete tracked files
  1140. Flush();
  1141. }
  1142. VectorLogPtr log_files;
  1143. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
  1144. if (i == 0) {
  1145. ASSERT_GT(log_files.size(), 0);
  1146. } else {
  1147. ASSERT_EQ(0, log_files.size());
  1148. }
  1149. }
  1150. }
  1151. TEST_F(DBWALTest, RecoverWithoutFlush) {
  1152. Options options = CurrentOptions();
  1153. options.avoid_flush_during_recovery = true;
  1154. options.create_if_missing = false;
  1155. options.disable_auto_compactions = true;
  1156. options.write_buffer_size = 64 * 1024 * 1024;
  1157. size_t count = RecoveryTestHelper::FillData(this, &options);
  1158. auto validateData = [this, count]() {
  1159. for (size_t i = 0; i < count; i++) {
  1160. ASSERT_NE(Get("key" + ToString(i)), "NOT_FOUND");
  1161. }
  1162. };
  1163. Reopen(options);
  1164. validateData();
  1165. // Insert some data without flush
  1166. ASSERT_OK(Put("foo", "foo_v1"));
  1167. ASSERT_OK(Put("bar", "bar_v1"));
  1168. Reopen(options);
  1169. validateData();
  1170. ASSERT_EQ(Get("foo"), "foo_v1");
  1171. ASSERT_EQ(Get("bar"), "bar_v1");
  1172. // Insert again and reopen
  1173. ASSERT_OK(Put("foo", "foo_v2"));
  1174. ASSERT_OK(Put("bar", "bar_v2"));
  1175. Reopen(options);
  1176. validateData();
  1177. ASSERT_EQ(Get("foo"), "foo_v2");
  1178. ASSERT_EQ(Get("bar"), "bar_v2");
  1179. // manual flush and insert again
  1180. Flush();
  1181. ASSERT_EQ(Get("foo"), "foo_v2");
  1182. ASSERT_EQ(Get("bar"), "bar_v2");
  1183. ASSERT_OK(Put("foo", "foo_v3"));
  1184. ASSERT_OK(Put("bar", "bar_v3"));
  1185. Reopen(options);
  1186. validateData();
  1187. ASSERT_EQ(Get("foo"), "foo_v3");
  1188. ASSERT_EQ(Get("bar"), "bar_v3");
  1189. }
  1190. TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
  1191. const std::string kSmallValue = "v";
  1192. const std::string kLargeValue = DummyString(1024);
  1193. Options options = CurrentOptions();
  1194. options.avoid_flush_during_recovery = true;
  1195. options.create_if_missing = false;
  1196. options.disable_auto_compactions = true;
  1197. auto countWalFiles = [this]() {
  1198. VectorLogPtr log_files;
  1199. dbfull()->GetSortedWalFiles(log_files);
  1200. return log_files.size();
  1201. };
  1202. // Create DB with multiple column families and multiple log files.
  1203. CreateAndReopenWithCF({"one", "two"}, options);
  1204. ASSERT_OK(Put(0, "key1", kSmallValue));
  1205. ASSERT_OK(Put(1, "key2", kLargeValue));
  1206. Flush(1);
  1207. ASSERT_EQ(1, countWalFiles());
  1208. ASSERT_OK(Put(0, "key3", kSmallValue));
  1209. ASSERT_OK(Put(2, "key4", kLargeValue));
  1210. Flush(2);
  1211. ASSERT_EQ(2, countWalFiles());
  1212. // Reopen, insert and flush.
  1213. options.db_write_buffer_size = 64 * 1024 * 1024;
  1214. ReopenWithColumnFamilies({"default", "one", "two"}, options);
  1215. ASSERT_EQ(Get(0, "key1"), kSmallValue);
  1216. ASSERT_EQ(Get(1, "key2"), kLargeValue);
  1217. ASSERT_EQ(Get(0, "key3"), kSmallValue);
  1218. ASSERT_EQ(Get(2, "key4"), kLargeValue);
  1219. // Insert more data.
  1220. ASSERT_OK(Put(0, "key5", kLargeValue));
  1221. ASSERT_OK(Put(1, "key6", kLargeValue));
  1222. ASSERT_EQ(3, countWalFiles());
  1223. Flush(1);
  1224. ASSERT_OK(Put(2, "key7", kLargeValue));
  1225. dbfull()->FlushWAL(false);
  1226. ASSERT_EQ(4, countWalFiles());
  1227. // Reopen twice and validate.
  1228. for (int i = 0; i < 2; i++) {
  1229. ReopenWithColumnFamilies({"default", "one", "two"}, options);
  1230. ASSERT_EQ(Get(0, "key1"), kSmallValue);
  1231. ASSERT_EQ(Get(1, "key2"), kLargeValue);
  1232. ASSERT_EQ(Get(0, "key3"), kSmallValue);
  1233. ASSERT_EQ(Get(2, "key4"), kLargeValue);
  1234. ASSERT_EQ(Get(0, "key5"), kLargeValue);
  1235. ASSERT_EQ(Get(1, "key6"), kLargeValue);
  1236. ASSERT_EQ(Get(2, "key7"), kLargeValue);
  1237. ASSERT_EQ(4, countWalFiles());
  1238. }
  1239. }
  1240. // In this test we are trying to do the following:
  1241. // 1. Create a DB with corrupted WAL log;
  1242. // 2. Open with avoid_flush_during_recovery = true;
  1243. // 3. Append more data without flushing, which creates new WAL log.
  1244. // 4. Open again. See if it can correctly handle previous corruption.
  1245. TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {
  1246. const int jstart = RecoveryTestHelper::kWALFileOffset;
  1247. const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
  1248. const int kAppendKeys = 100;
  1249. Options options = CurrentOptions();
  1250. options.avoid_flush_during_recovery = true;
  1251. options.create_if_missing = false;
  1252. options.disable_auto_compactions = true;
  1253. options.write_buffer_size = 64 * 1024 * 1024;
  1254. auto getAll = [this]() {
  1255. std::vector<std::pair<std::string, std::string>> data;
  1256. ReadOptions ropt;
  1257. Iterator* iter = dbfull()->NewIterator(ropt);
  1258. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  1259. data.push_back(
  1260. std::make_pair(iter->key().ToString(), iter->value().ToString()));
  1261. }
  1262. delete iter;
  1263. return data;
  1264. };
  1265. for (auto& mode : wal_recovery_mode_string_map) {
  1266. options.wal_recovery_mode = mode.second;
  1267. for (auto trunc : {true, false}) {
  1268. for (int i = 0; i < 4; i++) {
  1269. for (int j = jstart; j < jend; j++) {
  1270. // Create corrupted WAL
  1271. RecoveryTestHelper::FillData(this, &options);
  1272. RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
  1273. /*len%=*/.1, /*wal=*/j, trunc);
  1274. // Skip the test if DB won't open.
  1275. if (!TryReopen(options).ok()) {
  1276. ASSERT_TRUE(options.wal_recovery_mode ==
  1277. WALRecoveryMode::kAbsoluteConsistency ||
  1278. (!trunc &&
  1279. options.wal_recovery_mode ==
  1280. WALRecoveryMode::kTolerateCorruptedTailRecords));
  1281. continue;
  1282. }
  1283. ASSERT_OK(TryReopen(options));
  1284. // Append some more data.
  1285. for (int k = 0; k < kAppendKeys; k++) {
  1286. std::string key = "extra_key" + ToString(k);
  1287. std::string value = DummyString(RecoveryTestHelper::kValueSize);
  1288. ASSERT_OK(Put(key, value));
  1289. }
  1290. // Save data for comparison.
  1291. auto data = getAll();
  1292. // Reopen. Verify data.
  1293. ASSERT_OK(TryReopen(options));
  1294. auto actual_data = getAll();
  1295. ASSERT_EQ(data, actual_data);
  1296. }
  1297. }
  1298. }
  1299. }
  1300. }
  1301. // Tests that total log size is recovered if we set
  1302. // avoid_flush_during_recovery=true.
  1303. // Flush should trigger if max_total_wal_size is reached.
  1304. TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
  1305. class TestFlushListener : public EventListener {
  1306. public:
  1307. std::atomic<int> count{0};
  1308. TestFlushListener() = default;
  1309. void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
  1310. count++;
  1311. assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
  1312. }
  1313. };
  1314. std::shared_ptr<TestFlushListener> test_listener =
  1315. std::make_shared<TestFlushListener>();
  1316. constexpr size_t kKB = 1024;
  1317. constexpr size_t kMB = 1024 * 1024;
  1318. Options options = CurrentOptions();
  1319. options.avoid_flush_during_recovery = true;
  1320. options.max_total_wal_size = 1 * kMB;
  1321. options.listeners.push_back(test_listener);
  1322. // Have to open DB in multi-CF mode to trigger flush when
  1323. // max_total_wal_size is reached.
  1324. CreateAndReopenWithCF({"one"}, options);
  1325. // Write some keys and we will end up with one log file which is slightly
  1326. // smaller than 1MB.
  1327. std::string value_100k(100 * kKB, 'v');
  1328. std::string value_300k(300 * kKB, 'v');
  1329. ASSERT_OK(Put(0, "foo", "v1"));
  1330. for (int i = 0; i < 9; i++) {
  1331. ASSERT_OK(Put(1, "key" + ToString(i), value_100k));
  1332. }
  1333. // Get log files before reopen.
  1334. VectorLogPtr log_files_before;
  1335. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
  1336. ASSERT_EQ(1, log_files_before.size());
  1337. uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
  1338. ASSERT_GT(log_size_before, 900 * kKB);
  1339. ASSERT_LT(log_size_before, 1 * kMB);
  1340. ReopenWithColumnFamilies({"default", "one"}, options);
  1341. // Write one more value to make log larger than 1MB.
  1342. ASSERT_OK(Put(1, "bar", value_300k));
  1343. // Get log files again. A new log file will be opened.
  1344. VectorLogPtr log_files_after_reopen;
  1345. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
  1346. ASSERT_EQ(2, log_files_after_reopen.size());
  1347. ASSERT_EQ(log_files_before[0]->LogNumber(),
  1348. log_files_after_reopen[0]->LogNumber());
  1349. ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
  1350. log_files_after_reopen[1]->SizeFileBytes(),
  1351. 1 * kMB);
  1352. // Write one more key to trigger flush.
  1353. ASSERT_OK(Put(0, "foo", "v2"));
  1354. dbfull()->TEST_WaitForFlushMemTable();
  1355. // Flushed two column families.
  1356. ASSERT_EQ(2, test_listener->count.load());
  1357. }
  1358. #if defined(ROCKSDB_PLATFORM_POSIX)
  1359. #if defined(ROCKSDB_FALLOCATE_PRESENT)
  1360. // Tests that we will truncate the preallocated space of the last log from
  1361. // previous.
  1362. TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
  1363. constexpr size_t kKB = 1024;
  1364. Options options = CurrentOptions();
  1365. options.avoid_flush_during_recovery = true;
  1366. DestroyAndReopen(options);
  1367. size_t preallocated_size =
  1368. dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
  1369. ASSERT_OK(Put("foo", "v1"));
  1370. VectorLogPtr log_files_before;
  1371. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
  1372. ASSERT_EQ(1, log_files_before.size());
  1373. auto& file_before = log_files_before[0];
  1374. ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
  1375. // The log file has preallocated space.
  1376. ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
  1377. preallocated_size);
  1378. Reopen(options);
  1379. VectorLogPtr log_files_after;
  1380. ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
  1381. ASSERT_EQ(1, log_files_after.size());
  1382. ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
  1383. // The preallocated space should be truncated.
  1384. ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
  1385. preallocated_size);
  1386. }
  1387. #endif // ROCKSDB_FALLOCATE_PRESENT
  1388. #endif // ROCKSDB_PLATFORM_POSIX
  1389. #endif // ROCKSDB_LITE
  1390. TEST_F(DBWALTest, WalTermTest) {
  1391. Options options = CurrentOptions();
  1392. options.env = env_;
  1393. CreateAndReopenWithCF({"pikachu"}, options);
  1394. ASSERT_OK(Put(1, "foo", "bar"));
  1395. WriteOptions wo;
  1396. wo.sync = true;
  1397. wo.disableWAL = false;
  1398. WriteBatch batch;
  1399. batch.Put("foo", "bar");
  1400. batch.MarkWalTerminationPoint();
  1401. batch.Put("foo2", "bar2");
  1402. ASSERT_OK(dbfull()->Write(wo, &batch));
  1403. // make sure we can re-open it.
  1404. ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
  1405. ASSERT_EQ("bar", Get(1, "foo"));
  1406. ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
  1407. }
  1408. } // namespace ROCKSDB_NAMESPACE
  1409. int main(int argc, char** argv) {
  1410. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  1411. ::testing::InitGoogleTest(&argc, argv);
  1412. return RUN_ALL_TESTS();
  1413. }