write_batch_test.cc 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include <memory>
  10. #include "db/column_family.h"
  11. #include "db/db_test_util.h"
  12. #include "db/memtable.h"
  13. #include "db/wide/wide_columns_helper.h"
  14. #include "db/write_batch_internal.h"
  15. #include "dbformat.h"
  16. #include "rocksdb/comparator.h"
  17. #include "rocksdb/db.h"
  18. #include "rocksdb/env.h"
  19. #include "rocksdb/memtablerep.h"
  20. #include "rocksdb/utilities/write_batch_with_index.h"
  21. #include "rocksdb/write_buffer_manager.h"
  22. #include "test_util/testharness.h"
  23. #include "test_util/testutil.h"
  24. #include "util/string_util.h"
  25. namespace ROCKSDB_NAMESPACE {
  26. static std::string PrintContents(WriteBatch* b,
  27. bool merge_operator_supported = true) {
  28. InternalKeyComparator cmp(BytewiseComparator());
  29. auto factory = std::make_shared<SkipListFactory>();
  30. Options options;
  31. options.memtable_factory = factory;
  32. if (merge_operator_supported) {
  33. options.merge_operator.reset(new TestPutOperator());
  34. }
  35. ImmutableOptions ioptions(options);
  36. WriteBufferManager wb(options.db_write_buffer_size);
  37. MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  38. kMaxSequenceNumber, 0 /* column_family_id */);
  39. mem->Ref();
  40. std::string state;
  41. ColumnFamilyMemTablesDefault cf_mems_default(mem);
  42. Status s =
  43. WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr, nullptr);
  44. uint32_t count = 0;
  45. int put_count = 0;
  46. int timed_put_count = 0;
  47. int delete_count = 0;
  48. int single_delete_count = 0;
  49. int delete_range_count = 0;
  50. int merge_count = 0;
  51. for (int i = 0; i < 2; ++i) {
  52. Arena arena;
  53. ScopedArenaPtr<InternalIterator> arena_iter_guard;
  54. std::unique_ptr<InternalIterator> iter_guard;
  55. InternalIterator* iter;
  56. if (i == 0) {
  57. iter = mem->NewIterator(ReadOptions(), /*seqno_to_time_mapping=*/nullptr,
  58. &arena, /*prefix_extractor=*/nullptr,
  59. /*for_flush=*/false);
  60. arena_iter_guard.reset(iter);
  61. } else {
  62. iter = mem->NewRangeTombstoneIterator(ReadOptions(),
  63. kMaxSequenceNumber /* read_seq */,
  64. false /* immutable_memtable */);
  65. iter_guard.reset(iter);
  66. }
  67. if (iter == nullptr) {
  68. continue;
  69. }
  70. EXPECT_OK(iter->status());
  71. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  72. ParsedInternalKey ikey;
  73. ikey.clear();
  74. EXPECT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */));
  75. switch (ikey.type) {
  76. case kTypeValue:
  77. state.append("Put(");
  78. state.append(ikey.user_key.ToString());
  79. state.append(", ");
  80. state.append(iter->value().ToString());
  81. state.append(")");
  82. count++;
  83. put_count++;
  84. break;
  85. case kTypeDeletion:
  86. state.append("Delete(");
  87. state.append(ikey.user_key.ToString());
  88. state.append(")");
  89. count++;
  90. delete_count++;
  91. break;
  92. case kTypeSingleDeletion:
  93. state.append("SingleDelete(");
  94. state.append(ikey.user_key.ToString());
  95. state.append(")");
  96. count++;
  97. single_delete_count++;
  98. break;
  99. case kTypeRangeDeletion:
  100. state.append("DeleteRange(");
  101. state.append(ikey.user_key.ToString());
  102. state.append(", ");
  103. state.append(iter->value().ToString());
  104. state.append(")");
  105. count++;
  106. delete_range_count++;
  107. break;
  108. case kTypeMerge:
  109. state.append("Merge(");
  110. state.append(ikey.user_key.ToString());
  111. state.append(", ");
  112. state.append(iter->value().ToString());
  113. state.append(")");
  114. count++;
  115. merge_count++;
  116. break;
  117. case kTypeValuePreferredSeqno: {
  118. state.append("TimedPut(");
  119. state.append(ikey.user_key.ToString());
  120. state.append(", ");
  121. auto [unpacked_value, unix_write_time] =
  122. ParsePackedValueWithWriteTime(iter->value());
  123. state.append(unpacked_value.ToString());
  124. state.append(", ");
  125. state.append(std::to_string(unix_write_time));
  126. state.append(")");
  127. count++;
  128. timed_put_count++;
  129. break;
  130. }
  131. default:
  132. assert(false);
  133. break;
  134. }
  135. state.append("@");
  136. state.append(std::to_string(ikey.sequence));
  137. }
  138. EXPECT_OK(iter->status());
  139. }
  140. if (s.ok()) {
  141. EXPECT_EQ(b->HasPut(), put_count > 0);
  142. EXPECT_EQ(b->HasTimedPut(), timed_put_count > 0);
  143. EXPECT_EQ(b->HasDelete(), delete_count > 0);
  144. EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0);
  145. EXPECT_EQ(b->HasDeleteRange(), delete_range_count > 0);
  146. EXPECT_EQ(b->HasMerge(), merge_count > 0);
  147. if (count != WriteBatchInternal::Count(b)) {
  148. state.append("CountMismatch()");
  149. }
  150. } else {
  151. state.append(s.ToString());
  152. }
  153. delete mem->Unref();
  154. return state;
  155. }
  156. class WriteBatchTest : public testing::Test {};
  157. TEST_F(WriteBatchTest, Empty) {
  158. WriteBatch batch;
  159. ASSERT_EQ("", PrintContents(&batch));
  160. ASSERT_EQ(0u, WriteBatchInternal::Count(&batch));
  161. ASSERT_EQ(0u, batch.Count());
  162. }
  163. TEST_F(WriteBatchTest, Multiple) {
  164. WriteBatch batch;
  165. ASSERT_OK(batch.Put(Slice("foo"), Slice("bar")));
  166. ASSERT_OK(batch.Delete(Slice("box")));
  167. ASSERT_OK(batch.DeleteRange(Slice("bar"), Slice("foo")));
  168. ASSERT_OK(batch.Put(Slice("baz"), Slice("boo")));
  169. WriteBatchInternal::SetSequence(&batch, 100);
  170. ASSERT_EQ(100U, WriteBatchInternal::Sequence(&batch));
  171. ASSERT_EQ(4u, WriteBatchInternal::Count(&batch));
  172. ASSERT_EQ(
  173. "Put(baz, boo)@103"
  174. "Delete(box)@101"
  175. "Put(foo, bar)@100"
  176. "DeleteRange(bar, foo)@102",
  177. PrintContents(&batch));
  178. ASSERT_EQ(4u, batch.Count());
  179. }
  180. TEST_F(WriteBatchTest, Corruption) {
  181. WriteBatch batch;
  182. ASSERT_OK(batch.Put(Slice("foo"), Slice("bar")));
  183. ASSERT_OK(batch.Delete(Slice("box")));
  184. WriteBatchInternal::SetSequence(&batch, 200);
  185. Slice contents = WriteBatchInternal::Contents(&batch);
  186. ASSERT_OK(WriteBatchInternal::SetContents(
  187. &batch, Slice(contents.data(), contents.size() - 1)));
  188. ASSERT_EQ(
  189. "Put(foo, bar)@200"
  190. "Corruption: bad WriteBatch Delete",
  191. PrintContents(&batch));
  192. }
  193. TEST_F(WriteBatchTest, Append) {
  194. WriteBatch b1, b2;
  195. WriteBatchInternal::SetSequence(&b1, 200);
  196. WriteBatchInternal::SetSequence(&b2, 300);
  197. ASSERT_OK(WriteBatchInternal::Append(&b1, &b2));
  198. ASSERT_EQ("", PrintContents(&b1));
  199. ASSERT_EQ(0u, b1.Count());
  200. ASSERT_OK(b2.Put("a", "va"));
  201. ASSERT_OK(WriteBatchInternal::Append(&b1, &b2));
  202. ASSERT_EQ("Put(a, va)@200", PrintContents(&b1));
  203. ASSERT_EQ(1u, b1.Count());
  204. b2.Clear();
  205. ASSERT_OK(b2.Put("b", "vb"));
  206. ASSERT_OK(WriteBatchInternal::Append(&b1, &b2));
  207. ASSERT_EQ(
  208. "Put(a, va)@200"
  209. "Put(b, vb)@201",
  210. PrintContents(&b1));
  211. ASSERT_EQ(2u, b1.Count());
  212. ASSERT_OK(b2.Delete("foo"));
  213. ASSERT_OK(WriteBatchInternal::Append(&b1, &b2));
  214. ASSERT_EQ(
  215. "Put(a, va)@200"
  216. "Put(b, vb)@202"
  217. "Put(b, vb)@201"
  218. "Delete(foo)@203",
  219. PrintContents(&b1));
  220. ASSERT_EQ(4u, b1.Count());
  221. b2.Clear();
  222. ASSERT_OK(b2.Put("c", "cc"));
  223. ASSERT_OK(b2.Put("d", "dd"));
  224. b2.MarkWalTerminationPoint();
  225. ASSERT_OK(b2.Put("e", "ee"));
  226. ASSERT_OK(WriteBatchInternal::Append(&b1, &b2, /*wal only*/ true));
  227. ASSERT_EQ(
  228. "Put(a, va)@200"
  229. "Put(b, vb)@202"
  230. "Put(b, vb)@201"
  231. "Put(c, cc)@204"
  232. "Put(d, dd)@205"
  233. "Delete(foo)@203",
  234. PrintContents(&b1));
  235. ASSERT_EQ(6u, b1.Count());
  236. ASSERT_EQ(
  237. "Put(c, cc)@0"
  238. "Put(d, dd)@1"
  239. "Put(e, ee)@2",
  240. PrintContents(&b2));
  241. ASSERT_EQ(3u, b2.Count());
  242. }
  243. TEST_F(WriteBatchTest, SingleDeletion) {
  244. WriteBatch batch;
  245. WriteBatchInternal::SetSequence(&batch, 100);
  246. ASSERT_EQ("", PrintContents(&batch));
  247. ASSERT_EQ(0u, batch.Count());
  248. ASSERT_OK(batch.Put("a", "va"));
  249. ASSERT_EQ("Put(a, va)@100", PrintContents(&batch));
  250. ASSERT_EQ(1u, batch.Count());
  251. ASSERT_OK(batch.SingleDelete("a"));
  252. ASSERT_EQ(
  253. "SingleDelete(a)@101"
  254. "Put(a, va)@100",
  255. PrintContents(&batch));
  256. ASSERT_EQ(2u, batch.Count());
  257. }
  258. TEST_F(WriteBatchTest, OwnershipTransfer) {
  259. Random rnd(301);
  260. WriteBatch put_batch;
  261. ASSERT_OK(put_batch.Put(rnd.RandomString(16) /* key */,
  262. rnd.RandomString(1024) /* value */));
  263. // (1) Verify `Release()` transfers string data ownership
  264. const char* expected_data = put_batch.Data().data();
  265. std::string batch_str = put_batch.Release();
  266. ASSERT_EQ(expected_data, batch_str.data());
  267. // (2) Verify constructor transfers string data ownership
  268. WriteBatch move_batch(std::move(batch_str));
  269. ASSERT_EQ(expected_data, move_batch.Data().data());
  270. }
  271. namespace {
  272. struct TestHandler : public WriteBatch::Handler {
  273. std::string seen;
  274. Status PutCF(uint32_t column_family_id, const Slice& key,
  275. const Slice& value) override {
  276. if (column_family_id == 0) {
  277. seen += "Put(" + key.ToString() + ", " + value.ToString() + ")";
  278. } else {
  279. seen += "PutCF(" + std::to_string(column_family_id) + ", " +
  280. key.ToString() + ", " + value.ToString() + ")";
  281. }
  282. return Status::OK();
  283. }
  284. Status TimedPutCF(uint32_t column_family_id, const Slice& key,
  285. const Slice& value, uint64_t unix_write_time) override {
  286. if (column_family_id == 0) {
  287. seen += "TimedPut(" + key.ToString() + ", " + value.ToString() + ", " +
  288. std::to_string(unix_write_time) + ")";
  289. } else {
  290. seen += "TimedPutCF(" + std::to_string(column_family_id) + ", " +
  291. key.ToString() + ", " + value.ToString() + ", " +
  292. std::to_string(unix_write_time) + ")";
  293. }
  294. return Status::OK();
  295. }
  296. Status PutEntityCF(uint32_t column_family_id, const Slice& key,
  297. const Slice& entity) override {
  298. std::ostringstream oss;
  299. Status s = WideColumnsHelper::DumpSliceAsWideColumns(entity, oss, false);
  300. if (!s.ok()) {
  301. return s;
  302. }
  303. if (column_family_id == 0) {
  304. seen += "PutEntity(" + key.ToString() + ", " + oss.str() + ")";
  305. } else {
  306. seen += "PutEntityCF(" + std::to_string(column_family_id) + ", " +
  307. key.ToString() + ", " + oss.str() + ")";
  308. }
  309. return Status::OK();
  310. }
  311. Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
  312. if (column_family_id == 0) {
  313. seen += "Delete(" + key.ToString() + ")";
  314. } else {
  315. seen += "DeleteCF(" + std::to_string(column_family_id) + ", " +
  316. key.ToString() + ")";
  317. }
  318. return Status::OK();
  319. }
  320. Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
  321. if (column_family_id == 0) {
  322. seen += "SingleDelete(" + key.ToString() + ")";
  323. } else {
  324. seen += "SingleDeleteCF(" + std::to_string(column_family_id) + ", " +
  325. key.ToString() + ")";
  326. }
  327. return Status::OK();
  328. }
  329. Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
  330. const Slice& end_key) override {
  331. if (column_family_id == 0) {
  332. seen += "DeleteRange(" + begin_key.ToString() + ", " +
  333. end_key.ToString() + ")";
  334. } else {
  335. seen += "DeleteRangeCF(" + std::to_string(column_family_id) + ", " +
  336. begin_key.ToString() + ", " + end_key.ToString() + ")";
  337. }
  338. return Status::OK();
  339. }
  340. Status MergeCF(uint32_t column_family_id, const Slice& key,
  341. const Slice& value) override {
  342. if (column_family_id == 0) {
  343. seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")";
  344. } else {
  345. seen += "MergeCF(" + std::to_string(column_family_id) + ", " +
  346. key.ToString() + ", " + value.ToString() + ")";
  347. }
  348. return Status::OK();
  349. }
  350. void LogData(const Slice& blob) override {
  351. seen += "LogData(" + blob.ToString() + ")";
  352. }
  353. Status MarkBeginPrepare(bool unprepare) override {
  354. seen +=
  355. "MarkBeginPrepare(" + std::string(unprepare ? "true" : "false") + ")";
  356. return Status::OK();
  357. }
  358. Status MarkEndPrepare(const Slice& xid) override {
  359. seen += "MarkEndPrepare(" + xid.ToString() + ")";
  360. return Status::OK();
  361. }
  362. Status MarkNoop(bool empty_batch) override {
  363. seen += "MarkNoop(" + std::string(empty_batch ? "true" : "false") + ")";
  364. return Status::OK();
  365. }
  366. Status MarkCommit(const Slice& xid) override {
  367. seen += "MarkCommit(" + xid.ToString() + ")";
  368. return Status::OK();
  369. }
  370. Status MarkCommitWithTimestamp(const Slice& xid, const Slice& ts) override {
  371. seen += "MarkCommitWithTimestamp(" + xid.ToString() + ", " +
  372. ts.ToString(true) + ")";
  373. return Status::OK();
  374. }
  375. Status MarkRollback(const Slice& xid) override {
  376. seen += "MarkRollback(" + xid.ToString() + ")";
  377. return Status::OK();
  378. }
  379. };
  380. } // anonymous namespace
  381. TEST_F(WriteBatchTest, PutNotImplemented) {
  382. WriteBatch batch;
  383. ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));
  384. ASSERT_EQ(1u, batch.Count());
  385. ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch));
  386. WriteBatch::Handler handler;
  387. ASSERT_OK(batch.Iterate(&handler));
  388. }
  389. TEST_F(WriteBatchTest, TimedPutNotImplemented) {
  390. WriteBatch batch;
  391. ASSERT_OK(
  392. batch.TimedPut(0, Slice("k1"), Slice("v1"), /*write_unix_time=*/30));
  393. ASSERT_EQ(1u, batch.Count());
  394. ASSERT_EQ("TimedPut(k1, v1, 30)@0", PrintContents(&batch));
  395. WriteBatch::Handler handler;
  396. ASSERT_TRUE(batch.Iterate(&handler).IsInvalidArgument());
  397. batch.Clear();
  398. ASSERT_OK(
  399. batch.TimedPut(0, Slice("k1"), Slice("v1"),
  400. /*write_unix_time=*/std::numeric_limits<uint64_t>::max()));
  401. ASSERT_EQ(1u, batch.Count());
  402. ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch));
  403. }
  404. TEST_F(WriteBatchTest, DeleteNotImplemented) {
  405. WriteBatch batch;
  406. ASSERT_OK(batch.Delete(Slice("k2")));
  407. ASSERT_EQ(1u, batch.Count());
  408. ASSERT_EQ("Delete(k2)@0", PrintContents(&batch));
  409. WriteBatch::Handler handler;
  410. ASSERT_OK(batch.Iterate(&handler));
  411. }
  412. TEST_F(WriteBatchTest, SingleDeleteNotImplemented) {
  413. WriteBatch batch;
  414. ASSERT_OK(batch.SingleDelete(Slice("k2")));
  415. ASSERT_EQ(1u, batch.Count());
  416. ASSERT_EQ("SingleDelete(k2)@0", PrintContents(&batch));
  417. WriteBatch::Handler handler;
  418. ASSERT_OK(batch.Iterate(&handler));
  419. }
  420. TEST_F(WriteBatchTest, MergeNotImplemented) {
  421. WriteBatch batch;
  422. ASSERT_OK(batch.Merge(Slice("foo"), Slice("bar")));
  423. ASSERT_EQ(1u, batch.Count());
  424. ASSERT_EQ("Merge(foo, bar)@0", PrintContents(&batch));
  425. WriteBatch::Handler handler;
  426. ASSERT_OK(batch.Iterate(&handler));
  427. }
  428. TEST_F(WriteBatchTest, MergeWithoutOperatorInsertionFailure) {
  429. WriteBatch batch;
  430. ASSERT_OK(batch.Merge(Slice("foo"), Slice("bar")));
  431. ASSERT_EQ(1u, batch.Count());
  432. ASSERT_EQ(
  433. "Invalid argument: Merge requires `ColumnFamilyOptions::merge_operator "
  434. "!= nullptr`",
  435. PrintContents(&batch, false /* merge_operator_supported */));
  436. }
  437. TEST_F(WriteBatchTest, Blob) {
  438. WriteBatch batch;
  439. ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));
  440. ASSERT_OK(batch.Put(Slice("k2"), Slice("v2")));
  441. ASSERT_OK(batch.Put(Slice("k3"), Slice("v3")));
  442. ASSERT_OK(batch.PutLogData(Slice("blob1")));
  443. ASSERT_OK(batch.Delete(Slice("k2")));
  444. ASSERT_OK(batch.SingleDelete(Slice("k3")));
  445. ASSERT_OK(batch.PutLogData(Slice("blob2")));
  446. ASSERT_OK(batch.Merge(Slice("foo"), Slice("bar")));
  447. ASSERT_EQ(6u, batch.Count());
  448. ASSERT_EQ(
  449. "Merge(foo, bar)@5"
  450. "Put(k1, v1)@0"
  451. "Delete(k2)@3"
  452. "Put(k2, v2)@1"
  453. "SingleDelete(k3)@4"
  454. "Put(k3, v3)@2",
  455. PrintContents(&batch));
  456. TestHandler handler;
  457. ASSERT_OK(batch.Iterate(&handler));
  458. ASSERT_EQ(
  459. "Put(k1, v1)"
  460. "Put(k2, v2)"
  461. "Put(k3, v3)"
  462. "LogData(blob1)"
  463. "Delete(k2)"
  464. "SingleDelete(k3)"
  465. "LogData(blob2)"
  466. "Merge(foo, bar)",
  467. handler.seen);
  468. }
  469. TEST_F(WriteBatchTest, PrepareCommit) {
  470. WriteBatch batch;
  471. ASSERT_OK(WriteBatchInternal::InsertNoop(&batch));
  472. ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));
  473. ASSERT_OK(batch.Put(Slice("k2"), Slice("v2")));
  474. batch.SetSavePoint();
  475. ASSERT_OK(WriteBatchInternal::MarkEndPrepare(&batch, Slice("xid1")));
  476. Status s = batch.RollbackToSavePoint();
  477. ASSERT_EQ(s, Status::NotFound());
  478. ASSERT_OK(WriteBatchInternal::MarkCommit(&batch, Slice("xid1")));
  479. ASSERT_OK(WriteBatchInternal::MarkRollback(&batch, Slice("xid1")));
  480. ASSERT_EQ(2u, batch.Count());
  481. TestHandler handler;
  482. ASSERT_OK(batch.Iterate(&handler));
  483. ASSERT_EQ(
  484. "MarkBeginPrepare(false)"
  485. "Put(k1, v1)"
  486. "Put(k2, v2)"
  487. "MarkEndPrepare(xid1)"
  488. "MarkCommit(xid1)"
  489. "MarkRollback(xid1)",
  490. handler.seen);
  491. }
  492. // It requires more than 30GB of memory to run the test. With single memory
  493. // allocation of more than 30GB.
  494. // Not all platform can run it. Also it runs a long time. So disable it.
  495. TEST_F(WriteBatchTest, DISABLED_ManyUpdates) {
  496. // Insert key and value of 3GB and push total batch size to 12GB.
  497. static const size_t kKeyValueSize = 4u;
  498. static const uint32_t kNumUpdates = uint32_t{3} << 30;
  499. std::string raw(kKeyValueSize, 'A');
  500. WriteBatch batch(kNumUpdates * (4 + kKeyValueSize * 2) + 1024u);
  501. char c = 'A';
  502. for (uint32_t i = 0; i < kNumUpdates; i++) {
  503. if (c > 'Z') {
  504. c = 'A';
  505. }
  506. raw[0] = c;
  507. raw[raw.length() - 1] = c;
  508. c++;
  509. ASSERT_OK(batch.Put(raw, raw));
  510. }
  511. ASSERT_EQ(kNumUpdates, batch.Count());
  512. struct NoopHandler : public WriteBatch::Handler {
  513. uint32_t num_seen = 0;
  514. char expected_char = 'A';
  515. Status PutCF(uint32_t /*column_family_id*/, const Slice& key,
  516. const Slice& value) override {
  517. EXPECT_EQ(kKeyValueSize, key.size());
  518. EXPECT_EQ(kKeyValueSize, value.size());
  519. EXPECT_EQ(expected_char, key[0]);
  520. EXPECT_EQ(expected_char, value[0]);
  521. EXPECT_EQ(expected_char, key[kKeyValueSize - 1]);
  522. EXPECT_EQ(expected_char, value[kKeyValueSize - 1]);
  523. expected_char++;
  524. if (expected_char > 'Z') {
  525. expected_char = 'A';
  526. }
  527. ++num_seen;
  528. return Status::OK();
  529. }
  530. Status DeleteCF(uint32_t /*column_family_id*/,
  531. const Slice& /*key*/) override {
  532. ADD_FAILURE();
  533. return Status::OK();
  534. }
  535. Status SingleDeleteCF(uint32_t /*column_family_id*/,
  536. const Slice& /*key*/) override {
  537. ADD_FAILURE();
  538. return Status::OK();
  539. }
  540. Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
  541. const Slice& /*value*/) override {
  542. ADD_FAILURE();
  543. return Status::OK();
  544. }
  545. void LogData(const Slice& /*blob*/) override { ADD_FAILURE(); }
  546. bool Continue() override { return num_seen < kNumUpdates; }
  547. } handler;
  548. ASSERT_OK(batch.Iterate(&handler));
  549. ASSERT_EQ(kNumUpdates, handler.num_seen);
  550. }
  551. // The test requires more than 18GB memory to run it, with single memory
  552. // allocation of more than 12GB. Not all the platform can run it. So disable it.
  553. TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) {
  554. // Insert key and value of 3GB and push total batch size to 12GB.
  555. static const size_t kKeyValueSize = 3221225472u;
  556. std::string raw(kKeyValueSize, 'A');
  557. WriteBatch batch(size_t(12884901888ull + 1024u));
  558. for (char i = 0; i < 2; i++) {
  559. raw[0] = 'A' + i;
  560. raw[raw.length() - 1] = 'A' - i;
  561. ASSERT_OK(batch.Put(raw, raw));
  562. }
  563. ASSERT_EQ(2u, batch.Count());
  564. struct NoopHandler : public WriteBatch::Handler {
  565. int num_seen = 0;
  566. Status PutCF(uint32_t /*column_family_id*/, const Slice& key,
  567. const Slice& value) override {
  568. EXPECT_EQ(kKeyValueSize, key.size());
  569. EXPECT_EQ(kKeyValueSize, value.size());
  570. EXPECT_EQ('A' + num_seen, key[0]);
  571. EXPECT_EQ('A' + num_seen, value[0]);
  572. EXPECT_EQ('A' - num_seen, key[kKeyValueSize - 1]);
  573. EXPECT_EQ('A' - num_seen, value[kKeyValueSize - 1]);
  574. ++num_seen;
  575. return Status::OK();
  576. }
  577. Status DeleteCF(uint32_t /*column_family_id*/,
  578. const Slice& /*key*/) override {
  579. ADD_FAILURE();
  580. return Status::OK();
  581. }
  582. Status SingleDeleteCF(uint32_t /*column_family_id*/,
  583. const Slice& /*key*/) override {
  584. ADD_FAILURE();
  585. return Status::OK();
  586. }
  587. Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
  588. const Slice& /*value*/) override {
  589. ADD_FAILURE();
  590. return Status::OK();
  591. }
  592. void LogData(const Slice& /*blob*/) override { ADD_FAILURE(); }
  593. bool Continue() override { return num_seen < 2; }
  594. } handler;
  595. ASSERT_OK(batch.Iterate(&handler));
  596. ASSERT_EQ(2, handler.num_seen);
  597. }
  598. TEST_F(WriteBatchTest, Continue) {
  599. WriteBatch batch;
  600. struct Handler : public TestHandler {
  601. int num_seen = 0;
  602. Status PutCF(uint32_t column_family_id, const Slice& key,
  603. const Slice& value) override {
  604. ++num_seen;
  605. return TestHandler::PutCF(column_family_id, key, value);
  606. }
  607. Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
  608. ++num_seen;
  609. return TestHandler::DeleteCF(column_family_id, key);
  610. }
  611. Status SingleDeleteCF(uint32_t column_family_id,
  612. const Slice& key) override {
  613. ++num_seen;
  614. return TestHandler::SingleDeleteCF(column_family_id, key);
  615. }
  616. Status MergeCF(uint32_t column_family_id, const Slice& key,
  617. const Slice& value) override {
  618. ++num_seen;
  619. return TestHandler::MergeCF(column_family_id, key, value);
  620. }
  621. void LogData(const Slice& blob) override {
  622. ++num_seen;
  623. TestHandler::LogData(blob);
  624. }
  625. bool Continue() override { return num_seen < 5; }
  626. } handler;
  627. ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));
  628. ASSERT_OK(batch.Put(Slice("k2"), Slice("v2")));
  629. ASSERT_OK(batch.PutLogData(Slice("blob1")));
  630. ASSERT_OK(batch.Delete(Slice("k1")));
  631. ASSERT_OK(batch.SingleDelete(Slice("k2")));
  632. ASSERT_OK(batch.PutLogData(Slice("blob2")));
  633. ASSERT_OK(batch.Merge(Slice("foo"), Slice("bar")));
  634. ASSERT_OK(batch.Iterate(&handler));
  635. ASSERT_EQ(
  636. "Put(k1, v1)"
  637. "Put(k2, v2)"
  638. "LogData(blob1)"
  639. "Delete(k1)"
  640. "SingleDelete(k2)",
  641. handler.seen);
  642. }
  643. TEST_F(WriteBatchTest, PutGatherSlices) {
  644. WriteBatch batch;
  645. ASSERT_OK(batch.Put(Slice("foo"), Slice("bar")));
  646. {
  647. // Try a write where the key is one slice but the value is two
  648. Slice key_slice("baz");
  649. Slice value_slices[2] = {Slice("header"), Slice("payload")};
  650. ASSERT_OK(
  651. batch.Put(SliceParts(&key_slice, 1), SliceParts(value_slices, 2)));
  652. }
  653. {
  654. // One where the key is composite but the value is a single slice
  655. Slice key_slices[3] = {Slice("key"), Slice("part2"), Slice("part3")};
  656. Slice value_slice("value");
  657. ASSERT_OK(
  658. batch.Put(SliceParts(key_slices, 3), SliceParts(&value_slice, 1)));
  659. }
  660. WriteBatchInternal::SetSequence(&batch, 100);
  661. ASSERT_EQ(
  662. "Put(baz, headerpayload)@101"
  663. "Put(foo, bar)@100"
  664. "Put(keypart2part3, value)@102",
  665. PrintContents(&batch));
  666. ASSERT_EQ(3u, batch.Count());
  667. }
  668. namespace {
  669. class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
  670. public:
  671. explicit ColumnFamilyHandleImplDummy(int id)
  672. : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
  673. explicit ColumnFamilyHandleImplDummy(int id, const Comparator* ucmp)
  674. : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr),
  675. id_(id),
  676. ucmp_(ucmp) {}
  677. uint32_t GetID() const override { return id_; }
  678. const Comparator* GetComparator() const override { return ucmp_; }
  679. private:
  680. uint32_t id_;
  681. const Comparator* const ucmp_ = BytewiseComparator();
  682. };
  683. } // anonymous namespace
  684. TEST_F(WriteBatchTest, AttributeGroupTest) {
  685. WriteBatch batch;
  686. ColumnFamilyHandleImplDummy zero(0), two(2);
  687. AttributeGroups foo_ags;
  688. WideColumn zero_col_1{"0_c_1_n", "0_c_1_v"};
  689. WideColumn zero_col_2{"0_c_2_n", "0_c_2_v"};
  690. WideColumns zero_col_1_col_2{zero_col_1, zero_col_2};
  691. WideColumn two_col_1{"2_c_1_n", "2_c_1_v"};
  692. WideColumn two_col_2{"2_c_2_n", "2_c_2_v"};
  693. WideColumns two_col_1_col_2{two_col_1, two_col_2};
  694. foo_ags.emplace_back(&zero, zero_col_1_col_2);
  695. foo_ags.emplace_back(&two, two_col_1_col_2);
  696. ASSERT_OK(batch.PutEntity("foo", foo_ags));
  697. TestHandler handler;
  698. ASSERT_OK(batch.Iterate(&handler));
  699. ASSERT_EQ(
  700. "PutEntity(foo, 0_c_1_n:0_c_1_v "
  701. "0_c_2_n:0_c_2_v)"
  702. "PutEntityCF(2, foo, 2_c_1_n:2_c_1_v "
  703. "2_c_2_n:2_c_2_v)",
  704. handler.seen);
  705. }
  706. TEST_F(WriteBatchTest, AttributeGroupSavePointTest) {
  707. WriteBatch batch;
  708. batch.SetSavePoint();
  709. ColumnFamilyHandleImplDummy zero(0), two(2), three(3);
  710. AttributeGroups foo_ags;
  711. WideColumn zero_col_1{"0_c_1_n", "0_c_1_v"};
  712. WideColumn zero_col_2{"0_c_2_n", "0_c_2_v"};
  713. WideColumns zero_col_1_col_2{zero_col_1, zero_col_2};
  714. WideColumn two_col_1{"2_c_1_n", "2_c_1_v"};
  715. WideColumn two_col_2{"2_c_2_n", "2_c_2_v"};
  716. WideColumns two_col_1_col_2{two_col_1, two_col_2};
  717. foo_ags.emplace_back(&zero, zero_col_1_col_2);
  718. foo_ags.emplace_back(&two, two_col_1_col_2);
  719. AttributeGroups bar_ags;
  720. WideColumn three_col_1{"3_c_1_n", "3_c_1_v"};
  721. WideColumn three_col_2{"3_c_2_n", "3_c_2_v"};
  722. WideColumns three_col_1_col_2{three_col_1, three_col_2};
  723. bar_ags.emplace_back(&zero, zero_col_1_col_2);
  724. bar_ags.emplace_back(&three, three_col_1_col_2);
  725. ASSERT_OK(batch.PutEntity("foo", foo_ags));
  726. batch.SetSavePoint();
  727. ASSERT_OK(batch.PutEntity("bar", bar_ags));
  728. TestHandler handler;
  729. ASSERT_OK(batch.Iterate(&handler));
  730. ASSERT_EQ(
  731. "PutEntity(foo, 0_c_1_n:0_c_1_v 0_c_2_n:0_c_2_v)"
  732. "PutEntityCF(2, foo, 2_c_1_n:2_c_1_v 2_c_2_n:2_c_2_v)"
  733. "PutEntity(bar, 0_c_1_n:0_c_1_v 0_c_2_n:0_c_2_v)"
  734. "PutEntityCF(3, bar, 3_c_1_n:3_c_1_v 3_c_2_n:3_c_2_v)",
  735. handler.seen);
  736. ASSERT_OK(batch.RollbackToSavePoint());
  737. handler.seen.clear();
  738. ASSERT_OK(batch.Iterate(&handler));
  739. ASSERT_EQ(
  740. "PutEntity(foo, 0_c_1_n:0_c_1_v 0_c_2_n:0_c_2_v)"
  741. "PutEntityCF(2, foo, 2_c_1_n:2_c_1_v 2_c_2_n:2_c_2_v)",
  742. handler.seen);
  743. }
  744. TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
  745. WriteBatch batch;
  746. ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
  747. ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar")));
  748. ASSERT_OK(batch.Put(&two, Slice("twofoo"), Slice("bar2")));
  749. ASSERT_OK(batch.Put(&eight, Slice("eightfoo"), Slice("bar8")));
  750. ASSERT_OK(batch.Delete(&eight, Slice("eightfoo")));
  751. ASSERT_OK(batch.SingleDelete(&two, Slice("twofoo")));
  752. ASSERT_OK(batch.DeleteRange(&two, Slice("3foo"), Slice("4foo")));
  753. ASSERT_OK(batch.Merge(&three, Slice("threethree"), Slice("3three")));
  754. ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar")));
  755. ASSERT_OK(batch.Merge(Slice("omom"), Slice("nom")));
  756. ASSERT_OK(batch.TimedPut(&zero, Slice("foo"), Slice("bar"),
  757. /*write_unix_time*/ 0u));
  758. TestHandler handler;
  759. ASSERT_OK(batch.Iterate(&handler));
  760. ASSERT_EQ(
  761. "Put(foo, bar)"
  762. "PutCF(2, twofoo, bar2)"
  763. "PutCF(8, eightfoo, bar8)"
  764. "DeleteCF(8, eightfoo)"
  765. "SingleDeleteCF(2, twofoo)"
  766. "DeleteRangeCF(2, 3foo, 4foo)"
  767. "MergeCF(3, threethree, 3three)"
  768. "Put(foo, bar)"
  769. "Merge(omom, nom)"
  770. "TimedPut(foo, bar, 0)",
  771. handler.seen);
  772. }
  773. TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
  774. WriteBatchWithIndex batch;
  775. ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
  776. ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar")));
  777. ASSERT_OK(batch.Put(&two, Slice("twofoo"), Slice("bar2")));
  778. ASSERT_OK(batch.Put(&eight, Slice("eightfoo"), Slice("bar8")));
  779. ASSERT_OK(batch.Delete(&eight, Slice("eightfoo")));
  780. ASSERT_OK(batch.SingleDelete(&two, Slice("twofoo")));
  781. ASSERT_OK(batch.Merge(&three, Slice("threethree"), Slice("3three")));
  782. ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar")));
  783. ASSERT_OK(batch.Merge(Slice("omom"), Slice("nom")));
  784. ASSERT_TRUE(
  785. batch.TimedPut(&zero, Slice("foo"), Slice("bar"), 0u).IsNotSupported());
  786. std::unique_ptr<WBWIIterator> iter;
  787. iter.reset(batch.NewIterator(&eight));
  788. iter->Seek("eightfoo");
  789. ASSERT_OK(iter->status());
  790. ASSERT_TRUE(iter->Valid());
  791. // For the same key, most recent update is ordered first.
  792. ASSERT_EQ(WriteType::kDeleteRecord, iter->Entry().type);
  793. ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
  794. iter->Next();
  795. ASSERT_OK(iter->status());
  796. ASSERT_TRUE(iter->Valid());
  797. ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
  798. ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
  799. ASSERT_EQ("bar8", iter->Entry().value.ToString());
  800. iter->Next();
  801. ASSERT_OK(iter->status());
  802. ASSERT_TRUE(!iter->Valid());
  803. iter.reset(batch.NewIterator(&two));
  804. iter->Seek("twofoo");
  805. ASSERT_OK(iter->status());
  806. ASSERT_TRUE(iter->Valid());
  807. ASSERT_EQ(WriteType::kSingleDeleteRecord, iter->Entry().type);
  808. ASSERT_EQ("twofoo", iter->Entry().key.ToString());
  809. iter->Next();
  810. ASSERT_OK(iter->status());
  811. ASSERT_TRUE(iter->Valid());
  812. ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
  813. ASSERT_EQ("twofoo", iter->Entry().key.ToString());
  814. ASSERT_EQ("bar2", iter->Entry().value.ToString());
  815. iter->Next();
  816. ASSERT_OK(iter->status());
  817. ASSERT_TRUE(!iter->Valid());
  818. iter.reset(batch.NewIterator());
  819. iter->Seek("gggg");
  820. ASSERT_OK(iter->status());
  821. ASSERT_TRUE(iter->Valid());
  822. ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
  823. ASSERT_EQ("omom", iter->Entry().key.ToString());
  824. ASSERT_EQ("nom", iter->Entry().value.ToString());
  825. iter->Next();
  826. ASSERT_OK(iter->status());
  827. ASSERT_TRUE(!iter->Valid());
  828. iter.reset(batch.NewIterator(&zero));
  829. iter->Seek("foo");
  830. ASSERT_OK(iter->status());
  831. ASSERT_TRUE(iter->Valid());
  832. ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
  833. ASSERT_EQ("foo", iter->Entry().key.ToString());
  834. ASSERT_EQ("bar", iter->Entry().value.ToString());
  835. iter->Next();
  836. ASSERT_OK(iter->status());
  837. ASSERT_TRUE(iter->Valid());
  838. ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
  839. ASSERT_EQ("foo", iter->Entry().key.ToString());
  840. ASSERT_EQ("bar", iter->Entry().value.ToString());
  841. iter->Next();
  842. ASSERT_OK(iter->status());
  843. ASSERT_TRUE(iter->Valid());
  844. ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
  845. ASSERT_EQ("omom", iter->Entry().key.ToString());
  846. ASSERT_EQ("nom", iter->Entry().value.ToString());
  847. iter->Next();
  848. ASSERT_OK(iter->status());
  849. ASSERT_TRUE(!iter->Valid());
  850. TestHandler handler;
  851. ASSERT_OK(batch.GetWriteBatch()->Iterate(&handler));
  852. ASSERT_EQ(
  853. "Put(foo, bar)"
  854. "PutCF(2, twofoo, bar2)"
  855. "PutCF(8, eightfoo, bar8)"
  856. "DeleteCF(8, eightfoo)"
  857. "SingleDeleteCF(2, twofoo)"
  858. "MergeCF(3, threethree, 3three)"
  859. "Put(foo, bar)"
  860. "Merge(omom, nom)",
  861. handler.seen);
  862. }
  863. TEST_F(WriteBatchTest, SavePointTest) {
  864. Status s;
  865. WriteBatch batch;
  866. batch.SetSavePoint();
  867. ASSERT_OK(batch.Put("A", "a"));
  868. ASSERT_OK(batch.Put("B", "b"));
  869. batch.SetSavePoint();
  870. ASSERT_OK(batch.Put("C", "c"));
  871. ASSERT_OK(batch.Delete("A"));
  872. batch.SetSavePoint();
  873. batch.SetSavePoint();
  874. ASSERT_OK(batch.RollbackToSavePoint());
  875. ASSERT_EQ(
  876. "Delete(A)@3"
  877. "Put(A, a)@0"
  878. "Put(B, b)@1"
  879. "Put(C, c)@2",
  880. PrintContents(&batch));
  881. ASSERT_OK(batch.RollbackToSavePoint());
  882. ASSERT_OK(batch.RollbackToSavePoint());
  883. ASSERT_EQ(
  884. "Put(A, a)@0"
  885. "Put(B, b)@1",
  886. PrintContents(&batch));
  887. ASSERT_OK(batch.Delete("A"));
  888. ASSERT_OK(batch.Put("B", "bb"));
  889. ASSERT_OK(batch.RollbackToSavePoint());
  890. ASSERT_EQ("", PrintContents(&batch));
  891. s = batch.RollbackToSavePoint();
  892. ASSERT_TRUE(s.IsNotFound());
  893. ASSERT_EQ("", PrintContents(&batch));
  894. ASSERT_OK(batch.Put("D", "d"));
  895. ASSERT_OK(batch.Delete("A"));
  896. batch.SetSavePoint();
  897. ASSERT_OK(batch.Put("A", "aaa"));
  898. ASSERT_OK(batch.RollbackToSavePoint());
  899. ASSERT_EQ(
  900. "Delete(A)@1"
  901. "Put(D, d)@0",
  902. PrintContents(&batch));
  903. batch.SetSavePoint();
  904. ASSERT_OK(batch.Put("D", "d"));
  905. ASSERT_OK(batch.Delete("A"));
  906. ASSERT_OK(batch.RollbackToSavePoint());
  907. ASSERT_EQ(
  908. "Delete(A)@1"
  909. "Put(D, d)@0",
  910. PrintContents(&batch));
  911. s = batch.RollbackToSavePoint();
  912. ASSERT_TRUE(s.IsNotFound());
  913. ASSERT_EQ(
  914. "Delete(A)@1"
  915. "Put(D, d)@0",
  916. PrintContents(&batch));
  917. WriteBatch batch2;
  918. s = batch2.RollbackToSavePoint();
  919. ASSERT_TRUE(s.IsNotFound());
  920. ASSERT_EQ("", PrintContents(&batch2));
  921. ASSERT_OK(batch2.Delete("A"));
  922. batch2.SetSavePoint();
  923. s = batch2.RollbackToSavePoint();
  924. ASSERT_OK(s);
  925. ASSERT_EQ("Delete(A)@0", PrintContents(&batch2));
  926. batch2.Clear();
  927. ASSERT_EQ("", PrintContents(&batch2));
  928. batch2.SetSavePoint();
  929. ASSERT_OK(batch2.Delete("B"));
  930. ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
  931. batch2.SetSavePoint();
  932. s = batch2.RollbackToSavePoint();
  933. ASSERT_OK(s);
  934. ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
  935. s = batch2.RollbackToSavePoint();
  936. ASSERT_OK(s);
  937. ASSERT_EQ("", PrintContents(&batch2));
  938. s = batch2.RollbackToSavePoint();
  939. ASSERT_TRUE(s.IsNotFound());
  940. ASSERT_EQ("", PrintContents(&batch2));
  941. WriteBatch batch3;
  942. s = batch3.PopSavePoint();
  943. ASSERT_TRUE(s.IsNotFound());
  944. ASSERT_EQ("", PrintContents(&batch3));
  945. batch3.SetSavePoint();
  946. ASSERT_OK(batch3.Delete("A"));
  947. s = batch3.PopSavePoint();
  948. ASSERT_OK(s);
  949. ASSERT_EQ("Delete(A)@0", PrintContents(&batch3));
  950. }
  951. TEST_F(WriteBatchTest, MemoryLimitTest) {
  952. Status s;
  953. // The header size is 12 bytes. The two Puts take 8 bytes which gives total
  954. // of 12 + 8 * 2 = 28 bytes.
  955. WriteBatch batch(0, 28);
  956. ASSERT_OK(batch.Put("a", "...."));
  957. ASSERT_OK(batch.Put("b", "...."));
  958. s = batch.Put("c", "....");
  959. ASSERT_TRUE(s.IsMemoryLimit());
  960. }
  961. namespace {
  962. class TimestampChecker : public WriteBatch::Handler {
  963. public:
  964. explicit TimestampChecker(
  965. std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps, Slice ts)
  966. : cf_to_ucmps_(std::move(cf_to_ucmps)), timestamp_(std::move(ts)) {}
  967. Status PutCF(uint32_t cf, const Slice& key, const Slice& /*value*/) override {
  968. auto cf_iter = cf_to_ucmps_.find(cf);
  969. if (cf_iter == cf_to_ucmps_.end()) {
  970. return Status::Corruption();
  971. }
  972. const Comparator* const ucmp = cf_iter->second;
  973. assert(ucmp);
  974. size_t ts_sz = ucmp->timestamp_size();
  975. if (ts_sz == 0) {
  976. return Status::OK();
  977. }
  978. if (key.size() < ts_sz) {
  979. return Status::Corruption();
  980. }
  981. Slice ts = ExtractTimestampFromUserKey(key, ts_sz);
  982. if (ts.compare(timestamp_) != 0) {
  983. return Status::Corruption();
  984. }
  985. return Status::OK();
  986. }
  987. private:
  988. std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps_;
  989. Slice timestamp_;
  990. };
  991. Status CheckTimestampsInWriteBatch(
  992. WriteBatch& wb, Slice timestamp,
  993. std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps) {
  994. TimestampChecker ts_checker(cf_to_ucmps, timestamp);
  995. return wb.Iterate(&ts_checker);
  996. }
  997. } // anonymous namespace
  998. TEST_F(WriteBatchTest, SanityChecks) {
  999. ColumnFamilyHandleImplDummy cf0(0,
  1000. test::BytewiseComparatorWithU64TsWrapper());
  1001. ColumnFamilyHandleImplDummy cf4(4);
  1002. WriteBatch wb(0, 0, 0, /*default_cf_ts_sz=*/sizeof(uint64_t));
  1003. // Sanity checks for the new WriteBatch APIs with extra 'ts' arg.
  1004. ASSERT_TRUE(wb.Put(nullptr, "key", "ts", "value").IsInvalidArgument());
  1005. ASSERT_TRUE(wb.Delete(nullptr, "key", "ts").IsInvalidArgument());
  1006. ASSERT_TRUE(wb.SingleDelete(nullptr, "key", "ts").IsInvalidArgument());
  1007. ASSERT_TRUE(wb.Merge(nullptr, "key", "ts", "value").IsInvalidArgument());
  1008. ASSERT_TRUE(wb.DeleteRange(nullptr, "begin_key", "end_key", "ts")
  1009. .IsInvalidArgument());
  1010. ASSERT_TRUE(wb.Put(&cf4, "key", "ts", "value").IsInvalidArgument());
  1011. ASSERT_TRUE(wb.Delete(&cf4, "key", "ts").IsInvalidArgument());
  1012. ASSERT_TRUE(wb.SingleDelete(&cf4, "key", "ts").IsInvalidArgument());
  1013. ASSERT_TRUE(wb.Merge(&cf4, "key", "ts", "value").IsInvalidArgument());
  1014. ASSERT_TRUE(
  1015. wb.DeleteRange(&cf4, "begin_key", "end_key", "ts").IsInvalidArgument());
  1016. constexpr size_t wrong_ts_sz = 1 + sizeof(uint64_t);
  1017. std::string ts(wrong_ts_sz, '\0');
  1018. ASSERT_TRUE(wb.Put(&cf0, "key", ts, "value").IsInvalidArgument());
  1019. ASSERT_TRUE(wb.Delete(&cf0, "key", ts).IsInvalidArgument());
  1020. ASSERT_TRUE(wb.SingleDelete(&cf0, "key", ts).IsInvalidArgument());
  1021. ASSERT_TRUE(wb.Merge(&cf0, "key", ts, "value").IsInvalidArgument());
  1022. ASSERT_TRUE(
  1023. wb.DeleteRange(&cf0, "begin_key", "end_key", ts).IsInvalidArgument());
  1024. // Sanity checks for the new WriteBatch APIs without extra 'ts' arg.
  1025. WriteBatch wb1(0, 0, 0, wrong_ts_sz);
  1026. ASSERT_TRUE(wb1.Put(&cf0, "key", "value").IsInvalidArgument());
  1027. ASSERT_TRUE(wb1.Delete(&cf0, "key").IsInvalidArgument());
  1028. ASSERT_TRUE(wb1.SingleDelete(&cf0, "key").IsInvalidArgument());
  1029. ASSERT_TRUE(wb1.Merge(&cf0, "key", "value").IsInvalidArgument());
  1030. ASSERT_TRUE(
  1031. wb1.DeleteRange(&cf0, "begin_key", "end_key").IsInvalidArgument());
  1032. }
  1033. TEST_F(WriteBatchTest, UpdateTimestamps) {
  1034. // We assume the last eight bytes of each key is reserved for timestamps.
  1035. // Therefore, we must make sure each key is longer than eight bytes.
  1036. constexpr size_t key_size = 16;
  1037. constexpr size_t num_of_keys = 10;
  1038. std::vector<std::string> key_strs(num_of_keys, std::string(key_size, '\0'));
  1039. ColumnFamilyHandleImplDummy cf0(0);
  1040. ColumnFamilyHandleImplDummy cf4(4,
  1041. test::BytewiseComparatorWithU64TsWrapper());
  1042. ColumnFamilyHandleImplDummy cf5(5,
  1043. test::BytewiseComparatorWithU64TsWrapper());
  1044. const std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps = {
  1045. {0, cf0.GetComparator()},
  1046. {4, cf4.GetComparator()},
  1047. {5, cf5.GetComparator()}};
  1048. static constexpr size_t timestamp_size = sizeof(uint64_t);
  1049. {
  1050. WriteBatch wb1, wb2, wb3, wb4, wb5, wb6, wb7;
  1051. ASSERT_OK(wb1.Put(&cf0, "key", "value"));
  1052. ASSERT_FALSE(WriteBatchInternal::HasKeyWithTimestamp(wb1));
  1053. ASSERT_OK(wb2.Put(&cf4, "key", "value"));
  1054. ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb2));
  1055. ASSERT_OK(wb3.Put(&cf4, "key", /*ts=*/std::string(timestamp_size, '\xfe'),
  1056. "value"));
  1057. ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb3));
  1058. ASSERT_OK(wb4.Delete(&cf4, "key",
  1059. /*ts=*/std::string(timestamp_size, '\xfe')));
  1060. ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb4));
  1061. ASSERT_OK(wb5.Delete(&cf4, "key"));
  1062. ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb5));
  1063. ASSERT_OK(wb6.SingleDelete(&cf4, "key"));
  1064. ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb6));
  1065. ASSERT_OK(wb7.SingleDelete(&cf4, "key",
  1066. /*ts=*/std::string(timestamp_size, '\xfe')));
  1067. ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb7));
  1068. }
  1069. WriteBatch batch;
  1070. // Write to the batch. We will assign timestamps later.
  1071. for (const auto& key_str : key_strs) {
  1072. ASSERT_OK(batch.Put(&cf0, key_str, "value"));
  1073. ASSERT_OK(batch.Put(&cf4, key_str, "value"));
  1074. ASSERT_OK(batch.Put(&cf5, key_str, "value"));
  1075. }
  1076. const auto checker1 = [](uint32_t cf) {
  1077. if (cf == 4 || cf == 5) {
  1078. return timestamp_size;
  1079. } else if (cf == 0) {
  1080. return static_cast<size_t>(0);
  1081. } else {
  1082. return std::numeric_limits<size_t>::max();
  1083. }
  1084. };
  1085. ASSERT_OK(
  1086. batch.UpdateTimestamps(std::string(timestamp_size, '\xfe'), checker1));
  1087. ASSERT_OK(CheckTimestampsInWriteBatch(
  1088. batch, std::string(timestamp_size, '\xfe'), cf_to_ucmps));
  1089. // We use indexed_cf_to_ucmps, non_indexed_cfs_with_ts and timestamp_size to
  1090. // simulate the case in which a transaction enables indexing for some writes
  1091. // while disables indexing for other writes. A transaction uses a
  1092. // WriteBatchWithIndex object to buffer writes (we consider Write-committed
  1093. // policy only). If indexing is enabled, then writes go through
  1094. // WriteBatchWithIndex API populating a WBWI internal data structure, i.e. a
  1095. // mapping from cf to user comparators. If indexing is disabled, a transaction
  1096. // writes directly to the underlying raw WriteBatch. We will need to track the
  1097. // comparator information for the column families to which un-indexed writes
  1098. // are performed. When calling UpdateTimestamp API of WriteBatch, we need
  1099. // indexed_cf_to_ucmps, non_indexed_cfs_with_ts, and timestamp_size to perform
  1100. // checking.
  1101. std::unordered_map<uint32_t, const Comparator*> indexed_cf_to_ucmps = {
  1102. {0, cf0.GetComparator()}, {4, cf4.GetComparator()}};
  1103. std::unordered_set<uint32_t> non_indexed_cfs_with_ts = {cf5.GetID()};
  1104. const auto checker2 = [&indexed_cf_to_ucmps,
  1105. &non_indexed_cfs_with_ts](uint32_t cf) {
  1106. if (non_indexed_cfs_with_ts.count(cf) > 0) {
  1107. return timestamp_size;
  1108. }
  1109. auto cf_iter = indexed_cf_to_ucmps.find(cf);
  1110. if (cf_iter == indexed_cf_to_ucmps.end()) {
  1111. assert(false);
  1112. return std::numeric_limits<size_t>::max();
  1113. }
  1114. const Comparator* const ucmp = cf_iter->second;
  1115. assert(ucmp);
  1116. return ucmp->timestamp_size();
  1117. };
  1118. ASSERT_OK(
  1119. batch.UpdateTimestamps(std::string(timestamp_size, '\xef'), checker2));
  1120. ASSERT_OK(CheckTimestampsInWriteBatch(
  1121. batch, std::string(timestamp_size, '\xef'), cf_to_ucmps));
  1122. }
  1123. TEST_F(WriteBatchTest, CommitWithTimestamp) {
  1124. WriteBatch wb;
  1125. const std::string txn_name = "xid1";
  1126. std::string ts;
  1127. constexpr uint64_t commit_ts = 23;
  1128. PutFixed64(&ts, commit_ts);
  1129. ASSERT_OK(WriteBatchInternal::MarkCommitWithTimestamp(&wb, txn_name, ts));
  1130. TestHandler handler;
  1131. ASSERT_OK(wb.Iterate(&handler));
  1132. ASSERT_EQ("MarkCommitWithTimestamp(" + txn_name + ", " +
  1133. Slice(ts).ToString(true) + ")",
  1134. handler.seen);
  1135. }
  1136. } // namespace ROCKSDB_NAMESPACE
  1137. int main(int argc, char** argv) {
  1138. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  1139. ::testing::InitGoogleTest(&argc, argv);
  1140. return RUN_ALL_TESTS();
  1141. }