log_test.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"
#include "db/log_writer.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "rocksdb/env.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "utilities/memory_allocators.h"

namespace ROCKSDB_NAMESPACE::log {
// Construct a string of the specified length made out of the supplied
// partial string.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  while (result.size() < n) {
    result.append(partial_string);
  }
  result.resize(n);
  return result;
}

// Construct a string from a number
static std::string NumberString(int n) {
  char buf[50];
  snprintf(buf, sizeof(buf), "%d.", n);
  return std::string(buf);
}

// Return a skewed potentially long string
static std::string RandomSkewedString(int i, Random* rnd) {
  return BigString(NumberString(i), rnd->Skewed(17));
}

// Param type is tuple<int, bool, CompressionType>
// get<0>(tuple): non-zero if recycling log, zero if regular log
// get<1>(tuple): true if allow retry after read EOF, false otherwise
// get<2>(tuple): type of compression used
class LogTest
    : public ::testing::TestWithParam<std::tuple<int, bool, CompressionType>> {
 private:
  class StringSource : public FSSequentialFile {
   public:
    Slice& contents_;
    bool force_error_;
    size_t force_error_position_;
    bool force_eof_;
    size_t force_eof_position_;
    bool returned_partial_;
    bool fail_after_read_partial_;

    explicit StringSource(Slice& contents, bool fail_after_read_partial)
        : contents_(contents),
          force_error_(false),
          force_error_position_(0),
          force_eof_(false),
          force_eof_position_(0),
          returned_partial_(false),
          fail_after_read_partial_(fail_after_read_partial) {}

    IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result,
                  char* scratch, IODebugContext* /*dbg*/) override {
      if (fail_after_read_partial_) {
        EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
      }
      if (force_error_) {
        if (force_error_position_ >= n) {
          force_error_position_ -= n;
        } else {
          *result = Slice(contents_.data(), force_error_position_);
          contents_.remove_prefix(force_error_position_);
          force_error_ = false;
          returned_partial_ = true;
          return IOStatus::Corruption("read error");
        }
      }
      if (contents_.size() < n) {
        n = contents_.size();
        returned_partial_ = true;
      }
      if (force_eof_) {
        if (force_eof_position_ >= n) {
          force_eof_position_ -= n;
        } else {
          force_eof_ = false;
          n = force_eof_position_;
          returned_partial_ = true;
        }
      }
      // By using scratch we ensure that caller has control over the
      // lifetime of result.data()
      memcpy(scratch, contents_.data(), n);
      *result = Slice(scratch, n);
      contents_.remove_prefix(n);
      return IOStatus::OK();
    }

    IOStatus Skip(uint64_t n) override {
      if (n > contents_.size()) {
        contents_.clear();
        return IOStatus::NotFound("in-memory file skipped past end");
      }
      contents_.remove_prefix(n);
      return IOStatus::OK();
    }
  };
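
  // Collects corruption reports from the log Reader so tests can assert on
  // the number of dropped bytes and on the reported error messages.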
  class ReportCollector : public Reader::Reporter {
   public:
    size_t dropped_bytes_;
    std::string message_;

    ReportCollector() : dropped_bytes_(0) {}
    void Corruption(size_t bytes, const Status& status,
                    uint64_t /*log_number*/ = kMaxSequenceNumber) override {
      dropped_bytes_ += bytes;
      message_.append(status.ToString());
    }
  };

  std::string& dest_contents() { return sink_->contents_; }
  const std::string& dest_contents() const { return sink_->contents_; }
  void reset_source_contents() { source_->contents_ = dest_contents(); }

  Slice reader_contents_;
  test::StringSink* sink_;
  StringSource* source_;
  ReportCollector report_;

 protected:
  std::unique_ptr<Writer> writer_;
  std::unique_ptr<Reader> reader_;
  bool allow_retry_read_;
  CompressionType compression_type_;

 public:
  LogTest()
      : reader_contents_(),
        sink_(new test::StringSink(&reader_contents_)),
        source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))),
        allow_retry_read_(std::get<1>(GetParam())),
        compression_type_(std::get<2>(GetParam())) {
    std::unique_ptr<FSWritableFile> sink_holder(sink_);
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(sink_holder), "" /* don't care */, FileOptions()));
    Writer* writer =
        new Writer(std::move(file_writer), 123, std::get<0>(GetParam()), false,
                   compression_type_);
    writer_.reset(writer);
    std::unique_ptr<FSSequentialFile> source_holder(source_);
    std::unique_ptr<SequentialFileReader> file_reader(
        new SequentialFileReader(std::move(source_holder), "" /* file name */));
    if (allow_retry_read_) {
      reader_.reset(new FragmentBufferedReader(nullptr, std::move(file_reader),
                                               &report_, true /* checksum */,
                                               123 /* log_number */));
    } else {
      reader_.reset(new Reader(nullptr, std::move(file_reader), &report_,
                               true /* checksum */, 123 /* log_number */));
    }
  }
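
  // The helpers below wrap Writer::AddRecord and Reader::ReadRecord so tests
  // can write records, corrupt the in-memory log contents, and read them back.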
  Slice* get_reader_contents() { return &reader_contents_; }

  void Write(const std::string& msg,
             const UnorderedMap<uint32_t, size_t>* cf_to_ts_sz = nullptr) {
    if (cf_to_ts_sz != nullptr && !cf_to_ts_sz->empty()) {
      ASSERT_OK(writer_->MaybeAddUserDefinedTimestampSizeRecord(WriteOptions(),
                                                                *cf_to_ts_sz));
    }
    ASSERT_OK(writer_->AddRecord(WriteOptions(), Slice(msg)));
  }

  size_t WrittenBytes() const { return dest_contents().size(); }

  std::string Read(const WALRecoveryMode wal_recovery_mode =
                       WALRecoveryMode::kTolerateCorruptedTailRecords,
                   UnorderedMap<uint32_t, size_t>* cf_to_ts_sz = nullptr) {
    std::string scratch;
    Slice record;
    bool ret = false;
    uint64_t record_checksum;
    ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode,
                              &record_checksum);
    if (cf_to_ts_sz != nullptr) {
      *cf_to_ts_sz = reader_->GetRecordedTimestampSize();
    }
    if (ret) {
      if (!allow_retry_read_) {
        // allow_retry_read_ means using FragmentBufferedReader which does not
        // support record checksum yet.
        uint64_t actual_record_checksum =
            XXH3_64bits(record.data(), record.size());
        assert(actual_record_checksum == record_checksum);
      }
      return record.ToString();
    } else {
      return "EOF";
    }
  }

  void IncrementByte(int offset, char delta) {
    dest_contents()[offset] += delta;
  }

  void SetByte(int offset, char new_byte) {
    dest_contents()[offset] = new_byte;
  }

  void ShrinkSize(int bytes) { sink_->Drop(bytes); }

  void FixChecksum(int header_offset, int len, bool recyclable) {
    // Compute crc of type/len/data
    int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
    uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
                                 header_size - 6 + len);
    crc = crc32c::Mask(crc);
    EncodeFixed32(&dest_contents()[header_offset], crc);
  }

  void ForceError(size_t position = 0) {
    source_->force_error_ = true;
    source_->force_error_position_ = position;
  }

  size_t DroppedBytes() const { return report_.dropped_bytes_; }

  std::string ReportMessage() const { return report_.message_; }

  void ForceEOF(size_t position = 0) {
    source_->force_eof_ = true;
    source_->force_eof_position_ = position;
  }

  void UnmarkEOF() {
    source_->returned_partial_ = false;
    reader_->UnmarkEOF();
  }

  bool IsEOF() { return reader_->IsEOF(); }

  // Returns OK iff recorded error message contains "msg"
  std::string MatchError(const std::string& msg) const {
    if (report_.message_.find(msg) == std::string::npos) {
      return report_.message_;
    } else {
      return "OK";
    }
  }

  void CheckRecordAndTimestampSize(
      std::string record, UnorderedMap<uint32_t, size_t>& expected_ts_sz) {
    UnorderedMap<uint32_t, size_t> recorded_ts_sz;
    ASSERT_EQ(record,
              Read(WALRecoveryMode::
                       kTolerateCorruptedTailRecords /* wal_recovery_mode */,
                   &recorded_ts_sz));
    EXPECT_EQ(expected_ts_sz, recorded_ts_sz);
  }
};
TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }

TEST_P(LogTest, ReadWrite) {
  Write("foo");
  Write("bar");
  Write("");
  Write("xxxx");
  ASSERT_EQ("foo", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("xxxx", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}

TEST_P(LogTest, ReadWriteWithTimestampSize) {
  UnorderedMap<uint32_t, size_t> ts_sz_one = {
      {1, sizeof(uint64_t)},
  };
  Write("foo", &ts_sz_one);
  Write("bar");
  UnorderedMap<uint32_t, size_t> ts_sz_two = {{2, sizeof(char)}};
  Write("", &ts_sz_two);
  Write("xxxx");
  CheckRecordAndTimestampSize("foo", ts_sz_one);
  CheckRecordAndTimestampSize("bar", ts_sz_one);
  UnorderedMap<uint32_t, size_t> expected_ts_sz_two;
  // User-defined timestamp size records are accumulated and applied to
  // subsequent records.
  expected_ts_sz_two.insert(ts_sz_one.begin(), ts_sz_one.end());
  expected_ts_sz_two.insert(ts_sz_two.begin(), ts_sz_two.end());
  CheckRecordAndTimestampSize("", expected_ts_sz_two);
  CheckRecordAndTimestampSize("xxxx", expected_ts_sz_two);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}

TEST_P(LogTest, ReadWriteWithTimestampSizeZeroTimestampIgnored) {
  UnorderedMap<uint32_t, size_t> ts_sz_one = {{1, sizeof(uint64_t)}};
  Write("foo", &ts_sz_one);
  UnorderedMap<uint32_t, size_t> ts_sz_two(ts_sz_one.begin(), ts_sz_one.end());
  ts_sz_two.insert(std::make_pair(2, 0));
  Write("bar", &ts_sz_two);
  CheckRecordAndTimestampSize("foo", ts_sz_one);
  CheckRecordAndTimestampSize("bar", ts_sz_one);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}

TEST_P(LogTest, ManyBlocks) {
  for (int i = 0; i < 100000; i++) {
    Write(NumberString(i));
  }
  for (int i = 0; i < 100000; i++) {
    ASSERT_EQ(NumberString(i), Read());
  }
  ASSERT_EQ("EOF", Read());
}

TEST_P(LogTest, Fragmentation) {
  Write("small");
  Write(BigString("medium", 50000));
  Write(BigString("large", 100000));
  ASSERT_EQ("small", Read());
  ASSERT_EQ(BigString("medium", 50000), Read());
  ASSERT_EQ(BigString("large", 100000), Read());
  ASSERT_EQ("EOF", Read());
}
TEST_P(LogTest, MarginalTrailer) {
  // Make a trailer that is exactly the same length as an empty record.
  int header_size =
      std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
  const int n = kBlockSize - 2 * header_size;
  Write(BigString("foo", n));
  ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST_P(LogTest, MarginalTrailer2) {
  // Make a trailer that is exactly the same length as an empty record.
  int header_size =
      std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
  const int n = kBlockSize - 2 * header_size;
  Write(BigString("foo", n));
  ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0U, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

TEST_P(LogTest, ShortTrailer) {
  int header_size =
      std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
  const int n = kBlockSize - 2 * header_size + 4;
  Write(BigString("foo", n));
  ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST_P(LogTest, AlignedEof) {
  int header_size =
      std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
  const int n = kBlockSize - 2 * header_size + 4;
  Write(BigString("foo", n));
  ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("EOF", Read());
}

TEST_P(LogTest, RandomRead) {
  const int N = 500;
  Random write_rnd(301);
  for (int i = 0; i < N; i++) {
    Write(RandomSkewedString(i, &write_rnd));
  }
  Random read_rnd(301);
  for (int i = 0; i < N; i++) {
    ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
  }
  ASSERT_EQ("EOF", Read());
}
// Tests of all the error paths in log_reader.cc follow:

TEST_P(LogTest, ReadError) {
  Write("foo");
  ForceError();
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}

TEST_P(LogTest, BadRecordType) {
  Write("foo");
  // Type is stored in header[6]
  IncrementByte(6, 100);
  FixChecksum(0, 3, false);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}
TEST_P(LogTest, IgnorableRecordType) {
  Write("foo");
  // Type is stored in header[6]
  SetByte(6, static_cast<char>(kRecordTypeSafeIgnoreMask + 100));
  FixChecksum(0, 3, false);
  ASSERT_EQ("EOF", Read());
  // The new type has value 129 and is masked to be ignorable if unknown
  ASSERT_EQ(0U, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
  Write("foo");
  ShrinkSize(4);  // Drop all payload as well as a header byte
  ASSERT_EQ("EOF", Read());
  // Truncated last record is ignored, not treated as an error
  ASSERT_EQ(0U, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then truncated trailing record should not
    // raise an error.
    return;
  }
  Write("foo");
  ShrinkSize(4);  // Drop all payload as well as a header byte
  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  // Truncated last record is not ignored, treated as an error
  ASSERT_GT(DroppedBytes(), 0U);
  ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
}

TEST_P(LogTest, BadLength) {
  if (allow_retry_read_) {
    // If read retry is allowed, then we should not raise an error when the
    // record length specified in header is longer than data currently
    // available. It's possible that the body of the record is not written yet.
    return;
  }
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  const int kPayloadSize = kBlockSize - header_size;
  Write(BigString("bar", kPayloadSize));
  Write("foo");
  // Least significant size byte is stored in header[4].
  IncrementByte(4, 1);
  if (!recyclable_log) {
    ASSERT_EQ("foo", Read());
    ASSERT_EQ(kBlockSize, DroppedBytes());
    ASSERT_EQ("OK", MatchError("bad record length"));
  } else {
    ASSERT_EQ("EOF", Read());
  }
}

TEST_P(LogTest, BadLengthAtEndIsIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then we should not raise an error when the
    // record length specified in header is longer than data currently
    // available. It's possible that the body of the record is not written yet.
    return;
  }
  Write("foo");
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0U, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then we should not raise an error when the
    // record length specified in header is longer than data currently
    // available. It's possible that the body of the record is not written yet.
    return;
  }
  Write("foo");
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  ASSERT_GT(DroppedBytes(), 0U);
  ASSERT_EQ("OK", MatchError("Corruption: truncated record body"));
}
TEST_P(LogTest, ChecksumMismatch) {
  Write("foooooo");
  IncrementByte(0, 14);
  ASSERT_EQ("EOF", Read());
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  if (!recyclable_log) {
    ASSERT_EQ(14U, DroppedBytes());
    ASSERT_EQ("OK", MatchError("checksum mismatch"));
  } else {
    ASSERT_EQ(0U, DroppedBytes());
    ASSERT_EQ("", ReportMessage());
  }
}

TEST_P(LogTest, UnexpectedMiddleType) {
  Write("foo");
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
                                              : kMiddleType));
  FixChecksum(0, 3, !!recyclable_log);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

TEST_P(LogTest, UnexpectedLastType) {
  Write("foo");
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  SetByte(6,
          static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
  FixChecksum(0, 3, !!recyclable_log);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

TEST_P(LogTest, UnexpectedFullType) {
  Write("foo");
  Write("bar");
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  SetByte(6, static_cast<char>(recyclable_log ? kRecyclableFirstType
                                              : kFirstType));
  FixChecksum(0, 3, !!recyclable_log);
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}

TEST_P(LogTest, UnexpectedFirstType) {
  Write("foo");
  Write(BigString("bar", 100000));
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  SetByte(6, static_cast<char>(recyclable_log ? kRecyclableFirstType
                                              : kFirstType));
  FixChecksum(0, 3, !!recyclable_log);
  ASSERT_EQ(BigString("bar", 100000), Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
TEST_P(LogTest, MissingLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Remove the LAST block, including header.
  ShrinkSize(14);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0U, DroppedBytes());
}

TEST_P(LogTest, MissingLastIsNotIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then truncated trailing record should not
    // raise an error.
    return;
  }
  Write(BigString("bar", kBlockSize));
  // Remove the LAST block, including header.
  ShrinkSize(14);
  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  ASSERT_GT(DroppedBytes(), 0U);
  ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
}

TEST_P(LogTest, PartialLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Cause a bad record length in the LAST block.
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0U, DroppedBytes());
}

TEST_P(LogTest, PartialLastIsNotIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then truncated trailing record should not
    // raise an error.
    return;
  }
  Write(BigString("bar", kBlockSize));
  // Cause a bad record length in the LAST block.
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  ASSERT_GT(DroppedBytes(), 0U);
  ASSERT_EQ("OK", MatchError("Corruption: truncated record body"));
}

TEST_P(LogTest, ErrorJoinsRecords) {
  // Consider two fragmented records:
  //   first(R1) last(R1) first(R2) last(R2)
  // where the middle two fragments disappear. We do not want
  // first(R1),last(R2) to get joined and returned as a valid record.

  // Write records that span two blocks
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Wipe the middle block
  for (unsigned int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
    SetByte(offset, 'x');
  }

  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  if (!recyclable_log) {
    ASSERT_EQ("correct", Read());
    ASSERT_EQ("EOF", Read());
    size_t dropped = DroppedBytes();
    ASSERT_LE(dropped, 2 * kBlockSize + 100);
    ASSERT_GE(dropped, 2 * kBlockSize);
  } else {
    ASSERT_EQ("EOF", Read());
  }
}
TEST_P(LogTest, ClearEofSingleBlock) {
  Write("foo");
  Write("bar");
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  ForceEOF(3 + header_size + 2);
  ASSERT_EQ("foo", Read());
  UnmarkEOF();
  ASSERT_EQ("bar", Read());
  ASSERT_TRUE(IsEOF());
  ASSERT_EQ("EOF", Read());
  Write("xxx");
  UnmarkEOF();
  ASSERT_EQ("xxx", Read());
  ASSERT_TRUE(IsEOF());
}

TEST_P(LogTest, ClearEofMultiBlock) {
  size_t num_full_blocks = 5;
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
  Write(BigString("foo", n));
  Write(BigString("bar", n));
  ForceEOF(n + num_full_blocks * header_size + header_size + 3);
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_TRUE(IsEOF());
  UnmarkEOF();
  ASSERT_EQ(BigString("bar", n), Read());
  ASSERT_TRUE(IsEOF());
  Write(BigString("xxx", n));
  UnmarkEOF();
  ASSERT_EQ(BigString("xxx", n), Read());
  ASSERT_TRUE(IsEOF());
}

TEST_P(LogTest, ClearEofError) {
  // If an error occurs during Read() in UnmarkEOF(), the records contained
  // in the buffer should be returned on subsequent calls of ReadRecord()
  // until no more full records are left, whereafter ReadRecord() should return
  // false to indicate that it cannot read any further.
  Write("foo");
  Write("bar");
  UnmarkEOF();
  ASSERT_EQ("foo", Read());
  ASSERT_TRUE(IsEOF());
  Write("xxx");
  ForceError(0);
  UnmarkEOF();
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST_P(LogTest, ClearEofError2) {
  Write("foo");
  Write("bar");
  UnmarkEOF();
  ASSERT_EQ("foo", Read());
  Write("xxx");
  ForceError(3);
  UnmarkEOF();
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}
TEST_P(LogTest, Recycle) {
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  if (!recyclable_log) {
    return;  // test is only valid for recycled logs
  }
  Write("foo");
  Write("bar");
  Write("baz");
  Write("bif");
  Write("blitz");
  while (get_reader_contents()->size() < log::kBlockSize * 2) {
    Write("xxxxxxxxxxxxxxxx");
  }
  std::unique_ptr<FSWritableFile> sink(
      new test::OverwritingStringSink(get_reader_contents()));
  std::unique_ptr<WritableFileWriter> dest_holder(new WritableFileWriter(
      std::move(sink), "" /* don't care */, FileOptions()));
  Writer recycle_writer(std::move(dest_holder), 123, true);
  ASSERT_OK(recycle_writer.AddRecord(WriteOptions(), Slice("foooo")));
  ASSERT_OK(recycle_writer.AddRecord(WriteOptions(), Slice("bar")));
  ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
  ASSERT_EQ("foooo", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST_P(LogTest, RecycleWithTimestampSize) {
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  if (!recyclable_log) {
    return;  // test is only valid for recycled logs
  }
  UnorderedMap<uint32_t, size_t> ts_sz_one = {
      {1, sizeof(uint32_t)},
  };
  Write("foo", &ts_sz_one);
  Write("bar");
  Write("baz");
  Write("bif");
  Write("blitz");
  while (get_reader_contents()->size() < log::kBlockSize * 2) {
    Write("xxxxxxxxxxxxxxxx");
  }
  std::unique_ptr<FSWritableFile> sink(
      new test::OverwritingStringSink(get_reader_contents()));
  std::unique_ptr<WritableFileWriter> dest_holder(new WritableFileWriter(
      std::move(sink), "" /* don't care */, FileOptions()));
  Writer recycle_writer(std::move(dest_holder), 123, true);
  UnorderedMap<uint32_t, size_t> ts_sz_two = {
      {2, sizeof(uint64_t)},
  };
  ASSERT_OK(recycle_writer.MaybeAddUserDefinedTimestampSizeRecord(
      WriteOptions(), ts_sz_two));
  ASSERT_OK(recycle_writer.AddRecord(WriteOptions(), Slice("foooo")));
  ASSERT_OK(recycle_writer.AddRecord(WriteOptions(), Slice("bar")));
  ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
  CheckRecordAndTimestampSize("foooo", ts_sz_two);
  CheckRecordAndTimestampSize("bar", ts_sz_two);
  ASSERT_EQ("EOF", Read());
}
// Validates that `MaybeAddUserDefinedTimestampSizeRecord` adds padding to the
// tail of a block and switches to a new block, if there's not enough space for
// the record.
TEST_P(LogTest, TimestampSizeRecordPadding) {
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  const size_t header_size =
      recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  const size_t data_len = kBlockSize - 2 * header_size;
  const auto first_str = BigString("foo", data_len);
  Write(first_str);
  UnorderedMap<uint32_t, size_t> ts_sz = {
      {2, sizeof(uint64_t)},
  };
  ASSERT_OK(
      writer_->MaybeAddUserDefinedTimestampSizeRecord(WriteOptions(), ts_sz));
  ASSERT_LT(writer_->TEST_block_offset(), kBlockSize);
  const auto second_str = BigString("bar", 1000);
  Write(second_str);
  ASSERT_EQ(first_str, Read());
  CheckRecordAndTimestampSize(second_str, ts_sz);
}
// Do NOT enable compression for this instantiation.
INSTANTIATE_TEST_CASE_P(
    Log, LogTest,
    ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
                       ::testing::Values(CompressionType::kNoCompression)));
class RetriableLogTest : public ::testing::TestWithParam<int> {
 private:
  class ReportCollector : public Reader::Reporter {
   public:
    size_t dropped_bytes_;
    std::string message_;

    ReportCollector() : dropped_bytes_(0) {}
    void Corruption(size_t bytes, const Status& status,
                    uint64_t /*log_number*/ = kMaxSequenceNumber) override {
      dropped_bytes_ += bytes;
      message_.append(status.ToString());
    }
  };

  Slice contents_;
  test::StringSink* sink_;
  std::unique_ptr<Writer> log_writer_;
  Env* env_;
  const std::string test_dir_;
  const std::string log_file_;
  std::unique_ptr<WritableFileWriter> writer_;
  std::unique_ptr<SequentialFileReader> reader_;
  ReportCollector report_;
  std::unique_ptr<FragmentBufferedReader> log_reader_;

 public:
  RetriableLogTest()
      : contents_(),
        sink_(new test::StringSink(&contents_)),
        log_writer_(nullptr),
        env_(Env::Default()),
        test_dir_(test::PerThreadDBPath("retriable_log_test")),
        log_file_(test_dir_ + "/log"),
        writer_(nullptr),
        reader_(nullptr),
        log_reader_(nullptr) {
    std::unique_ptr<FSWritableFile> sink_holder(sink_);
    std::unique_ptr<WritableFileWriter> wfw(new WritableFileWriter(
        std::move(sink_holder), "" /* file name */, FileOptions()));
    log_writer_.reset(new Writer(std::move(wfw), 123, GetParam()));
  }

  Status SetupTestEnv() {
    Status s;
    FileOptions fopts;
    auto fs = env_->GetFileSystem();
    s = fs->CreateDirIfMissing(test_dir_, IOOptions(), nullptr);
    std::unique_ptr<FSWritableFile> writable_file;
    if (s.ok()) {
      s = fs->NewWritableFile(log_file_, fopts, &writable_file, nullptr);
    }
    if (s.ok()) {
      writer_.reset(
          new WritableFileWriter(std::move(writable_file), log_file_, fopts));
      EXPECT_NE(writer_, nullptr);
    }
    std::unique_ptr<FSSequentialFile> seq_file;
    if (s.ok()) {
      s = fs->NewSequentialFile(log_file_, fopts, &seq_file, nullptr);
    }
    if (s.ok()) {
      reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
      EXPECT_NE(reader_, nullptr);
      log_reader_.reset(new FragmentBufferedReader(
          nullptr, std::move(reader_), &report_, true /* checksum */,
          123 /* log_number */));
      EXPECT_NE(log_reader_, nullptr);
    }
    return s;
  }

  std::string contents() { return sink_->contents_; }

  void Encode(const std::string& msg) {
    ASSERT_OK(log_writer_->AddRecord(WriteOptions(), Slice(msg)));
  }

  void Write(const Slice& data) {
    ASSERT_OK(writer_->Append(IOOptions(), data));
    ASSERT_OK(writer_->Sync(IOOptions(), true));
  }

  bool TryRead(std::string* result) {
    assert(result != nullptr);
    result->clear();
    std::string scratch;
    Slice record;
    bool r = log_reader_->ReadRecord(&record, &scratch);
    if (r) {
      result->assign(record.data(), record.size());
      return true;
    } else {
      return false;
    }
  }
};
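
// The TailLog tests below split an encoded record across two file writes and
// use sync points so that FragmentBufferedReader hits EOF between the two
// parts, then verify that the reader can resume and return the full record.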
TEST_P(RetriableLogTest, TailLog_PartialHeader) {
  ASSERT_OK(SetupTestEnv());
  std::vector<int> remaining_bytes_in_last_record;
  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  bool eof = false;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"RetriableLogTest::TailLog:AfterPart1",
        "RetriableLogTest::TailLog:BeforeReadRecord"},
       {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
        "RetriableLogTest::TailLog:BeforePart2"}});
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "FragmentBufferedLogReader::TryReadMore:FirstEOF",
      [&](void* /*arg*/) { eof = true; });
  SyncPoint::GetInstance()->EnableProcessing();

  size_t delta = header_size - 1;
  port::Thread log_writer_thread([&]() {
    size_t old_sz = contents().size();
    Encode("foo");
    size_t new_sz = contents().size();
    std::string part1 = contents().substr(old_sz, delta);
    std::string part2 =
        contents().substr(old_sz + delta, new_sz - old_sz - delta);
    Write(Slice(part1));
    TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
    Write(Slice(part2));
  });

  std::string record;
  port::Thread log_reader_thread([&]() {
    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
    while (!TryRead(&record)) {
    }
  });
  log_reader_thread.join();
  log_writer_thread.join();
  ASSERT_EQ("foo", record);
  ASSERT_TRUE(eof);
}

TEST_P(RetriableLogTest, TailLog_FullHeader) {
  ASSERT_OK(SetupTestEnv());
  std::vector<int> remaining_bytes_in_last_record;
  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  bool eof = false;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"RetriableLogTest::TailLog:AfterPart1",
        "RetriableLogTest::TailLog:BeforeReadRecord"},
       {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
        "RetriableLogTest::TailLog:BeforePart2"}});
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "FragmentBufferedLogReader::TryReadMore:FirstEOF",
      [&](void* /*arg*/) { eof = true; });
  SyncPoint::GetInstance()->EnableProcessing();

  size_t delta = header_size + 1;
  port::Thread log_writer_thread([&]() {
    size_t old_sz = contents().size();
    Encode("foo");
    size_t new_sz = contents().size();
    std::string part1 = contents().substr(old_sz, delta);
    std::string part2 =
        contents().substr(old_sz + delta, new_sz - old_sz - delta);
    Write(Slice(part1));
    TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
    Write(Slice(part2));
    ASSERT_TRUE(eof);
  });

  std::string record;
  port::Thread log_reader_thread([&]() {
    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
    while (!TryRead(&record)) {
    }
  });
  log_reader_thread.join();
  log_writer_thread.join();
  ASSERT_EQ("foo", record);
}
TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
  // Clear all sync point callbacks even though this test does not use sync
  // points. This is necessary because otherwise the execution of this test may
  // hit a sync point with which a callback is registered; the registered
  // callback may access a dead variable, causing a segfault.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_OK(SetupTestEnv());
  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  size_t delta = header_size - 1;
  size_t old_sz = contents().size();
  Encode("foo-bar");
  size_t new_sz = contents().size();
  std::string part1 = contents().substr(old_sz, delta);
  std::string part2 =
      contents().substr(old_sz + delta, new_sz - old_sz - delta);
  Write(Slice(part1));
  std::string record;
  ASSERT_FALSE(TryRead(&record));
  ASSERT_TRUE(record.empty());
  Write(Slice(part2));
  ASSERT_TRUE(TryRead(&record));
  ASSERT_EQ("foo-bar", record);
}
INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));

class CompressionLogTest : public LogTest {
 public:
  Status SetupTestEnv() {
    return writer_->AddCompressionTypeRecord(WriteOptions());
  }
};
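
// The compression tests mirror the LogTest cases, but first write a
// compression type record via AddCompressionTypeRecord() and skip when the
// build does not support streaming compression for the parameterized type.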
TEST_P(CompressionLogTest, Empty) {
  CompressionType compression_type = std::get<2>(GetParam());
  if (!StreamingCompressionTypeSupported(compression_type)) {
    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
    return;
  }
  ASSERT_OK(SetupTestEnv());
  const bool compression_enabled =
      std::get<2>(GetParam()) == kNoCompression ? false : true;
  // If WAL compression is enabled, a record is added for the compression type
  const int compression_record_size =
      compression_enabled ? kHeaderSize + 4 : 0;
  ASSERT_EQ(compression_record_size, WrittenBytes());
  ASSERT_EQ("EOF", Read());
}

TEST_P(CompressionLogTest, ReadWrite) {
  CompressionType compression_type = std::get<2>(GetParam());
  if (!StreamingCompressionTypeSupported(compression_type)) {
    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
    return;
  }
  ASSERT_OK(SetupTestEnv());
  Write("foo");
  Write("bar");
  Write("");
  Write("xxxx");
  ASSERT_EQ("foo", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("xxxx", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}

TEST_P(CompressionLogTest, ReadWriteWithTimestampSize) {
  CompressionType compression_type = std::get<2>(GetParam());
  if (!StreamingCompressionTypeSupported(compression_type)) {
    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
    return;
  }
  ASSERT_OK(SetupTestEnv());
  UnorderedMap<uint32_t, size_t> ts_sz_one = {
      {1, sizeof(uint64_t)},
  };
  Write("foo", &ts_sz_one);
  Write("bar");
  UnorderedMap<uint32_t, size_t> ts_sz_two = {{2, sizeof(char)}};
  Write("", &ts_sz_two);
  Write("xxxx");
  CheckRecordAndTimestampSize("foo", ts_sz_one);
  CheckRecordAndTimestampSize("bar", ts_sz_one);
  UnorderedMap<uint32_t, size_t> expected_ts_sz_two;
  // User-defined timestamp size records are accumulated and applied to
  // subsequent records.
  expected_ts_sz_two.insert(ts_sz_one.begin(), ts_sz_one.end());
  expected_ts_sz_two.insert(ts_sz_two.begin(), ts_sz_two.end());
  CheckRecordAndTimestampSize("", expected_ts_sz_two);
  CheckRecordAndTimestampSize("xxxx", expected_ts_sz_two);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}

TEST_P(CompressionLogTest, ManyBlocks) {
  CompressionType compression_type = std::get<2>(GetParam());
  if (!StreamingCompressionTypeSupported(compression_type)) {
    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
    return;
  }
  ASSERT_OK(SetupTestEnv());
  for (int i = 0; i < 100000; i++) {
    Write(NumberString(i));
  }
  for (int i = 0; i < 100000; i++) {
    ASSERT_EQ(NumberString(i), Read());
  }
  ASSERT_EQ("EOF", Read());
}

TEST_P(CompressionLogTest, Fragmentation) {
  CompressionType compression_type = std::get<2>(GetParam());
  if (!StreamingCompressionTypeSupported(compression_type)) {
    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
    return;
  }
  ASSERT_OK(SetupTestEnv());
  Random rnd(301);
  const std::vector<std::string> wal_entries = {
      "small",
      rnd.RandomBinaryString(3 * kBlockSize / 2),  // Spans into block 2
      rnd.RandomBinaryString(3 * kBlockSize),      // Spans into block 5
  };
  for (const std::string& wal_entry : wal_entries) {
    Write(wal_entry);
  }
  for (const std::string& wal_entry : wal_entries) {
    ASSERT_EQ(wal_entry, Read());
  }
  ASSERT_EQ("EOF", Read());
}

TEST_P(CompressionLogTest, AlignedFragmentation) {
  CompressionType compression_type = std::get<2>(GetParam());
  if (!StreamingCompressionTypeSupported(compression_type)) {
    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
    return;
  }
  ASSERT_OK(SetupTestEnv());
  Random rnd(301);
  int num_filler_records = 0;
  // Keep writing small records until the next record will be aligned at the
  // beginning of the block.
  while ((WrittenBytes() & (kBlockSize - 1)) >= kHeaderSize) {
    char entry = 'a';
    ASSERT_OK(writer_->AddRecord(WriteOptions(), Slice(&entry, 1)));
    num_filler_records++;
  }
  const std::vector<std::string> wal_entries = {
      rnd.RandomBinaryString(3 * kBlockSize),
  };
  for (const std::string& wal_entry : wal_entries) {
    Write(wal_entry);
  }

  for (int i = 0; i < num_filler_records; ++i) {
    ASSERT_EQ("a", Read());
  }
  for (const std::string& wal_entry : wal_entries) {
    ASSERT_EQ(wal_entry, Read());
  }
  ASSERT_EQ("EOF", Read());
}
TEST_P(CompressionLogTest, ChecksumMismatch) {
  const CompressionType kCompressionType = std::get<2>(GetParam());
  const bool kCompressionEnabled = kCompressionType != kNoCompression;
  const bool kRecyclableLog = (std::get<0>(GetParam()) != 0);
  if (!StreamingCompressionTypeSupported(kCompressionType)) {
    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
    return;
  }
  ASSERT_OK(SetupTestEnv());
  Write("foooooo");
  int header_len;
  if (kRecyclableLog) {
    header_len = kRecyclableHeaderSize;
  } else {
    header_len = kHeaderSize;
  }
  int compression_record_len;
  if (kCompressionEnabled) {
    compression_record_len = header_len + 4;
  } else {
    compression_record_len = 0;
  }
  IncrementByte(compression_record_len + header_len /* offset */,
                14 /* delta */);

  ASSERT_EQ("EOF", Read());
  if (!kRecyclableLog) {
    ASSERT_GT(DroppedBytes(), 0U);
    ASSERT_EQ("OK", MatchError("checksum mismatch"));
  } else {
    ASSERT_EQ(0U, DroppedBytes());
    ASSERT_EQ("", ReportMessage());
  }
}

INSTANTIATE_TEST_CASE_P(
    Compression, CompressionLogTest,
    ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
                       ::testing::Values(CompressionType::kNoCompression,
                                         CompressionType::kZSTD)));
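
// StreamingCompressionTest exercises StreamingCompress/StreamingUncompress
// directly, outside of the log Writer/Reader, with input sizes both smaller
// and larger than kBlockSize.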
class StreamingCompressionTest
    : public ::testing::TestWithParam<std::tuple<int, CompressionType>> {};

TEST_P(StreamingCompressionTest, Basic) {
  size_t input_size = std::get<0>(GetParam());
  CompressionType compression_type = std::get<1>(GetParam());
  if (!StreamingCompressionTypeSupported(compression_type)) {
    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
    return;
  }
  CompressionOptions opts;
  constexpr uint32_t compression_format_version = 2;
  StreamingCompress* compress = StreamingCompress::Create(
      compression_type, opts, compression_format_version, kBlockSize);
  StreamingUncompress* uncompress = StreamingUncompress::Create(
      compression_type, compression_format_version, kBlockSize);
  MemoryAllocator* allocator = new DefaultMemoryAllocator();
  std::string input_buffer = BigString("abc", input_size);
  std::vector<std::string> compressed_buffers;
  size_t remaining;
  // Call compress till the entire input is consumed
  do {
    char* output_buffer = (char*)allocator->Allocate(kBlockSize);
    size_t output_pos;
    remaining = compress->Compress(input_buffer.c_str(), input_size,
                                   output_buffer, &output_pos);
    if (output_pos > 0) {
      std::string compressed_buffer;
      compressed_buffer.assign(output_buffer, output_pos);
      compressed_buffers.emplace_back(std::move(compressed_buffer));
    }
    allocator->Deallocate((void*)output_buffer);
  } while (remaining > 0);
  std::string uncompressed_buffer;
  int ret_val = 0;
  size_t output_pos;
  char* uncompressed_output_buffer = (char*)allocator->Allocate(kBlockSize);
  // Uncompress the fragments and concatenate them.
  for (int i = 0; i < (int)compressed_buffers.size(); i++) {
    // Call uncompress till either the entire input is consumed or the output
    // buffer size is equal to the allocated output buffer size.
    const char* input = compressed_buffers[i].c_str();
    do {
      ret_val = uncompress->Uncompress(input, compressed_buffers[i].size(),
                                       uncompressed_output_buffer,
                                       &output_pos);
      input = nullptr;
      if (output_pos > 0) {
        std::string uncompressed_fragment;
        uncompressed_fragment.assign(uncompressed_output_buffer, output_pos);
        uncompressed_buffer += uncompressed_fragment;
      }
    } while (ret_val > 0 || output_pos == kBlockSize);
  }
  allocator->Deallocate((void*)uncompressed_output_buffer);
  delete allocator;
  delete compress;
  delete uncompress;
  // The final return value from uncompress() should be 0.
  ASSERT_EQ(ret_val, 0);
  ASSERT_EQ(input_buffer, uncompressed_buffer);
}
INSTANTIATE_TEST_CASE_P(
    StreamingCompression, StreamingCompressionTest,
    ::testing::Combine(::testing::Values(10, 100, 1000, kBlockSize,
                                         kBlockSize * 2),
                       ::testing::Values(CompressionType::kZSTD)));

}  // namespace ROCKSDB_NAMESPACE::log

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}