db_impl_open.cc 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/db_impl/db_impl.h"
  10. #include <cinttypes>
  11. #include "db/builder.h"
  12. #include "db/error_handler.h"
  13. #include "env/composite_env_wrapper.h"
  14. #include "file/read_write_util.h"
  15. #include "file/sst_file_manager_impl.h"
  16. #include "file/writable_file_writer.h"
  17. #include "monitoring/persistent_stats_history.h"
  18. #include "options/options_helper.h"
  19. #include "rocksdb/wal_filter.h"
  20. #include "table/block_based/block_based_table_factory.h"
  21. #include "test_util/sync_point.h"
  22. #include "util/rate_limiter.h"
  23. namespace ROCKSDB_NAMESPACE {
  24. Options SanitizeOptions(const std::string& dbname, const Options& src) {
  25. auto db_options = SanitizeOptions(dbname, DBOptions(src));
  26. ImmutableDBOptions immutable_db_options(db_options);
  27. auto cf_options =
  28. SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
  29. return Options(db_options, cf_options);
  30. }
  31. DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
  32. DBOptions result(src);
  33. if (result.file_system == nullptr) {
  34. if (result.env == Env::Default()) {
  35. result.file_system = FileSystem::Default();
  36. } else {
  37. result.file_system.reset(new LegacyFileSystemWrapper(result.env));
  38. }
  39. } else {
  40. if (result.env == nullptr) {
  41. result.env = Env::Default();
  42. }
  43. }
  44. // result.max_open_files means an "infinite" open files.
  45. if (result.max_open_files != -1) {
  46. int max_max_open_files = port::GetMaxOpenFiles();
  47. if (max_max_open_files == -1) {
  48. max_max_open_files = 0x400000;
  49. }
  50. ClipToRange(&result.max_open_files, 20, max_max_open_files);
  51. TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
  52. &result.max_open_files);
  53. }
  54. if (result.info_log == nullptr) {
  55. Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
  56. if (!s.ok()) {
  57. // No place suitable for logging
  58. result.info_log = nullptr;
  59. }
  60. }
  61. if (!result.write_buffer_manager) {
  62. result.write_buffer_manager.reset(
  63. new WriteBufferManager(result.db_write_buffer_size));
  64. }
  65. auto bg_job_limits = DBImpl::GetBGJobLimits(
  66. result.max_background_flushes, result.max_background_compactions,
  67. result.max_background_jobs, true /* parallelize_compactions */);
  68. result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
  69. Env::Priority::LOW);
  70. result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
  71. Env::Priority::HIGH);
  72. if (result.rate_limiter.get() != nullptr) {
  73. if (result.bytes_per_sync == 0) {
  74. result.bytes_per_sync = 1024 * 1024;
  75. }
  76. }
  77. if (result.delayed_write_rate == 0) {
  78. if (result.rate_limiter.get() != nullptr) {
  79. result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
  80. }
  81. if (result.delayed_write_rate == 0) {
  82. result.delayed_write_rate = 16 * 1024 * 1024;
  83. }
  84. }
  85. if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
  86. result.recycle_log_file_num = false;
  87. }
  88. if (result.recycle_log_file_num &&
  89. (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
  90. result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
  91. // kPointInTimeRecovery is inconsistent with recycle log file feature since
  92. // we define the "end" of the log as the first corrupt record we encounter.
  93. // kAbsoluteConsistency doesn't make sense because even a clean
  94. // shutdown leaves old junk at the end of the log file.
  95. result.recycle_log_file_num = 0;
  96. }
  97. if (result.wal_dir.empty()) {
  98. // Use dbname as default
  99. result.wal_dir = dbname;
  100. }
  101. if (result.wal_dir.back() == '/') {
  102. result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  103. }
  104. if (result.db_paths.size() == 0) {
  105. result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  106. }
  107. if (result.use_direct_reads && result.compaction_readahead_size == 0) {
  108. TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
  109. result.compaction_readahead_size = 1024 * 1024 * 2;
  110. }
  111. if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
  112. result.new_table_reader_for_compaction_inputs = true;
  113. }
  114. // Force flush on DB open if 2PC is enabled, since with 2PC we have no
  115. // guarantee that consecutive log files have consecutive sequence id, which
  116. // make recovery complicated.
  117. if (result.allow_2pc) {
  118. result.avoid_flush_during_recovery = false;
  119. }
  120. #ifndef ROCKSDB_LITE
  121. ImmutableDBOptions immutable_db_options(result);
  122. if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
  123. // Either the WAL dir and db_paths[0]/db_name are not the same, or we
  124. // cannot tell for sure. In either case, assume they're different and
  125. // explicitly cleanup the trash log files (bypass DeleteScheduler)
  126. // Do this first so even if we end up calling
  127. // DeleteScheduler::CleanupDirectory on the same dir later, it will be
  128. // safe
  129. std::vector<std::string> filenames;
  130. result.env->GetChildren(result.wal_dir, &filenames);
  131. for (std::string& filename : filenames) {
  132. if (filename.find(".log.trash", filename.length() -
  133. std::string(".log.trash").length()) !=
  134. std::string::npos) {
  135. std::string trash_file = result.wal_dir + "/" + filename;
  136. result.env->DeleteFile(trash_file);
  137. }
  138. }
  139. }
  140. // When the DB is stopped, it's possible that there are some .trash files that
  141. // were not deleted yet, when we open the DB we will find these .trash files
  142. // and schedule them to be deleted (or delete immediately if SstFileManager
  143. // was not used)
  144. auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
  145. for (size_t i = 0; i < result.db_paths.size(); i++) {
  146. DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path);
  147. }
  148. // Create a default SstFileManager for purposes of tracking compaction size
  149. // and facilitating recovery from out of space errors.
  150. if (result.sst_file_manager.get() == nullptr) {
  151. std::shared_ptr<SstFileManager> sst_file_manager(
  152. NewSstFileManager(result.env, result.info_log));
  153. result.sst_file_manager = sst_file_manager;
  154. }
  155. #endif
  156. if (!result.paranoid_checks) {
  157. result.skip_checking_sst_file_sizes_on_db_open = true;
  158. ROCKS_LOG_INFO(result.info_log,
  159. "file size check will be skipped during open.");
  160. }
  161. return result;
  162. }
  163. namespace {
  164. Status SanitizeOptionsByTable(
  165. const DBOptions& db_opts,
  166. const std::vector<ColumnFamilyDescriptor>& column_families) {
  167. Status s;
  168. for (auto cf : column_families) {
  169. s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
  170. if (!s.ok()) {
  171. return s;
  172. }
  173. }
  174. return Status::OK();
  175. }
  176. } // namespace
  177. Status DBImpl::ValidateOptions(
  178. const DBOptions& db_options,
  179. const std::vector<ColumnFamilyDescriptor>& column_families) {
  180. Status s;
  181. for (auto& cfd : column_families) {
  182. s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
  183. if (!s.ok()) {
  184. return s;
  185. }
  186. }
  187. s = ValidateOptions(db_options);
  188. return s;
  189. }
  190. Status DBImpl::ValidateOptions(const DBOptions& db_options) {
  191. if (db_options.db_paths.size() > 4) {
  192. return Status::NotSupported(
  193. "More than four DB paths are not supported yet. ");
  194. }
  195. if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
  196. // Protect against assert in PosixMMapReadableFile constructor
  197. return Status::NotSupported(
  198. "If memory mapped reads (allow_mmap_reads) are enabled "
  199. "then direct I/O reads (use_direct_reads) must be disabled. ");
  200. }
  201. if (db_options.allow_mmap_writes &&
  202. db_options.use_direct_io_for_flush_and_compaction) {
  203. return Status::NotSupported(
  204. "If memory mapped writes (allow_mmap_writes) are enabled "
  205. "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
  206. "be disabled. ");
  207. }
  208. if (db_options.keep_log_file_num == 0) {
  209. return Status::InvalidArgument("keep_log_file_num must be greater than 0");
  210. }
  211. if (db_options.unordered_write &&
  212. !db_options.allow_concurrent_memtable_write) {
  213. return Status::InvalidArgument(
  214. "unordered_write is incompatible with !allow_concurrent_memtable_write");
  215. }
  216. if (db_options.unordered_write && db_options.enable_pipelined_write) {
  217. return Status::InvalidArgument(
  218. "unordered_write is incompatible with enable_pipelined_write");
  219. }
  220. if (db_options.atomic_flush && db_options.enable_pipelined_write) {
  221. return Status::InvalidArgument(
  222. "atomic_flush is incompatible with enable_pipelined_write");
  223. }
  224. return Status::OK();
  225. }
  226. Status DBImpl::NewDB() {
  227. VersionEdit new_db;
  228. Status s = SetIdentityFile(env_, dbname_);
  229. if (!s.ok()) {
  230. return s;
  231. }
  232. if (immutable_db_options_.write_dbid_to_manifest) {
  233. std::string temp_db_id;
  234. GetDbIdentityFromIdentityFile(&temp_db_id);
  235. new_db.SetDBId(temp_db_id);
  236. }
  237. new_db.SetLogNumber(0);
  238. new_db.SetNextFile(2);
  239. new_db.SetLastSequence(0);
  240. ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
  241. const std::string manifest = DescriptorFileName(dbname_, 1);
  242. {
  243. std::unique_ptr<FSWritableFile> file;
  244. FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
  245. s = NewWritableFile(fs_.get(), manifest, &file, file_options);
  246. if (!s.ok()) {
  247. return s;
  248. }
  249. file->SetPreallocationBlockSize(
  250. immutable_db_options_.manifest_preallocation_size);
  251. std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
  252. std::move(file), manifest, file_options, env_, nullptr /* stats */,
  253. immutable_db_options_.listeners));
  254. log::Writer log(std::move(file_writer), 0, false);
  255. std::string record;
  256. new_db.EncodeTo(&record);
  257. s = log.AddRecord(record);
  258. if (s.ok()) {
  259. s = SyncManifest(env_, &immutable_db_options_, log.file());
  260. }
  261. }
  262. if (s.ok()) {
  263. // Make "CURRENT" file that points to the new manifest file.
  264. s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
  265. } else {
  266. fs_->DeleteFile(manifest, IOOptions(), nullptr);
  267. }
  268. return s;
  269. }
  270. Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
  271. std::unique_ptr<Directory>* directory) {
  272. // We call CreateDirIfMissing() as the directory may already exist (if we
  273. // are reopening a DB), when this happens we don't want creating the
  274. // directory to cause an error. However, we need to check if creating the
  275. // directory fails or else we may get an obscure message about the lock
  276. // file not existing. One real-world example of this occurring is if
  277. // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
  278. // when dbname_ is "dir/db" but when "dir" doesn't exist.
  279. Status s = env->CreateDirIfMissing(dirname);
  280. if (!s.ok()) {
  281. return s;
  282. }
  283. return env->NewDirectory(dirname, directory);
  284. }
  285. Status Directories::SetDirectories(Env* env, const std::string& dbname,
  286. const std::string& wal_dir,
  287. const std::vector<DbPath>& data_paths) {
  288. Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_);
  289. if (!s.ok()) {
  290. return s;
  291. }
  292. if (!wal_dir.empty() && dbname != wal_dir) {
  293. s = DBImpl::CreateAndNewDirectory(env, wal_dir, &wal_dir_);
  294. if (!s.ok()) {
  295. return s;
  296. }
  297. }
  298. data_dirs_.clear();
  299. for (auto& p : data_paths) {
  300. const std::string db_path = p.path;
  301. if (db_path == dbname) {
  302. data_dirs_.emplace_back(nullptr);
  303. } else {
  304. std::unique_ptr<Directory> path_directory;
  305. s = DBImpl::CreateAndNewDirectory(env, db_path, &path_directory);
  306. if (!s.ok()) {
  307. return s;
  308. }
  309. data_dirs_.emplace_back(path_directory.release());
  310. }
  311. }
  312. assert(data_dirs_.size() == data_paths.size());
  313. return Status::OK();
  314. }
  315. Status DBImpl::Recover(
  316. const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
  317. bool error_if_log_file_exist, bool error_if_data_exists_in_logs,
  318. uint64_t* recovered_seq) {
  319. mutex_.AssertHeld();
  320. bool is_new_db = false;
  321. assert(db_lock_ == nullptr);
  322. if (!read_only) {
  323. Status s = directories_.SetDirectories(env_, dbname_,
  324. immutable_db_options_.wal_dir,
  325. immutable_db_options_.db_paths);
  326. if (!s.ok()) {
  327. return s;
  328. }
  329. s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  330. if (!s.ok()) {
  331. return s;
  332. }
  333. std::string current_fname = CurrentFileName(dbname_);
  334. s = env_->FileExists(current_fname);
  335. if (s.IsNotFound()) {
  336. if (immutable_db_options_.create_if_missing) {
  337. s = NewDB();
  338. is_new_db = true;
  339. if (!s.ok()) {
  340. return s;
  341. }
  342. } else {
  343. return Status::InvalidArgument(
  344. current_fname, "does not exist (create_if_missing is false)");
  345. }
  346. } else if (s.ok()) {
  347. if (immutable_db_options_.error_if_exists) {
  348. return Status::InvalidArgument(dbname_,
  349. "exists (error_if_exists is true)");
  350. }
  351. } else {
  352. // Unexpected error reading file
  353. assert(s.IsIOError());
  354. return s;
  355. }
  356. // Verify compatibility of file_options_ and filesystem
  357. {
  358. std::unique_ptr<FSRandomAccessFile> idfile;
  359. FileOptions customized_fs(file_options_);
  360. customized_fs.use_direct_reads |=
  361. immutable_db_options_.use_direct_io_for_flush_and_compaction;
  362. s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
  363. nullptr);
  364. if (!s.ok()) {
  365. std::string error_str = s.ToString();
  366. // Check if unsupported Direct I/O is the root cause
  367. customized_fs.use_direct_reads = false;
  368. s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
  369. nullptr);
  370. if (s.ok()) {
  371. return Status::InvalidArgument(
  372. "Direct I/O is not supported by the specified DB.");
  373. } else {
  374. return Status::InvalidArgument(
  375. "Found options incompatible with filesystem", error_str.c_str());
  376. }
  377. }
  378. }
  379. }
  380. assert(db_id_.empty());
  381. Status s = versions_->Recover(column_families, read_only, &db_id_);
  382. if (!s.ok()) {
  383. return s;
  384. }
  385. // Happens when immutable_db_options_.write_dbid_to_manifest is set to true
  386. // the very first time.
  387. if (db_id_.empty()) {
  388. // Check for the IDENTITY file and create it if not there.
  389. s = fs_->FileExists(IdentityFileName(dbname_), IOOptions(), nullptr);
  390. // Typically Identity file is created in NewDB() and for some reason if
  391. // it is no longer available then at this point DB ID is not in Identity
  392. // file or Manifest.
  393. if (s.IsNotFound()) {
  394. s = SetIdentityFile(env_, dbname_);
  395. if (!s.ok()) {
  396. return s;
  397. }
  398. } else if (!s.ok()) {
  399. assert(s.IsIOError());
  400. return s;
  401. }
  402. s = GetDbIdentityFromIdentityFile(&db_id_);
  403. if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
  404. VersionEdit edit;
  405. edit.SetDBId(db_id_);
  406. Options options;
  407. MutableCFOptions mutable_cf_options(options);
  408. versions_->db_id_ = db_id_;
  409. s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
  410. mutable_cf_options, &edit, &mutex_, nullptr,
  411. false);
  412. }
  413. } else {
  414. s = SetIdentityFile(env_, dbname_, db_id_);
  415. }
  416. if (immutable_db_options_.paranoid_checks && s.ok()) {
  417. s = CheckConsistency();
  418. }
  419. if (s.ok() && !read_only) {
  420. std::map<std::string, std::shared_ptr<Directory>> created_dirs;
  421. for (auto cfd : *versions_->GetColumnFamilySet()) {
  422. s = cfd->AddDirectories(&created_dirs);
  423. if (!s.ok()) {
  424. return s;
  425. }
  426. }
  427. }
  428. // DB mutex is already held
  429. if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
  430. s = InitPersistStatsColumnFamily();
  431. }
  432. if (s.ok()) {
  433. // Initial max_total_in_memory_state_ before recovery logs. Log recovery
  434. // may check this value to decide whether to flush.
  435. max_total_in_memory_state_ = 0;
  436. for (auto cfd : *versions_->GetColumnFamilySet()) {
  437. auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
  438. max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
  439. mutable_cf_options->max_write_buffer_number;
  440. }
  441. SequenceNumber next_sequence(kMaxSequenceNumber);
  442. default_cf_handle_ = new ColumnFamilyHandleImpl(
  443. versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
  444. default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
  445. // TODO(Zhongyi): handle single_column_family_mode_ when
  446. // persistent_stats is enabled
  447. single_column_family_mode_ =
  448. versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
  449. // Recover from all newer log files than the ones named in the
  450. // descriptor (new log files may have been added by the previous
  451. // incarnation without registering them in the descriptor).
  452. //
  453. // Note that prev_log_number() is no longer used, but we pay
  454. // attention to it in case we are recovering a database
  455. // produced by an older version of rocksdb.
  456. std::vector<std::string> filenames;
  457. s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
  458. if (s.IsNotFound()) {
  459. return Status::InvalidArgument("wal_dir not found",
  460. immutable_db_options_.wal_dir);
  461. } else if (!s.ok()) {
  462. return s;
  463. }
  464. std::vector<uint64_t> logs;
  465. for (size_t i = 0; i < filenames.size(); i++) {
  466. uint64_t number;
  467. FileType type;
  468. if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
  469. if (is_new_db) {
  470. return Status::Corruption(
  471. "While creating a new Db, wal_dir contains "
  472. "existing log file: ",
  473. filenames[i]);
  474. } else {
  475. logs.push_back(number);
  476. }
  477. }
  478. }
  479. if (logs.size() > 0) {
  480. if (error_if_log_file_exist) {
  481. return Status::Corruption(
  482. "The db was opened in readonly mode with error_if_log_file_exist"
  483. "flag but a log file already exists");
  484. } else if (error_if_data_exists_in_logs) {
  485. for (auto& log : logs) {
  486. std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
  487. uint64_t bytes;
  488. s = env_->GetFileSize(fname, &bytes);
  489. if (s.ok()) {
  490. if (bytes > 0) {
  491. return Status::Corruption(
  492. "error_if_data_exists_in_logs is set but there are data "
  493. " in log files.");
  494. }
  495. }
  496. }
  497. }
  498. }
  499. if (!logs.empty()) {
  500. // Recover in the order in which the logs were generated
  501. std::sort(logs.begin(), logs.end());
  502. bool corrupted_log_found = false;
  503. s = RecoverLogFiles(logs, &next_sequence, read_only,
  504. &corrupted_log_found);
  505. if (corrupted_log_found && recovered_seq != nullptr) {
  506. *recovered_seq = next_sequence;
  507. }
  508. if (!s.ok()) {
  509. // Clear memtables if recovery failed
  510. for (auto cfd : *versions_->GetColumnFamilySet()) {
  511. cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
  512. kMaxSequenceNumber);
  513. }
  514. }
  515. }
  516. }
  517. if (read_only) {
  518. // If we are opening as read-only, we need to update options_file_number_
  519. // to reflect the most recent OPTIONS file. It does not matter for regular
  520. // read-write db instance because options_file_number_ will later be
  521. // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
  522. std::vector<std::string> file_names;
  523. if (s.ok()) {
  524. s = env_->GetChildren(GetName(), &file_names);
  525. }
  526. if (s.ok()) {
  527. uint64_t number = 0;
  528. uint64_t options_file_number = 0;
  529. FileType type;
  530. for (const auto& fname : file_names) {
  531. if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
  532. options_file_number = std::max(number, options_file_number);
  533. }
  534. }
  535. versions_->options_file_number_ = options_file_number;
  536. }
  537. }
  538. return s;
  539. }
  540. Status DBImpl::PersistentStatsProcessFormatVersion() {
  541. mutex_.AssertHeld();
  542. Status s;
  543. // persist version when stats CF doesn't exist
  544. bool should_persist_format_version = !persistent_stats_cfd_exists_;
  545. mutex_.Unlock();
  546. if (persistent_stats_cfd_exists_) {
  547. // Check persistent stats format version compatibility. Drop and recreate
  548. // persistent stats CF if format version is incompatible
  549. uint64_t format_version_recovered = 0;
  550. Status s_format = DecodePersistentStatsVersionNumber(
  551. this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
  552. uint64_t compatible_version_recovered = 0;
  553. Status s_compatible = DecodePersistentStatsVersionNumber(
  554. this, StatsVersionKeyType::kCompatibleVersion,
  555. &compatible_version_recovered);
  556. // abort reading from existing stats CF if any of following is true:
  557. // 1. failed to read format version or compatible version from disk
  558. // 2. sst's format version is greater than current format version, meaning
  559. // this sst is encoded with a newer RocksDB release, and current compatible
  560. // version is below the sst's compatible version
  561. if (!s_format.ok() || !s_compatible.ok() ||
  562. (kStatsCFCurrentFormatVersion < format_version_recovered &&
  563. kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
  564. if (!s_format.ok() || !s_compatible.ok()) {
  565. ROCKS_LOG_INFO(
  566. immutable_db_options_.info_log,
  567. "Reading persistent stats version key failed. Format key: %s, "
  568. "compatible key: %s",
  569. s_format.ToString().c_str(), s_compatible.ToString().c_str());
  570. } else {
  571. ROCKS_LOG_INFO(
  572. immutable_db_options_.info_log,
  573. "Disable persistent stats due to corrupted or incompatible format "
  574. "version\n");
  575. }
  576. DropColumnFamily(persist_stats_cf_handle_);
  577. DestroyColumnFamilyHandle(persist_stats_cf_handle_);
  578. ColumnFamilyHandle* handle = nullptr;
  579. ColumnFamilyOptions cfo;
  580. OptimizeForPersistentStats(&cfo);
  581. s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
  582. persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
  583. // should also persist version here because old stats CF is discarded
  584. should_persist_format_version = true;
  585. }
  586. }
  587. if (s.ok() && should_persist_format_version) {
  588. // Persistent stats CF being created for the first time, need to write
  589. // format version key
  590. WriteBatch batch;
  591. batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
  592. ToString(kStatsCFCurrentFormatVersion));
  593. batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
  594. ToString(kStatsCFCompatibleFormatVersion));
  595. WriteOptions wo;
  596. wo.low_pri = true;
  597. wo.no_slowdown = true;
  598. wo.sync = false;
  599. s = Write(wo, &batch);
  600. }
  601. mutex_.Lock();
  602. return s;
  603. }
  604. Status DBImpl::InitPersistStatsColumnFamily() {
  605. mutex_.AssertHeld();
  606. assert(!persist_stats_cf_handle_);
  607. ColumnFamilyData* persistent_stats_cfd =
  608. versions_->GetColumnFamilySet()->GetColumnFamily(
  609. kPersistentStatsColumnFamilyName);
  610. persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;
  611. Status s;
  612. if (persistent_stats_cfd != nullptr) {
  613. // We are recovering from a DB which already contains persistent stats CF,
  614. // the CF is already created in VersionSet::ApplyOneVersionEdit, but
  615. // column family handle was not. Need to explicitly create handle here.
  616. persist_stats_cf_handle_ =
  617. new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
  618. } else {
  619. mutex_.Unlock();
  620. ColumnFamilyHandle* handle = nullptr;
  621. ColumnFamilyOptions cfo;
  622. OptimizeForPersistentStats(&cfo);
  623. s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
  624. persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
  625. mutex_.Lock();
  626. }
  627. return s;
  628. }
// REQUIRES: log_numbers are sorted in ascending order
// REQUIRES: mutex_ held.
//
// Replays the WAL files named by `log_numbers` into the column families'
// memtables during DB open.
//
// Per WAL: files older than MinLogNumberToKeep() are skipped; each record is
// decoded into a WriteBatch, optionally passed through
// immutable_db_options_.wal_filter, and inserted with
// WriteBatchInternal::InsertInto. When !read_only, any column family handed
// back by flush_scheduler_ is written to an L0 table immediately.
// Corruption handling depends on immutable_db_options_.wal_recovery_mode
// (see the status checks after the per-record loop).
//
// After all WALs (only when !read_only):
// - Remaining non-empty memtables are flushed, unless nothing was flushed yet
//   and avoid_flush_during_recovery is set.
// - Per-CF version edits are written to the MANIFEST via LogAndApply. Where
//   the data is covered, they set the log number to max_log_number + 1.
// - If data was left in memtables and nothing was flushed,
//   RestoreAliveLogFiles() registers the WALs as alive.
//
// Parameters:
//   log_numbers: WAL numbers to replay, ascending. When !read_only,
//     log_numbers.back() is read, so the vector must then be non-empty.
//   next_sequence: in/out. Passed to InsertInto (presumably advanced there).
//     Used after each WAL to update the VersionSet's last sequence.
//   read_only: when true, no L0 tables are written and no MANIFEST update is
//     performed.
//   corrupted_log_found: optional. Set to true when point-in-time recovery
//     stops at a corrupted WAL.
// Returns the first unrecoverable error. Errors tolerated by the recovery
// mode or by MaybeIgnoreError() are cleared.
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                               SequenceNumber* next_sequence, bool read_only,
                               bool* corrupted_log_found) {
  // Reporter handed to log::Reader. It always logs dropped bytes. When
  // `status` is non-null, it also stores the first corruption there. That
  // makes the `status.ok()` part of the replay loop condition below fail.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
                     // or wal_recovery_mode == kSkipAnyCorruptedRecords
    void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                     (this->status == nullptr ? "(ignoring error) " : ""),
                     fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) {
        *this->status = s;
      }
    }
  };

  mutex_.AssertHeld();
  Status status;
  // One VersionEdit per column family, keyed by CF id. Recovery flushes add
  // files to these edits; all of them are written to the MANIFEST together
  // near the end of this function.
  std::unordered_map<int, VersionEdit> version_edits;
  // no need to refcount because iteration is under mutex
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
  }
  int job_id = next_job_id_.fetch_add(1);
  {
    auto stream = event_logger_.Log();
    stream << "job" << job_id << "event"
           << "recovery_started";
    stream << "log_files";
    stream.StartArray();
    for (auto log_number : log_numbers) {
      stream << log_number;
    }
    stream.EndArray();
  }

#ifndef ROCKSDB_LITE
  // Before any record is replayed, give the WAL filter the CF name -> id map
  // and the CF id -> log number map.
  if (immutable_db_options_.wal_filter != nullptr) {
    std::map<std::string, uint32_t> cf_name_id_map;
    std::map<uint32_t, uint64_t> cf_lognumber_map;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
      cf_lognumber_map.insert(
          std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
    }

    immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
                                                               cf_name_id_map);
  }
#endif

  // Set when the WAL filter returns kStopReplay. All later records and WALs
  // are then dropped.
  bool stop_replay_by_wal_filter = false;
  // Set in point-in-time recovery after a corrupted WAL. Cleared again if a
  // later record's sequence equals *next_sequence, i.e. the data continues.
  bool stop_replay_for_corruption = false;
  // True once any memtable has been written to an L0 table during recovery.
  bool flushed = false;
  // WAL at which point-in-time recovery stopped. kMaxSequenceNumber serves as
  // the "none" sentinel here.
  uint64_t corrupted_log_number = kMaxSequenceNumber;
  uint64_t min_log_number = MinLogNumberToKeep();
  for (auto log_number : log_numbers) {
    if (log_number < min_log_number) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Skipping log #%" PRIu64
                     " since it is older than min log to keep #%" PRIu64,
                     log_number, min_log_number);
      continue;
    }
    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number. So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(log_number);
    // Open the log file
    std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Recovering log #%" PRIu64 " mode %d", log_number,
                   static_cast<int>(immutable_db_options_.wal_recovery_mode));
    // Best-effort warning with the size of a WAL whose remaining contents are
    // being skipped. Nothing is logged if the size cannot be read.
    auto logFileDropped = [this, &fname]() {
      uint64_t bytes;
      if (env_->GetFileSize(fname, &bytes).ok()) {
        auto info_log = immutable_db_options_.info_log.get();
        ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
                       static_cast<int>(bytes));
      }
    };
    if (stop_replay_by_wal_filter) {
      logFileDropped();
      continue;
    }

    std::unique_ptr<SequentialFileReader> file_reader;
    {
      std::unique_ptr<FSSequentialFile> file;
      status = fs_->NewSequentialFile(fname,
                                      fs_->OptimizeForLogRead(file_options_),
                                      &file, nullptr);
      if (!status.ok()) {
        MaybeIgnoreError(&status);
        if (!status.ok()) {
          return status;
        } else {
          // Fail with one log file, but that's ok.
          // Try next one.
          continue;
        }
      }
      file_reader.reset(new SequentialFileReader(
          std::move(file), fname, immutable_db_options_.log_readahead_size));
    }

    // Create the log reader.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = immutable_db_options_.info_log.get();
    reporter.fname = fname.c_str();
    // Only propagate corruption into `status` when paranoid checks are on and
    // the mode does not ask to skip corrupted records.
    if (!immutable_db_options_.paranoid_checks ||
        immutable_db_options_.wal_recovery_mode ==
            WALRecoveryMode::kSkipAnyCorruptedRecords) {
      reporter.status = nullptr;
    } else {
      reporter.status = &status;
    }
    // We intentionally make log::Reader do checksumming even if
    // paranoid_checks==false so that corruptions cause entire commits
    // to be skipped instead of propagating bad information (like overly
    // large sequence numbers).
    log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
                       &reporter, true /*checksum*/, log_number);

    // Read all the records and add to a memtable. ReadRecord receives
    // wal_recovery_mode, which presumably decides how incomplete records at
    // the tail of the WAL are treated.
    std::string scratch;
    Slice record;
    WriteBatch batch;

    while (!stop_replay_by_wal_filter &&
           reader.ReadRecord(&record, &scratch,
                             immutable_db_options_.wal_recovery_mode) &&
           status.ok()) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }

      WriteBatchInternal::SetContents(&batch, record);
      SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);

      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kPointInTimeRecovery) {
        // In point-in-time recovery mode, if sequence id of log files are
        // consecutive, we continue recovery despite corruption. This could
        // happen when we open and write to a corrupted DB, where sequence id
        // will start from the last sequence id we recovered.
        if (sequence == *next_sequence) {
          stop_replay_for_corruption = false;
        }
        if (stop_replay_for_corruption) {
          logFileDropped();
          break;
        }
      }

#ifndef ROCKSDB_LITE
      if (immutable_db_options_.wal_filter != nullptr) {
        WriteBatch new_batch;
        bool batch_changed = false;

        WalFilter::WalProcessingOption wal_processing_option =
            immutable_db_options_.wal_filter->LogRecordFound(
                log_number, fname, batch, &new_batch, &batch_changed);

        switch (wal_processing_option) {
          case WalFilter::WalProcessingOption::kContinueProcessing:
            // do nothing, proceed normally
            break;
          case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
            // skip current record
            continue;
          case WalFilter::WalProcessingOption::kStopReplay:
            // skip current record and stop replay
            stop_replay_by_wal_filter = true;
            continue;
          case WalFilter::WalProcessingOption::kCorruptedRecord: {
            status =
                Status::Corruption("Corruption reported by Wal Filter ",
                                   immutable_db_options_.wal_filter->Name());
            MaybeIgnoreError(&status);
            if (!status.ok()) {
              // status is now non-OK, so the loop condition ends replay of
              // this WAL.
              reporter.Corruption(record.size(), status);
              continue;
            }
            break;
          }
          default: {
            assert(false);  // unhandled case
            status = Status::NotSupported(
                "Unknown WalProcessingOption returned"
                " by Wal Filter ",
                immutable_db_options_.wal_filter->Name());
            MaybeIgnoreError(&status);
            if (!status.ok()) {
              return status;
            } else {
              // Ignore the error with current record processing.
              continue;
            }
          }
        }

        if (batch_changed) {
          // Make sure that the count in the new batch is
          // within the original count.
          int new_count = WriteBatchInternal::Count(&new_batch);
          int original_count = WriteBatchInternal::Count(&batch);
          if (new_count > original_count) {
            ROCKS_LOG_FATAL(
                immutable_db_options_.info_log,
                "Recovering log #%" PRIu64
                " mode %d log filter %s returned "
                "more records (%d) than original (%d) which is not allowed. "
                "Aborting recovery.",
                log_number,
                static_cast<int>(immutable_db_options_.wal_recovery_mode),
                immutable_db_options_.wal_filter->Name(), new_count,
                original_count);
            status = Status::NotSupported(
                "More than original # of records "
                "returned by Wal Filter ",
                immutable_db_options_.wal_filter->Name());
            return status;
          }
          // Set the same sequence number in the new_batch
          // as the original batch.
          WriteBatchInternal::SetSequence(&new_batch,
                                          WriteBatchInternal::Sequence(&batch));
          batch = new_batch;
        }
      }
#endif  // ROCKSDB_LITE

      // If column family was not found, it might mean that the WAL write
      // batch references to the column family that was dropped after the
      // insert. We don't want to fail the whole write batch in that case --
      // we just ignore the update.
      // That's why we set ignore missing column families to true
      bool has_valid_writes = false;
      status = WriteBatchInternal::InsertInto(
          &batch, column_family_memtables_.get(), &flush_scheduler_,
          &trim_history_scheduler_, true, log_number, this,
          false /* concurrent_memtable_writes */, next_sequence,
          &has_valid_writes, seq_per_batch_, batch_per_txn_);
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reporter.Corruption(record.size(), status);
        continue;
      }

      if (has_valid_writes && !read_only) {
        // we can do this because this is called before client has access to the
        // DB and there is only a single thread operating on DB
        ColumnFamilyData* cfd;

        // Write every memtable that the flush scheduler queued during
        // InsertInto to L0, then start a fresh memtable for that CF.
        while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
          cfd->UnrefAndTryDelete();
          // If this asserts, it means that InsertInto failed in
          // filtering updates to already-flushed column families
          assert(cfd->GetLogNumber() <= log_number);
          auto iter = version_edits.find(cfd->GetID());
          assert(iter != version_edits.end());
          VersionEdit* edit = &iter->second;
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Reflect errors immediately so that conditions like full
            // file-systems cause the DB::Open() to fail.
            return status;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 *next_sequence);
        }
      }
    }

    // Decide, per wal_recovery_mode, whether an error from this WAL is fatal.
    if (!status.ok()) {
      if (status.IsNotSupported()) {
        // We should not treat NotSupported as corruption. It is rather a clear
        // sign that we are processing a WAL that is produced by an incompatible
        // version of the code.
        return status;
      }
      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
        // We should ignore all errors unconditionally
        status = Status::OK();
      } else if (immutable_db_options_.wal_recovery_mode ==
                 WALRecoveryMode::kPointInTimeRecovery) {
        // We should ignore the error but not continue replaying
        status = Status::OK();
        stop_replay_for_corruption = true;
        corrupted_log_number = log_number;
        if (corrupted_log_found != nullptr) {
          *corrupted_log_found = true;
        }
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Point in time recovered to log #%" PRIu64
                       " seq #%" PRIu64,
                       log_number, *next_sequence);
      } else {
        assert(immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kTolerateCorruptedTailRecords ||
               immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kAbsoluteConsistency);
        return status;
      }
    }

    flush_scheduler_.Clear();
    trim_history_scheduler_.Clear();
    // Publish the highest sequence replayed so far, unless *next_sequence is
    // the kMaxSequenceNumber sentinel or the VersionSet is already ahead.
    auto last_sequence = *next_sequence - 1;
    if ((*next_sequence != kMaxSequenceNumber) &&
        (versions_->LastSequence() <= last_sequence)) {
      versions_->SetLastAllocatedSequence(last_sequence);
      versions_->SetLastPublishedSequence(last_sequence);
      versions_->SetLastSequence(last_sequence);
    }
  }
  // Compare the corrupted log number to all columnfamily's current log number.
  // Abort Open() if any column family's log number is greater than
  // the corrupted log number, which means CF contains data beyond the point of
  // corruption. This could happen during PIT recovery when the WAL is corrupted
  // and some (but not all) CFs are flushed.
  // Exclude the PIT case where no log is dropped after the corruption point.
  // This is to cover the case for empty logs after corrupted log, in which we
  // don't reset stop_replay_for_corruption.
  if (stop_replay_for_corruption == true &&
      (immutable_db_options_.wal_recovery_mode ==
           WALRecoveryMode::kPointInTimeRecovery ||
       immutable_db_options_.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords)) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->GetLogNumber() > corrupted_log_number) {
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "Column family inconsistency: SST file contains data"
                        " beyond the point of corruption.");
        return Status::Corruption("SST file is ahead of WALs");
      }
    }
  }

  // True if there's any data in the WALs; if not, we can skip re-processing
  // them later
  bool data_seen = false;
  if (!read_only) {
    // no need to refcount since client still doesn't have access
    // to the DB and can not drop column families while we iterate
    auto max_log_number = log_numbers.back();
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits.find(cfd->GetID());
      assert(iter != version_edits.end());
      VersionEdit* edit = &iter->second;

      if (cfd->GetLogNumber() > max_log_number) {
        // Column family cfd has already flushed the data
        // from all logs. Memtable has to be empty because
        // we filter the updates based on log_number
        // (in WriteBatch::InsertInto)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }

      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);

      // flush the final memtable (if non-empty)
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        // If flush happened in the middle of recovery (e.g. due to memtable
        // being full), we flush at the end. Otherwise we'll need to record
        // where we were on last flush, which make the logic complicated.
        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Recovery failed
            break;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 versions_->LastSequence());
        }
        data_seen = true;
      }

      // Update the log number info in the version edit corresponding to this
      // column family. Note that the version edits will be written to MANIFEST
      // together later.
      // writing log_number in the manifest means that any log file
      // with number strongly less than (log_number + 1) is already
      // recovered and should be ignored on next reincarnation.
      // Since we already recovered max_log_number, we want all logs
      // with numbers `<= max_log_number` (includes this one) to be ignored
      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
        edit->SetLogNumber(max_log_number + 1);
      }
    }
    if (status.ok()) {
      // we must mark the next log number as used, even though it's
      // not actually used. that is because VersionSet assumes
      // VersionSet::next_file_number_ always to be strictly greater than any
      // log number
      versions_->MarkFileNumberUsed(max_log_number + 1);

      // Gather every CF with its options and its single recovery edit so all
      // edits are applied in one LogAndApply call.
      autovector<ColumnFamilyData*> cfds;
      autovector<const MutableCFOptions*> cf_opts;
      autovector<autovector<VersionEdit*>> edit_lists;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        cfds.push_back(cfd);
        cf_opts.push_back(cfd->GetLatestMutableCFOptions());
        auto iter = version_edits.find(cfd->GetID());
        assert(iter != version_edits.end());
        edit_lists.push_back({&iter->second});
      }
      // write MANIFEST with update
      status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
                                      directories_.GetDbDir(),
                                      /*new_descriptor_log=*/true);
    }
  }

  // Recovered data that is still only in memtables (nothing was flushed) is
  // backed by these WALs, so register them as alive.
  if (status.ok() && data_seen && !flushed) {
    status = RestoreAliveLogFiles(log_numbers);
  }

  event_logger_.Log() << "job" << job_id << "event"
                      << "recovery_finished";

  return status;
}
  1042. Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
  1043. if (log_numbers.empty()) {
  1044. return Status::OK();
  1045. }
  1046. Status s;
  1047. mutex_.AssertHeld();
  1048. assert(immutable_db_options_.avoid_flush_during_recovery);
  1049. if (two_write_queues_) {
  1050. log_write_mutex_.Lock();
  1051. }
  1052. // Mark these as alive so they'll be considered for deletion later by
  1053. // FindObsoleteFiles()
  1054. total_log_size_ = 0;
  1055. log_empty_ = false;
  1056. for (auto log_number : log_numbers) {
  1057. LogFileNumberSize log(log_number);
  1058. std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
  1059. // This gets the appear size of the logs, not including preallocated space.
  1060. s = env_->GetFileSize(fname, &log.size);
  1061. if (!s.ok()) {
  1062. break;
  1063. }
  1064. total_log_size_ += log.size;
  1065. alive_log_files_.push_back(log);
  1066. // We preallocate space for logs, but then after a crash and restart, those
  1067. // preallocated space are not needed anymore. It is likely only the last
  1068. // log has such preallocated space, so we only truncate for the last log.
  1069. if (log_number == log_numbers.back()) {
  1070. std::unique_ptr<FSWritableFile> last_log;
  1071. Status truncate_status = fs_->ReopenWritableFile(
  1072. fname,
  1073. fs_->OptimizeForLogWrite(
  1074. file_options_,
  1075. BuildDBOptions(immutable_db_options_, mutable_db_options_)),
  1076. &last_log, nullptr);
  1077. if (truncate_status.ok()) {
  1078. truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
  1079. }
  1080. if (truncate_status.ok()) {
  1081. truncate_status = last_log->Close(IOOptions(), nullptr);
  1082. }
  1083. // Not a critical error if fail to truncate.
  1084. if (!truncate_status.ok()) {
  1085. ROCKS_LOG_WARN(immutable_db_options_.info_log,
  1086. "Failed to truncate log #%" PRIu64 ": %s", log_number,
  1087. truncate_status.ToString().c_str());
  1088. }
  1089. }
  1090. }
  1091. if (two_write_queues_) {
  1092. log_write_mutex_.Unlock();
  1093. }
  1094. return s;
  1095. }
  1096. Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
  1097. MemTable* mem, VersionEdit* edit) {
  1098. mutex_.AssertHeld();
  1099. const uint64_t start_micros = env_->NowMicros();
  1100. FileMetaData meta;
  1101. std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
  1102. new std::list<uint64_t>::iterator(
  1103. CaptureCurrentFileNumberInPendingOutputs()));
  1104. meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  1105. ReadOptions ro;
  1106. ro.total_order_seek = true;
  1107. Arena arena;
  1108. Status s;
  1109. TableProperties table_properties;
  1110. {
  1111. ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
  1112. ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
  1113. "[%s] [WriteLevel0TableForRecovery]"
  1114. " Level-0 table #%" PRIu64 ": started",
  1115. cfd->GetName().c_str(), meta.fd.GetNumber());
  1116. // Get the latest mutable cf options while the mutex is still locked
  1117. const MutableCFOptions mutable_cf_options =
  1118. *cfd->GetLatestMutableCFOptions();
  1119. bool paranoid_file_checks =
  1120. cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
  1121. int64_t _current_time = 0;
  1122. env_->GetCurrentTime(&_current_time); // ignore error
  1123. const uint64_t current_time = static_cast<uint64_t>(_current_time);
  1124. meta.oldest_ancester_time = current_time;
  1125. {
  1126. auto write_hint = cfd->CalculateSSTWriteHint(0);
  1127. mutex_.Unlock();
  1128. SequenceNumber earliest_write_conflict_snapshot;
  1129. std::vector<SequenceNumber> snapshot_seqs =
  1130. snapshots_.GetAll(&earliest_write_conflict_snapshot);
  1131. auto snapshot_checker = snapshot_checker_.get();
  1132. if (use_custom_gc_ && snapshot_checker == nullptr) {
  1133. snapshot_checker = DisableGCSnapshotChecker::Instance();
  1134. }
  1135. std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
  1136. range_del_iters;
  1137. auto range_del_iter =
  1138. mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
  1139. if (range_del_iter != nullptr) {
  1140. range_del_iters.emplace_back(range_del_iter);
  1141. }
  1142. s = BuildTable(
  1143. dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options,
  1144. file_options_for_compaction_, cfd->table_cache(), iter.get(),
  1145. std::move(range_del_iters), &meta, cfd->internal_comparator(),
  1146. cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
  1147. snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
  1148. GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
  1149. mutable_cf_options.sample_for_compression,
  1150. cfd->ioptions()->compression_opts, paranoid_file_checks,
  1151. cfd->internal_stats(), TableFileCreationReason::kRecovery,
  1152. &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
  1153. -1 /* level */, current_time, write_hint);
  1154. LogFlush(immutable_db_options_.info_log);
  1155. ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
  1156. "[%s] [WriteLevel0TableForRecovery]"
  1157. " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
  1158. cfd->GetName().c_str(), meta.fd.GetNumber(),
  1159. meta.fd.GetFileSize(), s.ToString().c_str());
  1160. mutex_.Lock();
  1161. }
  1162. }
  1163. ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  1164. // Note that if file_size is zero, the file has been deleted and
  1165. // should not be added to the manifest.
  1166. int level = 0;
  1167. if (s.ok() && meta.fd.GetFileSize() > 0) {
  1168. edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
  1169. meta.fd.GetFileSize(), meta.smallest, meta.largest,
  1170. meta.fd.smallest_seqno, meta.fd.largest_seqno,
  1171. meta.marked_for_compaction, meta.oldest_blob_file_number,
  1172. meta.oldest_ancester_time, meta.file_creation_time,
  1173. meta.file_checksum, meta.file_checksum_func_name);
  1174. }
  1175. InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  1176. stats.micros = env_->NowMicros() - start_micros;
  1177. stats.bytes_written = meta.fd.GetFileSize();
  1178. stats.num_output_files = 1;
  1179. cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
  1180. cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
  1181. meta.fd.GetFileSize());
  1182. RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  1183. return s;
  1184. }
  1185. Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  1186. DBOptions db_options(options);
  1187. ColumnFamilyOptions cf_options(options);
  1188. std::vector<ColumnFamilyDescriptor> column_families;
  1189. column_families.push_back(
  1190. ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
  1191. if (db_options.persist_stats_to_disk) {
  1192. column_families.push_back(
  1193. ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
  1194. }
  1195. std::vector<ColumnFamilyHandle*> handles;
  1196. Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
  1197. if (s.ok()) {
  1198. if (db_options.persist_stats_to_disk) {
  1199. assert(handles.size() == 2);
  1200. } else {
  1201. assert(handles.size() == 1);
  1202. }
  1203. // i can delete the handle since DBImpl is always holding a reference to
  1204. // default column family
  1205. if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
  1206. delete handles[1];
  1207. }
  1208. delete handles[0];
  1209. }
  1210. return s;
  1211. }
  1212. Status DB::Open(const DBOptions& db_options, const std::string& dbname,
  1213. const std::vector<ColumnFamilyDescriptor>& column_families,
  1214. std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  1215. const bool kSeqPerBatch = true;
  1216. const bool kBatchPerTxn = true;
  1217. return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
  1218. !kSeqPerBatch, kBatchPerTxn);
  1219. }
  1220. Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
  1221. size_t preallocate_block_size, log::Writer** new_log) {
  1222. Status s;
  1223. std::unique_ptr<FSWritableFile> lfile;
  1224. DBOptions db_options =
  1225. BuildDBOptions(immutable_db_options_, mutable_db_options_);
  1226. FileOptions opt_file_options =
  1227. fs_->OptimizeForLogWrite(file_options_, db_options);
  1228. std::string log_fname =
  1229. LogFileName(immutable_db_options_.wal_dir, log_file_num);
  1230. if (recycle_log_number) {
  1231. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  1232. "reusing log %" PRIu64 " from recycle list\n",
  1233. recycle_log_number);
  1234. std::string old_log_fname =
  1235. LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
  1236. TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
  1237. TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
  1238. s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
  1239. &lfile, /*dbg=*/nullptr);
  1240. } else {
  1241. s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
  1242. }
  1243. if (s.ok()) {
  1244. lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
  1245. lfile->SetPreallocationBlockSize(preallocate_block_size);
  1246. const auto& listeners = immutable_db_options_.listeners;
  1247. std::unique_ptr<WritableFileWriter> file_writer(
  1248. new WritableFileWriter(std::move(lfile), log_fname, opt_file_options,
  1249. env_, nullptr /* stats */, listeners));
  1250. *new_log = new log::Writer(std::move(file_writer), log_file_num,
  1251. immutable_db_options_.recycle_log_file_num > 0,
  1252. immutable_db_options_.manual_wal_flush);
  1253. }
  1254. return s;
  1255. }
  1256. Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
  1257. const std::vector<ColumnFamilyDescriptor>& column_families,
  1258. std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
  1259. const bool seq_per_batch, const bool batch_per_txn) {
  1260. Status s = SanitizeOptionsByTable(db_options, column_families);
  1261. if (!s.ok()) {
  1262. return s;
  1263. }
  1264. s = ValidateOptions(db_options, column_families);
  1265. if (!s.ok()) {
  1266. return s;
  1267. }
  1268. *dbptr = nullptr;
  1269. handles->clear();
  1270. size_t max_write_buffer_size = 0;
  1271. for (auto cf : column_families) {
  1272. max_write_buffer_size =
  1273. std::max(max_write_buffer_size, cf.options.write_buffer_size);
  1274. }
  1275. DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
  1276. s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
  1277. if (s.ok()) {
  1278. std::vector<std::string> paths;
  1279. for (auto& db_path : impl->immutable_db_options_.db_paths) {
  1280. paths.emplace_back(db_path.path);
  1281. }
  1282. for (auto& cf : column_families) {
  1283. for (auto& cf_path : cf.options.cf_paths) {
  1284. paths.emplace_back(cf_path.path);
  1285. }
  1286. }
  1287. for (auto& path : paths) {
  1288. s = impl->env_->CreateDirIfMissing(path);
  1289. if (!s.ok()) {
  1290. break;
  1291. }
  1292. }
  1293. // For recovery from NoSpace() error, we can only handle
  1294. // the case where the database is stored in a single path
  1295. if (paths.size() <= 1) {
  1296. impl->error_handler_.EnableAutoRecovery();
  1297. }
  1298. }
  1299. if (!s.ok()) {
  1300. delete impl;
  1301. return s;
  1302. }
  1303. s = impl->CreateArchivalDirectory();
  1304. if (!s.ok()) {
  1305. delete impl;
  1306. return s;
  1307. }
  1308. impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
  1309. impl->mutex_.Lock();
  1310. // Handles create_if_missing, error_if_exists
  1311. uint64_t recovered_seq(kMaxSequenceNumber);
  1312. s = impl->Recover(column_families, false, false, false, &recovered_seq);
  1313. if (s.ok()) {
  1314. uint64_t new_log_number = impl->versions_->NewFileNumber();
  1315. log::Writer* new_log = nullptr;
  1316. const size_t preallocate_block_size =
  1317. impl->GetWalPreallocateBlockSize(max_write_buffer_size);
  1318. s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
  1319. preallocate_block_size, &new_log);
  1320. if (s.ok()) {
  1321. InstrumentedMutexLock wl(&impl->log_write_mutex_);
  1322. impl->logfile_number_ = new_log_number;
  1323. assert(new_log != nullptr);
  1324. impl->logs_.emplace_back(new_log_number, new_log);
  1325. }
  1326. if (s.ok()) {
  1327. // set column family handles
  1328. for (auto cf : column_families) {
  1329. auto cfd =
  1330. impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
  1331. if (cfd != nullptr) {
  1332. handles->push_back(
  1333. new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
  1334. impl->NewThreadStatusCfInfo(cfd);
  1335. } else {
  1336. if (db_options.create_missing_column_families) {
  1337. // missing column family, create it
  1338. ColumnFamilyHandle* handle;
  1339. impl->mutex_.Unlock();
  1340. s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
  1341. impl->mutex_.Lock();
  1342. if (s.ok()) {
  1343. handles->push_back(handle);
  1344. } else {
  1345. break;
  1346. }
  1347. } else {
  1348. s = Status::InvalidArgument("Column family not found: ", cf.name);
  1349. break;
  1350. }
  1351. }
  1352. }
  1353. }
  1354. if (s.ok()) {
  1355. SuperVersionContext sv_context(/* create_superversion */ true);
  1356. for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
  1357. impl->InstallSuperVersionAndScheduleWork(
  1358. cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
  1359. }
  1360. sv_context.Clean();
  1361. if (impl->two_write_queues_) {
  1362. impl->log_write_mutex_.Lock();
  1363. }
  1364. impl->alive_log_files_.push_back(
  1365. DBImpl::LogFileNumberSize(impl->logfile_number_));
  1366. if (impl->two_write_queues_) {
  1367. impl->log_write_mutex_.Unlock();
  1368. }
  1369. impl->DeleteObsoleteFiles();
  1370. s = impl->directories_.GetDbDir()->Fsync();
  1371. }
  1372. if (s.ok()) {
  1373. // In WritePrepared there could be gap in sequence numbers. This breaks
  1374. // the trick we use in kPointInTimeRecovery which assumes the first seq in
  1375. // the log right after the corrupted log is one larger than the last seq
  1376. // we read from the logs. To let this trick keep working, we add a dummy
  1377. // entry with the expected sequence to the first log right after recovery.
  1378. // In non-WritePrepared case also the new log after recovery could be
  1379. // empty, and thus missing the consecutive seq hint to distinguish
  1380. // middle-log corruption to corrupted-log-remained-after-recovery. This
  1381. // case also will be addressed by a dummy write.
  1382. if (recovered_seq != kMaxSequenceNumber) {
  1383. WriteBatch empty_batch;
  1384. WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
  1385. WriteOptions write_options;
  1386. uint64_t log_used, log_size;
  1387. log::Writer* log_writer = impl->logs_.back().writer;
  1388. s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
  1389. if (s.ok()) {
  1390. // Need to fsync, otherwise it might get lost after a power reset.
  1391. s = impl->FlushWAL(false);
  1392. if (s.ok()) {
  1393. s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
  1394. }
  1395. }
  1396. }
  1397. }
  1398. }
  1399. if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
  1400. // try to read format version but no need to fail Open() even if it fails
  1401. s = impl->PersistentStatsProcessFormatVersion();
  1402. }
  1403. if (s.ok()) {
  1404. for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
  1405. if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
  1406. auto* vstorage = cfd->current()->storage_info();
  1407. for (int i = 1; i < vstorage->num_levels(); ++i) {
  1408. int num_files = vstorage->NumLevelFiles(i);
  1409. if (num_files > 0) {
  1410. s = Status::InvalidArgument(
  1411. "Not all files are at level 0. Cannot "
  1412. "open with FIFO compaction style.");
  1413. break;
  1414. }
  1415. }
  1416. }
  1417. if (!cfd->mem()->IsSnapshotSupported()) {
  1418. impl->is_snapshot_supported_ = false;
  1419. }
  1420. if (cfd->ioptions()->merge_operator != nullptr &&
  1421. !cfd->mem()->IsMergeOperatorSupported()) {
  1422. s = Status::InvalidArgument(
  1423. "The memtable of column family %s does not support merge operator "
  1424. "its options.merge_operator is non-null",
  1425. cfd->GetName().c_str());
  1426. }
  1427. if (!s.ok()) {
  1428. break;
  1429. }
  1430. }
  1431. }
  1432. TEST_SYNC_POINT("DBImpl::Open:Opened");
  1433. Status persist_options_status;
  1434. if (s.ok()) {
  1435. // Persist RocksDB Options before scheduling the compaction.
  1436. // The WriteOptionsFile() will release and lock the mutex internally.
  1437. persist_options_status = impl->WriteOptionsFile(
  1438. false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
  1439. *dbptr = impl;
  1440. impl->opened_successfully_ = true;
  1441. impl->MaybeScheduleFlushOrCompaction();
  1442. }
  1443. impl->mutex_.Unlock();
  1444. #ifndef ROCKSDB_LITE
  1445. auto sfm = static_cast<SstFileManagerImpl*>(
  1446. impl->immutable_db_options_.sst_file_manager.get());
  1447. if (s.ok() && sfm) {
  1448. // Notify SstFileManager about all sst files that already exist in
  1449. // db_paths[0] and cf_paths[0] when the DB is opened.
  1450. // SstFileManagerImpl needs to know sizes of the files. For files whose size
  1451. // we already know (sst files that appear in manifest - typically that's the
  1452. // vast majority of all files), we'll pass the size to SstFileManager.
  1453. // For all other files SstFileManager will query the size from filesystem.
  1454. std::vector<LiveFileMetaData> metadata;
  1455. impl->mutex_.Lock();
  1456. impl->versions_->GetLiveFilesMetaData(&metadata);
  1457. impl->mutex_.Unlock();
  1458. std::unordered_map<std::string, uint64_t> known_file_sizes;
  1459. for (const auto& md : metadata) {
  1460. std::string name = md.name;
  1461. if (!name.empty() && name[0] == '/') {
  1462. name = name.substr(1);
  1463. }
  1464. known_file_sizes[name] = md.size;
  1465. }
  1466. std::vector<std::string> paths;
  1467. paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
  1468. for (auto& cf : column_families) {
  1469. if (!cf.options.cf_paths.empty()) {
  1470. paths.emplace_back(cf.options.cf_paths[0].path);
  1471. }
  1472. }
  1473. // Remove duplicate paths.
  1474. std::sort(paths.begin(), paths.end());
  1475. paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
  1476. for (auto& path : paths) {
  1477. std::vector<std::string> existing_files;
  1478. impl->immutable_db_options_.env->GetChildren(path, &existing_files);
  1479. for (auto& file_name : existing_files) {
  1480. uint64_t file_number;
  1481. FileType file_type;
  1482. std::string file_path = path + "/" + file_name;
  1483. if (ParseFileName(file_name, &file_number, &file_type) &&
  1484. file_type == kTableFile) {
  1485. if (known_file_sizes.count(file_name)) {
  1486. // We're assuming that each sst file name exists in at most one of
  1487. // the paths.
  1488. sfm->OnAddFile(file_path, known_file_sizes.at(file_name),
  1489. /* compaction */ false);
  1490. } else {
  1491. sfm->OnAddFile(file_path);
  1492. }
  1493. }
  1494. }
  1495. }
  1496. // Reserve some disk buffer space. This is a heuristic - when we run out
  1497. // of disk space, this ensures that there is atleast write_buffer_size
  1498. // amount of free space before we resume DB writes. In low disk space
  1499. // conditions, we want to avoid a lot of small L0 files due to frequent
  1500. // WAL write failures and resultant forced flushes
  1501. sfm->ReserveDiskBuffer(max_write_buffer_size,
  1502. impl->immutable_db_options_.db_paths[0].path);
  1503. }
  1504. #endif // !ROCKSDB_LITE
  1505. if (s.ok()) {
  1506. ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
  1507. impl);
  1508. LogFlush(impl->immutable_db_options_.info_log);
  1509. assert(impl->TEST_WALBufferIsEmpty());
  1510. // If the assert above fails then we need to FlushWAL before returning
  1511. // control back to the user.
  1512. if (!persist_options_status.ok()) {
  1513. s = Status::IOError(
  1514. "DB::Open() failed --- Unable to persist Options file",
  1515. persist_options_status.ToString());
  1516. }
  1517. }
  1518. if (s.ok()) {
  1519. impl->StartTimedTasks();
  1520. }
  1521. if (!s.ok()) {
  1522. for (auto* h : *handles) {
  1523. delete h;
  1524. }
  1525. handles->clear();
  1526. delete impl;
  1527. *dbptr = nullptr;
  1528. }
  1529. return s;
  1530. }
  1531. } // namespace ROCKSDB_NAMESPACE