log_test.cc 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/log_reader.h"
  10. #include "db/log_writer.h"
  11. #include "env/composite_env_wrapper.h"
  12. #include "file/sequence_file_reader.h"
  13. #include "file/writable_file_writer.h"
  14. #include "rocksdb/env.h"
  15. #include "test_util/testharness.h"
  16. #include "test_util/testutil.h"
  17. #include "util/coding.h"
  18. #include "util/crc32c.h"
  19. #include "util/random.h"
  20. namespace ROCKSDB_NAMESPACE {
  21. namespace log {
  22. // Construct a string of the specified length made out of the supplied
  23. // partial string.
  24. static std::string BigString(const std::string& partial_string, size_t n) {
  25. std::string result;
  26. while (result.size() < n) {
  27. result.append(partial_string);
  28. }
  29. result.resize(n);
  30. return result;
  31. }
  32. // Construct a string from a number
  33. static std::string NumberString(int n) {
  34. char buf[50];
  35. snprintf(buf, sizeof(buf), "%d.", n);
  36. return std::string(buf);
  37. }
  38. // Return a skewed potentially long string
  39. static std::string RandomSkewedString(int i, Random* rnd) {
  40. return BigString(NumberString(i), rnd->Skewed(17));
  41. }
  42. // Param type is tuple<int, bool>
  43. // get<0>(tuple): non-zero if recycling log, zero if regular log
  44. // get<1>(tuple): true if allow retry after read EOF, false otherwise
  45. class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
  46. private:
  47. class StringSource : public SequentialFile {
  48. public:
  49. Slice& contents_;
  50. bool force_error_;
  51. size_t force_error_position_;
  52. bool force_eof_;
  53. size_t force_eof_position_;
  54. bool returned_partial_;
  55. bool fail_after_read_partial_;
  56. explicit StringSource(Slice& contents, bool fail_after_read_partial)
  57. : contents_(contents),
  58. force_error_(false),
  59. force_error_position_(0),
  60. force_eof_(false),
  61. force_eof_position_(0),
  62. returned_partial_(false),
  63. fail_after_read_partial_(fail_after_read_partial) {}
  64. Status Read(size_t n, Slice* result, char* scratch) override {
  65. if (fail_after_read_partial_) {
  66. EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
  67. }
  68. if (force_error_) {
  69. if (force_error_position_ >= n) {
  70. force_error_position_ -= n;
  71. } else {
  72. *result = Slice(contents_.data(), force_error_position_);
  73. contents_.remove_prefix(force_error_position_);
  74. force_error_ = false;
  75. returned_partial_ = true;
  76. return Status::Corruption("read error");
  77. }
  78. }
  79. if (contents_.size() < n) {
  80. n = contents_.size();
  81. returned_partial_ = true;
  82. }
  83. if (force_eof_) {
  84. if (force_eof_position_ >= n) {
  85. force_eof_position_ -= n;
  86. } else {
  87. force_eof_ = false;
  88. n = force_eof_position_;
  89. returned_partial_ = true;
  90. }
  91. }
  92. // By using scratch we ensure that caller has control over the
  93. // lifetime of result.data()
  94. memcpy(scratch, contents_.data(), n);
  95. *result = Slice(scratch, n);
  96. contents_.remove_prefix(n);
  97. return Status::OK();
  98. }
  99. Status Skip(uint64_t n) override {
  100. if (n > contents_.size()) {
  101. contents_.clear();
  102. return Status::NotFound("in-memory file skipepd past end");
  103. }
  104. contents_.remove_prefix(n);
  105. return Status::OK();
  106. }
  107. };
  108. inline StringSource* GetStringSourceFromLegacyReader(
  109. SequentialFileReader* reader) {
  110. LegacySequentialFileWrapper* file =
  111. static_cast<LegacySequentialFileWrapper*>(reader->file());
  112. return static_cast<StringSource*>(file->target());
  113. }
  114. class ReportCollector : public Reader::Reporter {
  115. public:
  116. size_t dropped_bytes_;
  117. std::string message_;
  118. ReportCollector() : dropped_bytes_(0) { }
  119. void Corruption(size_t bytes, const Status& status) override {
  120. dropped_bytes_ += bytes;
  121. message_.append(status.ToString());
  122. }
  123. };
  124. std::string& dest_contents() {
  125. auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
  126. assert(dest);
  127. return dest->contents_;
  128. }
  129. const std::string& dest_contents() const {
  130. auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
  131. assert(dest);
  132. return dest->contents_;
  133. }
  134. void reset_source_contents() {
  135. auto src = GetStringSourceFromLegacyReader(reader_->file());
  136. assert(src);
  137. src->contents_ = dest_contents();
  138. }
  139. Slice reader_contents_;
  140. std::unique_ptr<WritableFileWriter> dest_holder_;
  141. std::unique_ptr<SequentialFileReader> source_holder_;
  142. ReportCollector report_;
  143. Writer writer_;
  144. std::unique_ptr<Reader> reader_;
  145. protected:
  146. bool allow_retry_read_;
  147. public:
  148. LogTest()
  149. : reader_contents_(),
  150. dest_holder_(test::GetWritableFileWriter(
  151. new test::StringSink(&reader_contents_), "" /* don't care */)),
  152. source_holder_(test::GetSequentialFileReader(
  153. new StringSource(reader_contents_, !std::get<1>(GetParam())),
  154. "" /* file name */)),
  155. writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
  156. allow_retry_read_(std::get<1>(GetParam())) {
  157. if (allow_retry_read_) {
  158. reader_.reset(new FragmentBufferedReader(
  159. nullptr, std::move(source_holder_), &report_, true /* checksum */,
  160. 123 /* log_number */));
  161. } else {
  162. reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
  163. true /* checksum */, 123 /* log_number */));
  164. }
  165. }
  166. Slice* get_reader_contents() { return &reader_contents_; }
  167. void Write(const std::string& msg) {
  168. writer_.AddRecord(Slice(msg));
  169. }
  170. size_t WrittenBytes() const {
  171. return dest_contents().size();
  172. }
  173. std::string Read(const WALRecoveryMode wal_recovery_mode =
  174. WALRecoveryMode::kTolerateCorruptedTailRecords) {
  175. std::string scratch;
  176. Slice record;
  177. bool ret = false;
  178. ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
  179. if (ret) {
  180. return record.ToString();
  181. } else {
  182. return "EOF";
  183. }
  184. }
  185. void IncrementByte(int offset, char delta) {
  186. dest_contents()[offset] += delta;
  187. }
  188. void SetByte(int offset, char new_byte) {
  189. dest_contents()[offset] = new_byte;
  190. }
  191. void ShrinkSize(int bytes) {
  192. auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
  193. assert(dest);
  194. dest->Drop(bytes);
  195. }
  196. void FixChecksum(int header_offset, int len, bool recyclable) {
  197. // Compute crc of type/len/data
  198. int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
  199. uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
  200. header_size - 6 + len);
  201. crc = crc32c::Mask(crc);
  202. EncodeFixed32(&dest_contents()[header_offset], crc);
  203. }
  204. void ForceError(size_t position = 0) {
  205. auto src = GetStringSourceFromLegacyReader(reader_->file());
  206. src->force_error_ = true;
  207. src->force_error_position_ = position;
  208. }
  209. size_t DroppedBytes() const {
  210. return report_.dropped_bytes_;
  211. }
  212. std::string ReportMessage() const {
  213. return report_.message_;
  214. }
  215. void ForceEOF(size_t position = 0) {
  216. auto src = GetStringSourceFromLegacyReader(reader_->file());
  217. src->force_eof_ = true;
  218. src->force_eof_position_ = position;
  219. }
  220. void UnmarkEOF() {
  221. auto src = GetStringSourceFromLegacyReader(reader_->file());
  222. src->returned_partial_ = false;
  223. reader_->UnmarkEOF();
  224. }
  225. bool IsEOF() { return reader_->IsEOF(); }
  226. // Returns OK iff recorded error message contains "msg"
  227. std::string MatchError(const std::string& msg) const {
  228. if (report_.message_.find(msg) == std::string::npos) {
  229. return report_.message_;
  230. } else {
  231. return "OK";
  232. }
  233. }
  234. };
  235. TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
  236. TEST_P(LogTest, ReadWrite) {
  237. Write("foo");
  238. Write("bar");
  239. Write("");
  240. Write("xxxx");
  241. ASSERT_EQ("foo", Read());
  242. ASSERT_EQ("bar", Read());
  243. ASSERT_EQ("", Read());
  244. ASSERT_EQ("xxxx", Read());
  245. ASSERT_EQ("EOF", Read());
  246. ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
  247. }
  248. TEST_P(LogTest, ManyBlocks) {
  249. for (int i = 0; i < 100000; i++) {
  250. Write(NumberString(i));
  251. }
  252. for (int i = 0; i < 100000; i++) {
  253. ASSERT_EQ(NumberString(i), Read());
  254. }
  255. ASSERT_EQ("EOF", Read());
  256. }
  257. TEST_P(LogTest, Fragmentation) {
  258. Write("small");
  259. Write(BigString("medium", 50000));
  260. Write(BigString("large", 100000));
  261. ASSERT_EQ("small", Read());
  262. ASSERT_EQ(BigString("medium", 50000), Read());
  263. ASSERT_EQ(BigString("large", 100000), Read());
  264. ASSERT_EQ("EOF", Read());
  265. }
  266. TEST_P(LogTest, MarginalTrailer) {
  267. // Make a trailer that is exactly the same length as an empty record.
  268. int header_size =
  269. std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
  270. const int n = kBlockSize - 2 * header_size;
  271. Write(BigString("foo", n));
  272. ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
  273. Write("");
  274. Write("bar");
  275. ASSERT_EQ(BigString("foo", n), Read());
  276. ASSERT_EQ("", Read());
  277. ASSERT_EQ("bar", Read());
  278. ASSERT_EQ("EOF", Read());
  279. }
  280. TEST_P(LogTest, MarginalTrailer2) {
  281. // Make a trailer that is exactly the same length as an empty record.
  282. int header_size =
  283. std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
  284. const int n = kBlockSize - 2 * header_size;
  285. Write(BigString("foo", n));
  286. ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
  287. Write("bar");
  288. ASSERT_EQ(BigString("foo", n), Read());
  289. ASSERT_EQ("bar", Read());
  290. ASSERT_EQ("EOF", Read());
  291. ASSERT_EQ(0U, DroppedBytes());
  292. ASSERT_EQ("", ReportMessage());
  293. }
  294. TEST_P(LogTest, ShortTrailer) {
  295. int header_size =
  296. std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
  297. const int n = kBlockSize - 2 * header_size + 4;
  298. Write(BigString("foo", n));
  299. ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
  300. Write("");
  301. Write("bar");
  302. ASSERT_EQ(BigString("foo", n), Read());
  303. ASSERT_EQ("", Read());
  304. ASSERT_EQ("bar", Read());
  305. ASSERT_EQ("EOF", Read());
  306. }
  307. TEST_P(LogTest, AlignedEof) {
  308. int header_size =
  309. std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
  310. const int n = kBlockSize - 2 * header_size + 4;
  311. Write(BigString("foo", n));
  312. ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
  313. ASSERT_EQ(BigString("foo", n), Read());
  314. ASSERT_EQ("EOF", Read());
  315. }
  316. TEST_P(LogTest, RandomRead) {
  317. const int N = 500;
  318. Random write_rnd(301);
  319. for (int i = 0; i < N; i++) {
  320. Write(RandomSkewedString(i, &write_rnd));
  321. }
  322. Random read_rnd(301);
  323. for (int i = 0; i < N; i++) {
  324. ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
  325. }
  326. ASSERT_EQ("EOF", Read());
  327. }
  328. // Tests of all the error paths in log_reader.cc follow:
  329. TEST_P(LogTest, ReadError) {
  330. Write("foo");
  331. ForceError();
  332. ASSERT_EQ("EOF", Read());
  333. ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes());
  334. ASSERT_EQ("OK", MatchError("read error"));
  335. }
  336. TEST_P(LogTest, BadRecordType) {
  337. Write("foo");
  338. // Type is stored in header[6]
  339. IncrementByte(6, 100);
  340. FixChecksum(0, 3, false);
  341. ASSERT_EQ("EOF", Read());
  342. ASSERT_EQ(3U, DroppedBytes());
  343. ASSERT_EQ("OK", MatchError("unknown record type"));
  344. }
  345. TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
  346. Write("foo");
  347. ShrinkSize(4); // Drop all payload as well as a header byte
  348. ASSERT_EQ("EOF", Read());
  349. // Truncated last record is ignored, not treated as an error
  350. ASSERT_EQ(0U, DroppedBytes());
  351. ASSERT_EQ("", ReportMessage());
  352. }
  353. TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
  354. if (allow_retry_read_) {
  355. // If read retry is allowed, then truncated trailing record should not
  356. // raise an error.
  357. return;
  358. }
  359. Write("foo");
  360. ShrinkSize(4); // Drop all payload as well as a header byte
  361. ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  362. // Truncated last record is ignored, not treated as an error
  363. ASSERT_GT(DroppedBytes(), 0U);
  364. ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
  365. }
  366. TEST_P(LogTest, BadLength) {
  367. if (allow_retry_read_) {
  368. // If read retry is allowed, then we should not raise an error when the
  369. // record length specified in header is longer than data currently
  370. // available. It's possible that the body of the record is not written yet.
  371. return;
  372. }
  373. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  374. int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  375. const int kPayloadSize = kBlockSize - header_size;
  376. Write(BigString("bar", kPayloadSize));
  377. Write("foo");
  378. // Least significant size byte is stored in header[4].
  379. IncrementByte(4, 1);
  380. if (!recyclable_log) {
  381. ASSERT_EQ("foo", Read());
  382. ASSERT_EQ(kBlockSize, DroppedBytes());
  383. ASSERT_EQ("OK", MatchError("bad record length"));
  384. } else {
  385. ASSERT_EQ("EOF", Read());
  386. }
  387. }
  388. TEST_P(LogTest, BadLengthAtEndIsIgnored) {
  389. if (allow_retry_read_) {
  390. // If read retry is allowed, then we should not raise an error when the
  391. // record length specified in header is longer than data currently
  392. // available. It's possible that the body of the record is not written yet.
  393. return;
  394. }
  395. Write("foo");
  396. ShrinkSize(1);
  397. ASSERT_EQ("EOF", Read());
  398. ASSERT_EQ(0U, DroppedBytes());
  399. ASSERT_EQ("", ReportMessage());
  400. }
  401. TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
  402. if (allow_retry_read_) {
  403. // If read retry is allowed, then we should not raise an error when the
  404. // record length specified in header is longer than data currently
  405. // available. It's possible that the body of the record is not written yet.
  406. return;
  407. }
  408. Write("foo");
  409. ShrinkSize(1);
  410. ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  411. ASSERT_GT(DroppedBytes(), 0U);
  412. ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
  413. }
  414. TEST_P(LogTest, ChecksumMismatch) {
  415. Write("foooooo");
  416. IncrementByte(0, 14);
  417. ASSERT_EQ("EOF", Read());
  418. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  419. if (!recyclable_log) {
  420. ASSERT_EQ(14U, DroppedBytes());
  421. ASSERT_EQ("OK", MatchError("checksum mismatch"));
  422. } else {
  423. ASSERT_EQ(0U, DroppedBytes());
  424. ASSERT_EQ("", ReportMessage());
  425. }
  426. }
  427. TEST_P(LogTest, UnexpectedMiddleType) {
  428. Write("foo");
  429. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  430. SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
  431. : kMiddleType));
  432. FixChecksum(0, 3, !!recyclable_log);
  433. ASSERT_EQ("EOF", Read());
  434. ASSERT_EQ(3U, DroppedBytes());
  435. ASSERT_EQ("OK", MatchError("missing start"));
  436. }
  437. TEST_P(LogTest, UnexpectedLastType) {
  438. Write("foo");
  439. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  440. SetByte(6,
  441. static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
  442. FixChecksum(0, 3, !!recyclable_log);
  443. ASSERT_EQ("EOF", Read());
  444. ASSERT_EQ(3U, DroppedBytes());
  445. ASSERT_EQ("OK", MatchError("missing start"));
  446. }
  447. TEST_P(LogTest, UnexpectedFullType) {
  448. Write("foo");
  449. Write("bar");
  450. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  451. SetByte(
  452. 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
  453. FixChecksum(0, 3, !!recyclable_log);
  454. ASSERT_EQ("bar", Read());
  455. ASSERT_EQ("EOF", Read());
  456. ASSERT_EQ(3U, DroppedBytes());
  457. ASSERT_EQ("OK", MatchError("partial record without end"));
  458. }
  459. TEST_P(LogTest, UnexpectedFirstType) {
  460. Write("foo");
  461. Write(BigString("bar", 100000));
  462. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  463. SetByte(
  464. 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
  465. FixChecksum(0, 3, !!recyclable_log);
  466. ASSERT_EQ(BigString("bar", 100000), Read());
  467. ASSERT_EQ("EOF", Read());
  468. ASSERT_EQ(3U, DroppedBytes());
  469. ASSERT_EQ("OK", MatchError("partial record without end"));
  470. }
  471. TEST_P(LogTest, MissingLastIsIgnored) {
  472. Write(BigString("bar", kBlockSize));
  473. // Remove the LAST block, including header.
  474. ShrinkSize(14);
  475. ASSERT_EQ("EOF", Read());
  476. ASSERT_EQ("", ReportMessage());
  477. ASSERT_EQ(0U, DroppedBytes());
  478. }
  479. TEST_P(LogTest, MissingLastIsNotIgnored) {
  480. if (allow_retry_read_) {
  481. // If read retry is allowed, then truncated trailing record should not
  482. // raise an error.
  483. return;
  484. }
  485. Write(BigString("bar", kBlockSize));
  486. // Remove the LAST block, including header.
  487. ShrinkSize(14);
  488. ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  489. ASSERT_GT(DroppedBytes(), 0U);
  490. ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
  491. }
  492. TEST_P(LogTest, PartialLastIsIgnored) {
  493. Write(BigString("bar", kBlockSize));
  494. // Cause a bad record length in the LAST block.
  495. ShrinkSize(1);
  496. ASSERT_EQ("EOF", Read());
  497. ASSERT_EQ("", ReportMessage());
  498. ASSERT_EQ(0U, DroppedBytes());
  499. }
  500. TEST_P(LogTest, PartialLastIsNotIgnored) {
  501. if (allow_retry_read_) {
  502. // If read retry is allowed, then truncated trailing record should not
  503. // raise an error.
  504. return;
  505. }
  506. Write(BigString("bar", kBlockSize));
  507. // Cause a bad record length in the LAST block.
  508. ShrinkSize(1);
  509. ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  510. ASSERT_GT(DroppedBytes(), 0U);
  511. ASSERT_EQ("OK", MatchError(
  512. "Corruption: truncated headerCorruption: "
  513. "error reading trailing data"));
  514. }
  515. TEST_P(LogTest, ErrorJoinsRecords) {
  516. // Consider two fragmented records:
  517. // first(R1) last(R1) first(R2) last(R2)
  518. // where the middle two fragments disappear. We do not want
  519. // first(R1),last(R2) to get joined and returned as a valid record.
  520. // Write records that span two blocks
  521. Write(BigString("foo", kBlockSize));
  522. Write(BigString("bar", kBlockSize));
  523. Write("correct");
  524. // Wipe the middle block
  525. for (unsigned int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
  526. SetByte(offset, 'x');
  527. }
  528. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  529. if (!recyclable_log) {
  530. ASSERT_EQ("correct", Read());
  531. ASSERT_EQ("EOF", Read());
  532. size_t dropped = DroppedBytes();
  533. ASSERT_LE(dropped, 2 * kBlockSize + 100);
  534. ASSERT_GE(dropped, 2 * kBlockSize);
  535. } else {
  536. ASSERT_EQ("EOF", Read());
  537. }
  538. }
  539. TEST_P(LogTest, ClearEofSingleBlock) {
  540. Write("foo");
  541. Write("bar");
  542. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  543. int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  544. ForceEOF(3 + header_size + 2);
  545. ASSERT_EQ("foo", Read());
  546. UnmarkEOF();
  547. ASSERT_EQ("bar", Read());
  548. ASSERT_TRUE(IsEOF());
  549. ASSERT_EQ("EOF", Read());
  550. Write("xxx");
  551. UnmarkEOF();
  552. ASSERT_EQ("xxx", Read());
  553. ASSERT_TRUE(IsEOF());
  554. }
  555. TEST_P(LogTest, ClearEofMultiBlock) {
  556. size_t num_full_blocks = 5;
  557. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  558. int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  559. size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
  560. Write(BigString("foo", n));
  561. Write(BigString("bar", n));
  562. ForceEOF(n + num_full_blocks * header_size + header_size + 3);
  563. ASSERT_EQ(BigString("foo", n), Read());
  564. ASSERT_TRUE(IsEOF());
  565. UnmarkEOF();
  566. ASSERT_EQ(BigString("bar", n), Read());
  567. ASSERT_TRUE(IsEOF());
  568. Write(BigString("xxx", n));
  569. UnmarkEOF();
  570. ASSERT_EQ(BigString("xxx", n), Read());
  571. ASSERT_TRUE(IsEOF());
  572. }
  573. TEST_P(LogTest, ClearEofError) {
  574. // If an error occurs during Read() in UnmarkEOF(), the records contained
  575. // in the buffer should be returned on subsequent calls of ReadRecord()
  576. // until no more full records are left, whereafter ReadRecord() should return
  577. // false to indicate that it cannot read any further.
  578. Write("foo");
  579. Write("bar");
  580. UnmarkEOF();
  581. ASSERT_EQ("foo", Read());
  582. ASSERT_TRUE(IsEOF());
  583. Write("xxx");
  584. ForceError(0);
  585. UnmarkEOF();
  586. ASSERT_EQ("bar", Read());
  587. ASSERT_EQ("EOF", Read());
  588. }
  589. TEST_P(LogTest, ClearEofError2) {
  590. Write("foo");
  591. Write("bar");
  592. UnmarkEOF();
  593. ASSERT_EQ("foo", Read());
  594. Write("xxx");
  595. ForceError(3);
  596. UnmarkEOF();
  597. ASSERT_EQ("bar", Read());
  598. ASSERT_EQ("EOF", Read());
  599. ASSERT_EQ(3U, DroppedBytes());
  600. ASSERT_EQ("OK", MatchError("read error"));
  601. }
  602. TEST_P(LogTest, Recycle) {
  603. bool recyclable_log = (std::get<0>(GetParam()) != 0);
  604. if (!recyclable_log) {
  605. return; // test is only valid for recycled logs
  606. }
  607. Write("foo");
  608. Write("bar");
  609. Write("baz");
  610. Write("bif");
  611. Write("blitz");
  612. while (get_reader_contents()->size() < log::kBlockSize * 2) {
  613. Write("xxxxxxxxxxxxxxxx");
  614. }
  615. std::unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
  616. new test::OverwritingStringSink(get_reader_contents()),
  617. "" /* don't care */));
  618. Writer recycle_writer(std::move(dest_holder), 123, true);
  619. recycle_writer.AddRecord(Slice("foooo"));
  620. recycle_writer.AddRecord(Slice("bar"));
  621. ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
  622. ASSERT_EQ("foooo", Read());
  623. ASSERT_EQ("bar", Read());
  624. ASSERT_EQ("EOF", Read());
  625. }
  626. INSTANTIATE_TEST_CASE_P(bool, LogTest,
  627. ::testing::Values(std::make_tuple(0, false),
  628. std::make_tuple(0, true),
  629. std::make_tuple(1, false),
  630. std::make_tuple(1, true)));
  631. class RetriableLogTest : public ::testing::TestWithParam<int> {
  632. private:
  633. class ReportCollector : public Reader::Reporter {
  634. public:
  635. size_t dropped_bytes_;
  636. std::string message_;
  637. ReportCollector() : dropped_bytes_(0) {}
  638. void Corruption(size_t bytes, const Status& status) override {
  639. dropped_bytes_ += bytes;
  640. message_.append(status.ToString());
  641. }
  642. };
  643. Slice contents_;
  644. std::unique_ptr<WritableFileWriter> dest_holder_;
  645. std::unique_ptr<Writer> log_writer_;
  646. Env* env_;
  647. EnvOptions env_options_;
  648. const std::string test_dir_;
  649. const std::string log_file_;
  650. std::unique_ptr<WritableFileWriter> writer_;
  651. std::unique_ptr<SequentialFileReader> reader_;
  652. ReportCollector report_;
  653. std::unique_ptr<FragmentBufferedReader> log_reader_;
  654. public:
  655. RetriableLogTest()
  656. : contents_(),
  657. dest_holder_(nullptr),
  658. log_writer_(nullptr),
  659. env_(Env::Default()),
  660. test_dir_(test::PerThreadDBPath("retriable_log_test")),
  661. log_file_(test_dir_ + "/log"),
  662. writer_(nullptr),
  663. reader_(nullptr),
  664. log_reader_(nullptr) {}
  665. Status SetupTestEnv() {
  666. dest_holder_.reset(test::GetWritableFileWriter(
  667. new test::StringSink(&contents_), "" /* file name */));
  668. assert(dest_holder_ != nullptr);
  669. log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
  670. assert(log_writer_ != nullptr);
  671. Status s;
  672. s = env_->CreateDirIfMissing(test_dir_);
  673. std::unique_ptr<WritableFile> writable_file;
  674. if (s.ok()) {
  675. s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
  676. }
  677. if (s.ok()) {
  678. writer_.reset(new WritableFileWriter(
  679. NewLegacyWritableFileWrapper(std::move(writable_file)), log_file_,
  680. env_options_));
  681. assert(writer_ != nullptr);
  682. }
  683. std::unique_ptr<SequentialFile> seq_file;
  684. if (s.ok()) {
  685. s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
  686. }
  687. if (s.ok()) {
  688. reader_.reset(new SequentialFileReader(
  689. NewLegacySequentialFileWrapper(seq_file), log_file_));
  690. assert(reader_ != nullptr);
  691. log_reader_.reset(new FragmentBufferedReader(
  692. nullptr, std::move(reader_), &report_, true /* checksum */,
  693. 123 /* log_number */));
  694. assert(log_reader_ != nullptr);
  695. }
  696. return s;
  697. }
  698. std::string contents() {
  699. auto file = test::GetStringSinkFromLegacyWriter(log_writer_->file());
  700. assert(file != nullptr);
  701. return file->contents_;
  702. }
  703. void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); }
  704. void Write(const Slice& data) {
  705. writer_->Append(data);
  706. writer_->Sync(true);
  707. }
  708. bool TryRead(std::string* result) {
  709. assert(result != nullptr);
  710. result->clear();
  711. std::string scratch;
  712. Slice record;
  713. bool r = log_reader_->ReadRecord(&record, &scratch);
  714. if (r) {
  715. result->assign(record.data(), record.size());
  716. return true;
  717. } else {
  718. return false;
  719. }
  720. }
  721. };
  722. TEST_P(RetriableLogTest, TailLog_PartialHeader) {
  723. ASSERT_OK(SetupTestEnv());
  724. std::vector<int> remaining_bytes_in_last_record;
  725. size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  726. bool eof = false;
  727. SyncPoint::GetInstance()->DisableProcessing();
  728. SyncPoint::GetInstance()->LoadDependency(
  729. {{"RetriableLogTest::TailLog:AfterPart1",
  730. "RetriableLogTest::TailLog:BeforeReadRecord"},
  731. {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
  732. "RetriableLogTest::TailLog:BeforePart2"}});
  733. SyncPoint::GetInstance()->ClearAllCallBacks();
  734. SyncPoint::GetInstance()->SetCallBack(
  735. "FragmentBufferedLogReader::TryReadMore:FirstEOF",
  736. [&](void* /*arg*/) { eof = true; });
  737. SyncPoint::GetInstance()->EnableProcessing();
  738. size_t delta = header_size - 1;
  739. port::Thread log_writer_thread([&]() {
  740. size_t old_sz = contents().size();
  741. Encode("foo");
  742. size_t new_sz = contents().size();
  743. std::string part1 = contents().substr(old_sz, delta);
  744. std::string part2 =
  745. contents().substr(old_sz + delta, new_sz - old_sz - delta);
  746. Write(Slice(part1));
  747. TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
  748. TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
  749. Write(Slice(part2));
  750. });
  751. std::string record;
  752. port::Thread log_reader_thread([&]() {
  753. TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
  754. while (!TryRead(&record)) {
  755. }
  756. });
  757. log_reader_thread.join();
  758. log_writer_thread.join();
  759. ASSERT_EQ("foo", record);
  760. ASSERT_TRUE(eof);
  761. }
  762. TEST_P(RetriableLogTest, TailLog_FullHeader) {
  763. ASSERT_OK(SetupTestEnv());
  764. std::vector<int> remaining_bytes_in_last_record;
  765. size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  766. bool eof = false;
  767. SyncPoint::GetInstance()->DisableProcessing();
  768. SyncPoint::GetInstance()->LoadDependency(
  769. {{"RetriableLogTest::TailLog:AfterPart1",
  770. "RetriableLogTest::TailLog:BeforeReadRecord"},
  771. {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
  772. "RetriableLogTest::TailLog:BeforePart2"}});
  773. SyncPoint::GetInstance()->ClearAllCallBacks();
  774. SyncPoint::GetInstance()->SetCallBack(
  775. "FragmentBufferedLogReader::TryReadMore:FirstEOF",
  776. [&](void* /*arg*/) { eof = true; });
  777. SyncPoint::GetInstance()->EnableProcessing();
  778. size_t delta = header_size + 1;
  779. port::Thread log_writer_thread([&]() {
  780. size_t old_sz = contents().size();
  781. Encode("foo");
  782. size_t new_sz = contents().size();
  783. std::string part1 = contents().substr(old_sz, delta);
  784. std::string part2 =
  785. contents().substr(old_sz + delta, new_sz - old_sz - delta);
  786. Write(Slice(part1));
  787. TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
  788. TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
  789. Write(Slice(part2));
  790. ASSERT_TRUE(eof);
  791. });
  792. std::string record;
  793. port::Thread log_reader_thread([&]() {
  794. TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
  795. while (!TryRead(&record)) {
  796. }
  797. });
  798. log_reader_thread.join();
  799. log_writer_thread.join();
  800. ASSERT_EQ("foo", record);
  801. }
  802. TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
  803. // Clear all sync point callbacks even if this test does not use sync point.
  804. // It is necessary, otherwise the execute of this test may hit a sync point
  805. // with which a callback is registered. The registered callback may access
  806. // some dead variable, causing segfault.
  807. SyncPoint::GetInstance()->DisableProcessing();
  808. SyncPoint::GetInstance()->ClearAllCallBacks();
  809. ASSERT_OK(SetupTestEnv());
  810. size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  811. size_t delta = header_size - 1;
  812. size_t old_sz = contents().size();
  813. Encode("foo-bar");
  814. size_t new_sz = contents().size();
  815. std::string part1 = contents().substr(old_sz, delta);
  816. std::string part2 =
  817. contents().substr(old_sz + delta, new_sz - old_sz - delta);
  818. Write(Slice(part1));
  819. std::string record;
  820. ASSERT_FALSE(TryRead(&record));
  821. ASSERT_TRUE(record.empty());
  822. Write(Slice(part2));
  823. ASSERT_TRUE(TryRead(&record));
  824. ASSERT_EQ("foo-bar", record);
  825. }
  826. INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
  827. } // namespace log
  828. } // namespace ROCKSDB_NAMESPACE
  829. int main(int argc, char** argv) {
  830. ::testing::InitGoogleTest(&argc, argv);
  831. return RUN_ALL_TESTS();
  832. }