expected_state.cc
// Copyright (c) 2021-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <atomic>

#ifdef GFLAGS

#include "db/wide/wide_column_serialization.h"
#include "db/wide/wide_columns_helper.h"
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_shared_state.h"
#include "db_stress_tool/expected_state.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record_result.h"

namespace ROCKSDB_NAMESPACE {
ExpectedState::ExpectedState(size_t max_key, size_t num_column_families)
    : max_key_(max_key),
      num_column_families_(num_column_families),
      values_(nullptr) {}
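
// Marks every key in column family `cf` as deleted by overwriting its whole
// range of expected values with the delete mask.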
void ExpectedState::ClearColumnFamily(int cf) {
  const uint32_t del_mask = ExpectedValue::GetDelMask();
  std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), del_mask);
}
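
// Stores `value` as the current expected value for (cf, key) and issues a
// release fence so the store is visible before the corresponding DB write.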
void ExpectedState::Precommit(int cf, int64_t key, const ExpectedValue& value) {
  Value(cf, key).store(value.Read());
  // Prevent low-level instruction reordering that could allow the DB write to
  // happen before the pending state is set in the expected value.
  std::atomic_thread_fence(std::memory_order_release);
}
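
// Computes the original, pending, and final expected values for a Put on
// (cf, key), publishes the pending value before the DB write, and returns a
// PendingExpectedValue that can later commit the final value.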
PendingExpectedValue ExpectedState::PreparePut(int cf, int64_t key) {
  ExpectedValue expected_value = Load(cf, key);
  // Calculate the original expected value
  const ExpectedValue orig_expected_value = expected_value;
  // Calculate the pending expected value
  expected_value.Put(true /* pending */);
  const ExpectedValue pending_expected_value = expected_value;
  // Calculate the final expected value
  expected_value.Put(false /* pending */);
  const ExpectedValue final_expected_value = expected_value;
  // Precommit
  Precommit(cf, key, pending_expected_value);
  return PendingExpectedValue(&Value(cf, key), orig_expected_value,
                              final_expected_value);
}

ExpectedValue ExpectedState::Get(int cf, int64_t key) { return Load(cf, key); }
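
// Same as PreparePut() but for a Delete. When ExpectedValue::Delete() returns
// false, no pending state is published and the returned PendingExpectedValue
// keeps the original value as its final value.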
PendingExpectedValue ExpectedState::PrepareDelete(int cf, int64_t key) {
  ExpectedValue expected_value = Load(cf, key);
  // Calculate the original expected value
  const ExpectedValue orig_expected_value = expected_value;
  // Calculate the pending expected value
  bool res = expected_value.Delete(true /* pending */);
  if (!res) {
    PendingExpectedValue ret = PendingExpectedValue(
        &Value(cf, key), orig_expected_value, orig_expected_value);
    return ret;
  }
  const ExpectedValue pending_expected_value = expected_value;
  // Calculate the final expected value
  expected_value.Delete(false /* pending */);
  const ExpectedValue final_expected_value = expected_value;
  // Precommit
  Precommit(cf, key, pending_expected_value);
  return PendingExpectedValue(&Value(cf, key), orig_expected_value,
                              final_expected_value);
}

PendingExpectedValue ExpectedState::PrepareSingleDelete(int cf, int64_t key) {
  return PrepareDelete(cf, key);
}
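
// Prepares a delete for every key in [begin_key, end_key), returning one
// PendingExpectedValue per key.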
std::vector<PendingExpectedValue> ExpectedState::PrepareDeleteRange(
    int cf, int64_t begin_key, int64_t end_key) {
  std::vector<PendingExpectedValue> pending_expected_values;
  for (int64_t key = begin_key; key < end_key; ++key) {
    pending_expected_values.push_back(PrepareDelete(cf, key));
  }
  return pending_expected_values;
}

bool ExpectedState::Exists(int cf, int64_t key) {
  return Load(cf, key).Exists();
}
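
// Resets every expected value across all column families to the deleted
// state.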
void ExpectedState::Reset() {
  const uint32_t del_mask = ExpectedValue::GetDelMask();
  for (size_t i = 0; i < num_column_families_; ++i) {
    for (size_t j = 0; j < max_key_; ++j) {
      Value(static_cast<int>(i), j).store(del_mask, std::memory_order_relaxed);
    }
  }
}
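
// The Sync* methods below apply an operation to the expected value directly,
// without the pending/precommit protocol. They are used to catch the expected
// state up with writes the DB has already applied, e.g. during Restore().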
void ExpectedState::SyncPut(int cf, int64_t key, uint32_t value_base) {
  ExpectedValue expected_value = Load(cf, key);
  expected_value.SyncPut(value_base);
  Value(cf, key).store(expected_value.Read());
}

void ExpectedState::SyncPendingPut(int cf, int64_t key) {
  ExpectedValue expected_value = Load(cf, key);
  expected_value.SyncPendingPut();
  Value(cf, key).store(expected_value.Read());
}

void ExpectedState::SyncDelete(int cf, int64_t key) {
  ExpectedValue expected_value = Load(cf, key);
  expected_value.SyncDelete();
  Value(cf, key).store(expected_value.Read());
}

void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key,
                                    int64_t end_key) {
  for (int64_t key = begin_key; key < end_key; ++key) {
    SyncDelete(cf, key);
  }
}

FileExpectedState::FileExpectedState(
    const std::string& expected_state_file_path,
    const std::string& expected_persisted_seqno_file_path, size_t max_key,
    size_t num_column_families)
    : ExpectedState(max_key, num_column_families),
      expected_state_file_path_(expected_state_file_path),
      expected_persisted_seqno_file_path_(expected_persisted_seqno_file_path) {}
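
// Creates the backing files when `create` is true, then memory-maps the
// expected values file and the persisted seqno file into `values_` and
// `persisted_seqno_`.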
Status FileExpectedState::Open(bool create) {
  size_t expected_values_size = GetValuesLen();
  Env* default_env = Env::Default();
  Status status;
  if (create) {
    status = CreateFile(default_env, EnvOptions(), expected_state_file_path_,
                        std::string(expected_values_size, '\0'));
    if (!status.ok()) {
      return status;
    }
    status = CreateFile(default_env, EnvOptions(),
                        expected_persisted_seqno_file_path_,
                        std::string(sizeof(std::atomic<SequenceNumber>), '\0'));
    if (!status.ok()) {
      return status;
    }
  }
  status = MemoryMappedFile(default_env, expected_state_file_path_,
                            expected_state_mmap_buffer_, expected_values_size);
  if (!status.ok()) {
    assert(values_ == nullptr);
    return status;
  }
  values_ = static_cast<std::atomic<uint32_t>*>(
      expected_state_mmap_buffer_->GetBase());
  assert(values_ != nullptr);
  if (create) {
    Reset();
  }
  // TODO(hx235): Find a way to mmap persisted seqno and expected state into
  // the same LATEST file so we can obsolete the logic that handles this extra
  // file for the persisted seqno.
  status = MemoryMappedFile(default_env, expected_persisted_seqno_file_path_,
                            expected_persisted_seqno_mmap_buffer_,
                            sizeof(std::atomic<SequenceNumber>));
  if (!status.ok()) {
    assert(persisted_seqno_ == nullptr);
    return status;
  }
  persisted_seqno_ = static_cast<std::atomic<SequenceNumber>*>(
      expected_persisted_seqno_mmap_buffer_->GetBase());
  assert(persisted_seqno_ != nullptr);
  if (create) {
    persisted_seqno_->store(0, std::memory_order_relaxed);
  }
  return status;
}

AnonExpectedState::AnonExpectedState(size_t max_key, size_t num_column_families)
    : ExpectedState(max_key, num_column_families) {}
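
// Allocates the expected values in anonymous (heap) memory. Unlike
// FileExpectedState, this state does not survive a process restart, so it
// only supports fresh creation.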
#ifndef NDEBUG
Status AnonExpectedState::Open(bool create) {
#else
Status AnonExpectedState::Open(bool /* create */) {
#endif
  // AnonExpectedState only supports being freshly created.
  assert(create);
  values_allocation_.reset(
      new std::atomic<uint32_t>[GetValuesLen() /
                                sizeof(std::atomic<uint32_t>)]);
  values_ = &values_allocation_[0];
  Reset();
  return Status::OK();
}

ExpectedStateManager::ExpectedStateManager(size_t max_key,
                                           size_t num_column_families)
    : max_key_(max_key),
      num_column_families_(num_column_families),
      latest_(nullptr) {}

ExpectedStateManager::~ExpectedStateManager() = default;

const std::string FileExpectedStateManager::kLatestBasename = "LATEST";
const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state";
const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace";
const std::string FileExpectedStateManager::kPersistedSeqnoBasename = "PERSIST";
const std::string FileExpectedStateManager::kPersistedSeqnoFilenameSuffix =
    ".seqno";
const std::string FileExpectedStateManager::kTempFilenamePrefix = ".";
const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp";

FileExpectedStateManager::FileExpectedStateManager(
    size_t max_key, size_t num_column_families,
    std::string expected_state_dir_path)
    : ExpectedStateManager(max_key, num_column_families),
      expected_state_dir_path_(std::move(expected_state_dir_path)) {
  assert(!expected_state_dir_path_.empty());
}
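
// Scans the expected state directory to recover `saved_seqno_` from any
// existing "<seqno>.state" file, repairs files a crash may have left behind,
// and opens (creating if necessary) "LATEST.state" and the persisted seqno
// file as the live expected state.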
Status FileExpectedStateManager::Open() {
  // Before doing anything, sync directory state with ours. That is, determine
  // `saved_seqno_`, and create any necessary missing files.
  std::vector<std::string> expected_state_dir_children;
  Status s = Env::Default()->GetChildren(expected_state_dir_path_,
                                         &expected_state_dir_children);
  bool found_trace = false;
  if (s.ok()) {
    for (size_t i = 0; i < expected_state_dir_children.size(); ++i) {
      const auto& filename = expected_state_dir_children[i];
      if (filename.size() >= kStateFilenameSuffix.size() &&
          filename.rfind(kStateFilenameSuffix) ==
              filename.size() - kStateFilenameSuffix.size() &&
          filename.rfind(kLatestBasename, 0) == std::string::npos) {
        SequenceNumber found_seqno = ParseUint64(
            filename.substr(0, filename.size() - kStateFilenameSuffix.size()));
        if (saved_seqno_ == kMaxSequenceNumber || found_seqno > saved_seqno_) {
          saved_seqno_ = found_seqno;
        }
      }
    }
    // Check if crash happened after creating state file but before creating
    // trace file.
    if (saved_seqno_ != kMaxSequenceNumber) {
      std::string saved_seqno_trace_path = GetPathForFilename(
          std::to_string(saved_seqno_) + kTraceFilenameSuffix);
      Status exists_status = Env::Default()->FileExists(saved_seqno_trace_path);
      if (exists_status.ok()) {
        found_trace = true;
      } else if (exists_status.IsNotFound()) {
        found_trace = false;
      } else {
        s = exists_status;
      }
    }
  }
  if (s.ok() && saved_seqno_ != kMaxSequenceNumber && !found_trace) {
    // Create an empty trace file so later logic does not need to distinguish
    // missing vs. empty trace file.
    std::unique_ptr<WritableFile> wfile;
    const EnvOptions soptions;
    std::string saved_seqno_trace_path =
        GetPathForFilename(std::to_string(saved_seqno_) + kTraceFilenameSuffix);
    s = Env::Default()->NewWritableFile(saved_seqno_trace_path, &wfile,
                                        soptions);
  }

  if (s.ok()) {
    s = Clean();
  }

  std::string expected_state_file_path =
      GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
  std::string expected_persisted_seqno_file_path = GetPathForFilename(
      kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix);
  bool found = false;
  if (s.ok()) {
    Status exists_status = Env::Default()->FileExists(expected_state_file_path);
    if (exists_status.ok()) {
      found = true;
    } else if (exists_status.IsNotFound()) {
      assert(Env::Default()
                 ->FileExists(expected_persisted_seqno_file_path)
                 .IsNotFound());
    } else {
      s = exists_status;
    }
  }

  if (!found) {
    // Initialize the file in a temp path and then rename it. That way, in
    // case this process is killed during setup, `Clean()` will take care of
    // removing the incomplete expected values file.
    std::string temp_expected_state_file_path =
        GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
    std::string temp_expected_persisted_seqno_file_path =
        GetTempPathForFilename(kPersistedSeqnoBasename +
                               kPersistedSeqnoFilenameSuffix);
    FileExpectedState temp_expected_state(
        temp_expected_state_file_path, temp_expected_persisted_seqno_file_path,
        max_key_, num_column_families_);
    if (s.ok()) {
      s = temp_expected_state.Open(true /* create */);
    }
    if (s.ok()) {
      s = Env::Default()->RenameFile(temp_expected_state_file_path,
                                     expected_state_file_path);
    }
    if (s.ok()) {
      s = Env::Default()->RenameFile(temp_expected_persisted_seqno_file_path,
                                     expected_persisted_seqno_file_path);
    }
  }

  if (s.ok()) {
    latest_.reset(
        new FileExpectedState(std::move(expected_state_file_path),
                              std::move(expected_persisted_seqno_file_path),
                              max_key_, num_column_families_));
    s = latest_->Open(false /* create */);
  }
  return s;
}
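
// Snapshots "LATEST.state" into "<seqno>.state" at the DB's latest sequence
// number and starts tracing subsequent writes into "<seqno>.trace", so the
// expected state can later be rolled forward by `Restore()`. Older
// state/trace file pairs are deleted once the new pair is in place.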
Status FileExpectedStateManager::SaveAtAndAfter(DB* db) {
  SequenceNumber seqno = db->GetLatestSequenceNumber();

  std::string state_filename = std::to_string(seqno) + kStateFilenameSuffix;
  std::string state_file_temp_path = GetTempPathForFilename(state_filename);
  std::string state_file_path = GetPathForFilename(state_filename);

  std::string latest_file_path =
      GetPathForFilename(kLatestBasename + kStateFilenameSuffix);

  std::string trace_filename = std::to_string(seqno) + kTraceFilenameSuffix;
  std::string trace_file_path = GetPathForFilename(trace_filename);

  // Populate a tempfile and then rename it to atomically create
  // "<seqno>.state" with contents from "LATEST.state"
  Status s =
      CopyFile(FileSystem::Default(), latest_file_path, Temperature::kUnknown,
               state_file_temp_path, Temperature::kUnknown, 0 /* size */,
               false /* use_fsync */, nullptr /* io_tracer */);
  if (s.ok()) {
    s = FileSystem::Default()->RenameFile(state_file_temp_path,
                                          state_file_path, IOOptions(),
                                          nullptr /* dbg */);
  }
  SequenceNumber old_saved_seqno = 0;
  if (s.ok()) {
    old_saved_seqno = saved_seqno_;
    saved_seqno_ = seqno;
  }

  // If there is a crash now, i.e., after "<seqno>.state" was created but
  // before "<seqno>.trace" is created, it will be treated as if
  // "<seqno>.trace" were present but empty.

  // Create "<seqno>.trace" directly. It is initially empty so no need for
  // tempfile.
  std::unique_ptr<TraceWriter> trace_writer;
  if (s.ok()) {
    EnvOptions soptions;
    // Disable buffering so traces will not get stuck in application buffer.
    soptions.writable_file_max_buffer_size = 0;
    s = NewFileTraceWriter(Env::Default(), soptions, trace_file_path,
                           &trace_writer);
  }
  if (s.ok()) {
    TraceOptions trace_opts;
    trace_opts.filter |= kTraceFilterGet;
    trace_opts.filter |= kTraceFilterMultiGet;
    trace_opts.filter |= kTraceFilterIteratorSeek;
    trace_opts.filter |= kTraceFilterIteratorSeekForPrev;
    trace_opts.preserve_write_order = true;
    s = db->StartTrace(trace_opts, std::move(trace_writer));
  }

  // Delete old state/trace files. Deletion order does not matter since we
  // only delete after successfully saving new files, so old files will never
  // be used again, even if we crash.
  if (s.ok() && old_saved_seqno != kMaxSequenceNumber &&
      old_saved_seqno != saved_seqno_) {
    s = Env::Default()->DeleteFile(GetPathForFilename(
        std::to_string(old_saved_seqno) + kStateFilenameSuffix));
  }
  if (s.ok() && old_saved_seqno != kMaxSequenceNumber &&
      old_saved_seqno != saved_seqno_) {
    s = Env::Default()->DeleteFile(GetPathForFilename(
        std::to_string(old_saved_seqno) + kTraceFilenameSuffix));
  }
  return s;
}

bool FileExpectedStateManager::HasHistory() {
  return saved_seqno_ != kMaxSequenceNumber;
}

namespace {

// An `ExpectedStateTraceRecordHandler` applies a configurable number of
// write operation trace records to the configured expected state. It is used
// in `FileExpectedStateManager::Restore()` to sync the expected state with
// the DB's post-recovery state.
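//
// In `Restore()`, each replayed `TraceRecord` is dispatched to this handler
// via `TraceRecord::Accept()`; write records are then unpacked by iterating
// their `WriteBatch` with the same handler object.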
class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
                                        public WriteBatch::Handler {
 public:
  ExpectedStateTraceRecordHandler(uint64_t max_write_ops, ExpectedState* state)
      : max_write_ops_(max_write_ops),
        state_(state),
        buffered_writes_(nullptr) {}

  ~ExpectedStateTraceRecordHandler() { assert(IsDone()); }

  // True if we have already reached the limit on write operations to apply.
  bool IsDone() { return num_write_ops_ == max_write_ops_; }

  Status Handle(const WriteQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* /* result */) override {
    if (IsDone()) {
      return Status::OK();
    }
    WriteBatch batch(record.GetWriteBatchRep().ToString());
    return batch.Iterate(this);
  }

  // Ignore reads.
  Status Handle(const GetQueryTraceRecord& /* record */,
                std::unique_ptr<TraceRecordResult>* /* result */) override {
    return Status::OK();
  }

  // Ignore reads.
  Status Handle(const IteratorSeekQueryTraceRecord& /* record */,
                std::unique_ptr<TraceRecordResult>* /* result */) override {
    return Status::OK();
  }

  // Ignore reads.
  Status Handle(const MultiGetQueryTraceRecord& /* record */,
                std::unique_ptr<TraceRecordResult>* /* result */) override {
    return Status::OK();
  }

  // Below are the WriteBatch::Handler overrides. We could use a separate
  // object, but it's convenient and works to share state with the
  // `TraceRecord::Handler`.
  Status PutCF(uint32_t column_family_id, const Slice& key_with_ts,
               const Slice& value) override {
    Slice key =
        StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
    uint64_t key_id;
    if (!GetIntVal(key.ToString(), &key_id)) {
      return Status::Corruption("unable to parse key", key.ToString());
    }
    uint32_t value_base = GetValueBase(value);

    bool should_buffer_write = !(buffered_writes_ == nullptr);
    if (should_buffer_write) {
      return WriteBatchInternal::Put(buffered_writes_.get(), column_family_id,
                                     key, value);
    }

    state_->SyncPut(column_family_id, static_cast<int64_t>(key_id),
                    value_base);
    ++num_write_ops_;
    return Status::OK();
  }

  Status TimedPutCF(uint32_t column_family_id, const Slice& key_with_ts,
                    const Slice& value, uint64_t write_unix_time) override {
    Slice key =
        StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
    uint64_t key_id;
    if (!GetIntVal(key.ToString(), &key_id)) {
      return Status::Corruption("unable to parse key", key.ToString());
    }
    uint32_t value_base = GetValueBase(value);

    bool should_buffer_write = !(buffered_writes_ == nullptr);
    if (should_buffer_write) {
      return WriteBatchInternal::TimedPut(buffered_writes_.get(),
                                          column_family_id, key, value,
                                          write_unix_time);
    }

    state_->SyncPut(column_family_id, static_cast<int64_t>(key_id),
                    value_base);
    ++num_write_ops_;
    return Status::OK();
  }

  Status PutEntityCF(uint32_t column_family_id, const Slice& key_with_ts,
                     const Slice& entity) override {
    Slice key =
        StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
    uint64_t key_id = 0;
    if (!GetIntVal(key.ToString(), &key_id)) {
      return Status::Corruption("Unable to parse key", key.ToString());
    }

    Slice entity_copy = entity;
    WideColumns columns;
    if (!WideColumnSerialization::Deserialize(entity_copy, columns).ok()) {
      return Status::Corruption("Unable to deserialize entity",
                                entity.ToString(/* hex */ true));
    }
    if (!VerifyWideColumns(columns)) {
      return Status::Corruption("Wide columns in entity inconsistent",
                                entity.ToString(/* hex */ true));
    }

    if (buffered_writes_) {
      return WriteBatchInternal::PutEntity(buffered_writes_.get(),
                                           column_family_id, key, columns);
    }

    const uint32_t value_base =
        GetValueBase(WideColumnsHelper::GetDefaultColumn(columns));
    state_->SyncPut(column_family_id, static_cast<int64_t>(key_id),
                    value_base);
    ++num_write_ops_;
    return Status::OK();
  }

  Status DeleteCF(uint32_t column_family_id,
                  const Slice& key_with_ts) override {
    Slice key =
        StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
    uint64_t key_id;
    if (!GetIntVal(key.ToString(), &key_id)) {
      return Status::Corruption("unable to parse key", key.ToString());
    }

    bool should_buffer_write = !(buffered_writes_ == nullptr);
    if (should_buffer_write) {
      return WriteBatchInternal::Delete(buffered_writes_.get(),
                                        column_family_id, key);
    }

    state_->SyncDelete(column_family_id, static_cast<int64_t>(key_id));
    ++num_write_ops_;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t column_family_id,
                        const Slice& key_with_ts) override {
    bool should_buffer_write = !(buffered_writes_ == nullptr);
    if (should_buffer_write) {
      Slice key =
          StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
      Slice ts =
          ExtractTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
      std::array<Slice, 2> key_with_ts_arr{{key, ts}};
      return WriteBatchInternal::SingleDelete(
          buffered_writes_.get(), column_family_id,
          SliceParts(key_with_ts_arr.data(), 2));
    }

    return DeleteCF(column_family_id, key_with_ts);
  }

  Status DeleteRangeCF(uint32_t column_family_id,
                       const Slice& begin_key_with_ts,
                       const Slice& end_key_with_ts) override {
    Slice begin_key =
        StripTimestampFromUserKey(begin_key_with_ts, FLAGS_user_timestamp_size);
    Slice end_key =
        StripTimestampFromUserKey(end_key_with_ts, FLAGS_user_timestamp_size);
    uint64_t begin_key_id, end_key_id;
    if (!GetIntVal(begin_key.ToString(), &begin_key_id)) {
      return Status::Corruption("unable to parse begin key",
                                begin_key.ToString());
    }
    if (!GetIntVal(end_key.ToString(), &end_key_id)) {
      return Status::Corruption("unable to parse end key", end_key.ToString());
    }

    bool should_buffer_write = !(buffered_writes_ == nullptr);
    if (should_buffer_write) {
      return WriteBatchInternal::DeleteRange(
          buffered_writes_.get(), column_family_id, begin_key, end_key);
    }

    state_->SyncDeleteRange(column_family_id,
                            static_cast<int64_t>(begin_key_id),
                            static_cast<int64_t>(end_key_id));
    ++num_write_ops_;
    return Status::OK();
  }

  Status MergeCF(uint32_t column_family_id, const Slice& key_with_ts,
                 const Slice& value) override {
    Slice key =
        StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);

    bool should_buffer_write = !(buffered_writes_ == nullptr);
    if (should_buffer_write) {
      return WriteBatchInternal::Merge(buffered_writes_.get(),
                                       column_family_id, key, value);
    }

    return PutCF(column_family_id, key, value);
  }
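
  // The Mark* handlers below replay two-phase-commit markers: writes between
  // MarkBeginPrepare() and MarkEndPrepare() are buffered under their XID and
  // only applied to the expected state when MarkCommit() is seen (or dropped
  // on MarkRollback()).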
  Status MarkBeginPrepare(bool = false) override {
    assert(!buffered_writes_);
    buffered_writes_.reset(new WriteBatch());
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice& xid) override {
    assert(buffered_writes_);
    std::string xid_str = xid.ToString();
    assert(xid_to_buffered_writes_.find(xid_str) ==
           xid_to_buffered_writes_.end());

    xid_to_buffered_writes_[xid_str].swap(buffered_writes_);
    buffered_writes_.reset();
    return Status::OK();
  }

  Status MarkCommit(const Slice& xid) override {
    std::string xid_str = xid.ToString();
    assert(xid_to_buffered_writes_.find(xid_str) !=
           xid_to_buffered_writes_.end());
    assert(xid_to_buffered_writes_.at(xid_str));

    Status s = xid_to_buffered_writes_.at(xid_str)->Iterate(this);
    xid_to_buffered_writes_.erase(xid_str);
    return s;
  }

  Status MarkRollback(const Slice& xid) override {
    std::string xid_str = xid.ToString();
    assert(xid_to_buffered_writes_.find(xid_str) !=
           xid_to_buffered_writes_.end());
    assert(xid_to_buffered_writes_.at(xid_str));

    xid_to_buffered_writes_.erase(xid_str);
    return Status::OK();
  }

 private:
  uint64_t num_write_ops_ = 0;
  uint64_t max_write_ops_;
  ExpectedState* state_;
  std::unordered_map<std::string, std::unique_ptr<WriteBatch>>
      xid_to_buffered_writes_;
  std::unique_ptr<WriteBatch> buffered_writes_;
};

}  // anonymous namespace
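
// Replays the trace saved by `SaveAtAndAfter()` on top of "<seqno>.state" to
// rebuild "LATEST.state" so that it matches the DB's recovered state, then
// reopens it as the live expected state and removes files that are no longer
// needed.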
Status FileExpectedStateManager::Restore(DB* db) {
  assert(HasHistory());
  SequenceNumber seqno = db->GetLatestSequenceNumber();
  if (seqno < saved_seqno_) {
    return Status::Corruption("DB is older than any restorable expected state");
  }

  std::string state_filename =
      std::to_string(saved_seqno_) + kStateFilenameSuffix;
  std::string state_file_path = GetPathForFilename(state_filename);

  std::string latest_file_temp_path =
      GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
  std::string latest_file_path =
      GetPathForFilename(kLatestBasename + kStateFilenameSuffix);

  std::string trace_filename =
      std::to_string(saved_seqno_) + kTraceFilenameSuffix;
  std::string trace_file_path = GetPathForFilename(trace_filename);

  std::unique_ptr<TraceReader> trace_reader;
  Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path,
                                &trace_reader);

  std::string persisted_seqno_file_path = GetPathForFilename(
      kPersistedSeqnoBasename + kPersistedSeqnoFilenameSuffix);

  if (s.ok()) {
    // We are going to replay on top of "`seqno`.state" to create a new
    // "LATEST.state". Start off by creating a tempfile so we can later make
    // the new "LATEST.state" appear atomically using `RenameFile()`.
    s = CopyFile(FileSystem::Default(), state_file_path, Temperature::kUnknown,
                 latest_file_temp_path, Temperature::kUnknown, 0 /* size */,
                 false /* use_fsync */, nullptr /* io_tracer */);
  }

  {
    std::unique_ptr<Replayer> replayer;
    std::unique_ptr<ExpectedState> state;
    std::unique_ptr<ExpectedStateTraceRecordHandler> handler;
    if (s.ok()) {
      state.reset(new FileExpectedState(latest_file_temp_path,
                                        persisted_seqno_file_path, max_key_,
                                        num_column_families_));
      s = state->Open(false /* create */);
    }
    if (s.ok()) {
      handler.reset(new ExpectedStateTraceRecordHandler(seqno - saved_seqno_,
                                                        state.get()));
      // TODO(ajkr): An API limitation requires we provide `handles` although
      // they will be unused since we only use the replayer for reading
      // records. Just give a default CFH for now to satisfy the requirement.
      s = db->NewDefaultReplayer({db->DefaultColumnFamily()} /* handles */,
                                 std::move(trace_reader), &replayer);
    }

    if (s.ok()) {
      s = replayer->Prepare();
    }
    for (; s.ok();) {
      std::unique_ptr<TraceRecord> record;
      s = replayer->Next(&record);
      if (!s.ok()) {
        if (s.IsCorruption() && handler->IsDone()) {
          // There could be a corruption reading the tail record of the trace
          // due to `db_stress` crashing while writing it. It shouldn't matter
          // as long as we already found all the write ops we need to catch up
          // the expected state.
          s = Status::OK();
        }
        if (s.IsIncomplete()) {
          // OK because `Status::Incomplete` is expected upon finishing all the
          // trace records.
          s = Status::OK();
        }
        break;
      }
      std::unique_ptr<TraceRecordResult> res;
      s = record->Accept(handler.get(), &res);
    }
  }

  if (s.ok()) {
    s = FileSystem::Default()->RenameFile(latest_file_temp_path,
                                          latest_file_path, IOOptions(),
                                          nullptr /* dbg */);
  }

  if (s.ok()) {
    latest_.reset(new FileExpectedState(latest_file_path,
                                        persisted_seqno_file_path, max_key_,
                                        num_column_families_));
    s = latest_->Open(false /* create */);
  }

  // Delete old state/trace files. We must delete the state file first.
  // Otherwise, a crash-recovery immediately after deleting the trace file
  // could leave `Restore()` unable to replay to `seqno`.
  if (s.ok()) {
    s = Env::Default()->DeleteFile(state_file_path);
  }

  if (s.ok()) {
    std::vector<std::string> expected_state_dir_children;
    s = Env::Default()->GetChildren(expected_state_dir_path_,
                                    &expected_state_dir_children);
    if (s.ok()) {
      for (size_t i = 0; i < expected_state_dir_children.size(); ++i) {
        const auto& filename = expected_state_dir_children[i];
        if (filename.size() >= kTraceFilenameSuffix.size() &&
            filename.rfind(kTraceFilenameSuffix) ==
                filename.size() - kTraceFilenameSuffix.size()) {
          SequenceNumber found_seqno = ParseUint64(filename.substr(
              0, filename.size() - kTraceFilenameSuffix.size()));
          // Delete older trace files, but keep the one we just replayed for
          // debugging purposes
          if (found_seqno < saved_seqno_) {
            s = Env::Default()->DeleteFile(GetPathForFilename(filename));
          }
        }
        if (!s.ok()) {
          break;
        }
      }
    }
    if (s.ok()) {
      saved_seqno_ = kMaxSequenceNumber;
    }
  }
  return s;
}

Status FileExpectedStateManager::Clean() {
  std::vector<std::string> expected_state_dir_children;
  Status s = Env::Default()->GetChildren(expected_state_dir_path_,
                                         &expected_state_dir_children);
  // An incomplete `Open()` or incomplete `SaveAtAndAfter()` could have left
  // behind invalid temporary files. An incomplete `SaveAtAndAfter()` could
  // have also left behind stale state/trace files. An incomplete `Restore()`
  // could have left behind stale trace files.
  for (size_t i = 0; s.ok() && i < expected_state_dir_children.size(); ++i) {
    const auto& filename = expected_state_dir_children[i];
    if (filename.rfind(kTempFilenamePrefix, 0 /* pos */) == 0 &&
        filename.size() >= kTempFilenameSuffix.size() &&
        filename.rfind(kTempFilenameSuffix) ==
            filename.size() - kTempFilenameSuffix.size()) {
      // Delete all temp files.
      s = Env::Default()->DeleteFile(GetPathForFilename(filename));
    } else if (filename.size() >= kStateFilenameSuffix.size() &&
               filename.rfind(kStateFilenameSuffix) ==
                   filename.size() - kStateFilenameSuffix.size() &&
               filename.rfind(kLatestBasename, 0) == std::string::npos &&
               ParseUint64(filename.substr(
                   0, filename.size() - kStateFilenameSuffix.size())) <
                   saved_seqno_) {
      assert(saved_seqno_ != kMaxSequenceNumber);
      // Delete stale state files.
      s = Env::Default()->DeleteFile(GetPathForFilename(filename));
    } else if (filename.size() >= kTraceFilenameSuffix.size() &&
               filename.rfind(kTraceFilenameSuffix) ==
                   filename.size() - kTraceFilenameSuffix.size() &&
               ParseUint64(filename.substr(
                   0, filename.size() - kTraceFilenameSuffix.size())) <
                   saved_seqno_) {
      // Delete stale trace files.
      s = Env::Default()->DeleteFile(GetPathForFilename(filename));
    }
  }
  return s;
}
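
// Returns the temp-file path ("<dir>/.<filename>.tmp") used while a file is
// being populated, so `Clean()` can identify and remove incomplete files.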
std::string FileExpectedStateManager::GetTempPathForFilename(
    const std::string& filename) {
  assert(!expected_state_dir_path_.empty());
  std::string expected_state_dir_path_slash =
      expected_state_dir_path_.back() == '/' ? expected_state_dir_path_
                                             : expected_state_dir_path_ + "/";
  return expected_state_dir_path_slash + kTempFilenamePrefix + filename +
         kTempFilenameSuffix;
}

std::string FileExpectedStateManager::GetPathForFilename(
    const std::string& filename) {
  assert(!expected_state_dir_path_.empty());
  std::string expected_state_dir_path_slash =
      expected_state_dir_path_.back() == '/' ? expected_state_dir_path_
                                             : expected_state_dir_path_ + "/";
  return expected_state_dir_path_slash + filename;
}

AnonExpectedStateManager::AnonExpectedStateManager(size_t max_key,
                                                   size_t num_column_families)
    : ExpectedStateManager(max_key, num_column_families) {}

Status AnonExpectedStateManager::Open() {
  latest_.reset(new AnonExpectedState(max_key_, num_column_families_));
  return latest_->Open(true /* create */);
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS