// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeRangeDeletion varstring varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring
//    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
//    kTypeBeginPrepareXID
//    kTypeEndPrepareXID varstring
//    kTypeCommitXID varstring
//    kTypeCommitXIDAndTimestamp varstring varstring
//    kTypeRollbackXID varstring
//    kTypeBeginPersistedPrepareXID
//    kTypeBeginUnprepareXID
//    kTypeWideColumnEntity varstring varstring
//    kTypeColumnFamilyWideColumnEntity varint32 varstring varstring
//    kTypeNoop
// varstring :=
//    len: varint32
//    data: uint8[len]
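//
// Example (illustrative, assuming kTypeValue == 0x1 per dbformat.h): a batch
// holding a single default-CF Put("foo", "bar") at sequence 0 serializes as:
//    00 00 00 00 00 00 00 00   sequence: fixed64 (0)
//    01 00 00 00               count: fixed32 (1)
//    01                        record tag: kTypeValue
//    03 66 6f 6f               key varstring: len=3, "foo"
//    03 62 61 72               value varstring: len=3, "bar"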
#include "rocksdb/write_batch.h"

#include <algorithm>
#include <cstdint>
#include <limits>
#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/wide/wide_column_serialization.h"
#include "db/wide/wide_columns_helper.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics_impl.h"
#include "port/lang.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/system_clock.h"
#include "util/aligned_storage.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// anon namespace for file-local types
namespace {

enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
  HAS_PUT_ENTITY = 1 << 12,
  HAS_TIMED_PUT = 1 << 13,
};

struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status TimedPutCF(uint32_t, const Slice&, const Slice&, uint64_t) override {
    content_flags |= ContentFlags::HAS_TIMED_PUT;
    return Status::OK();
  }

  Status PutEntityCF(uint32_t /* column_family_id */, const Slice& /* key */,
                     const Slice& /* entity */) override {
    content_flags |= ContentFlags::HAS_PUT_ENTITY;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_BLOB_INDEX;
    return Status::OK();
  }

  Status MarkBeginPrepare(bool unprepare) override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    if (unprepare) {
      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
    }
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};

}  // anonymous namespace

struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
                       size_t protection_bytes_per_key,
                       size_t default_cf_ts_sz)
    : content_flags_(0),
      max_bytes_(max_bytes),
      default_cf_ts_sz_(default_cf_ts_sz),
      rep_() {
  // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per
  // entry.
  assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8);
  if (protection_bytes_per_key != 0) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
  }
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}
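
// Example: constructing a batch with per-key integrity protection enabled.
// Per the assert above, only 0 or 8 bytes per key are currently supported:
//
//   WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
//                    8 /* protection_bytes_per_key */,
//                    0 /* default_cf_ts_sz */);
//   assert(batch.GetProtectionBytesPerKey() == 8);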
WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED), max_bytes_(0), rep_(rep) {}

WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)) {}

WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      default_cf_ts_sz_(src.default_cf_ts_sz_),
      rep_(src.rep_) {
  if (src.save_points_ != nullptr) {
    save_points_.reset(new SavePoints());
    save_points_->stack = src.save_points_->stack;
  }
  if (src.prot_info_ != nullptr) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
    prot_info_->entries_ = src.prot_info_->entries_;
  }
}

WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      prot_info_(std::move(src.prot_info_)),
      default_cf_ts_sz_(src.default_cf_ts_sz_),
      rep_(std::move(src.rep_)) {}

WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}

WriteBatch::~WriteBatch() = default;

WriteBatch::Handler::~Handler() = default;

void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

bool WriteBatch::Handler::Continue() { return true; }

void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);
  content_flags_.store(0, std::memory_order_relaxed);
  if (save_points_ != nullptr) {
    while (!save_points_->stack.empty()) {
      save_points_->stack.pop();
    }
  }
  if (prot_info_ != nullptr) {
    prot_info_->entries_.clear();
  }
  wal_term_point_.clear();
  default_cf_ts_sz_ = 0;
}

uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }

uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    // Should we handle status here?
    Iterate(&classifier).PermitUncheckedError();
    rv = classifier.content_flags;
    // This method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment.
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}
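
// Example: content-flag queries trigger the deferred classification pass at
// most once. A batch rehydrated from a serialized rep starts out DEFERRED
// (`serialized_rep` below stands for any previously serialized contents):
//
//   WriteBatch batch(serialized_rep);  // content_flags_ == DEFERRED
//   bool has_put = batch.HasPut();     // first query iterates the batch
//   bool has_del = batch.HasDelete();  // served from the cached flags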
void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}

size_t WriteBatch::GetProtectionBytesPerKey() const {
  if (prot_info_ != nullptr) {
    return prot_info_->GetBytesPerKey();
  }
  return 0;
}

std::string WriteBatch::Release() {
  std::string ret = std::move(rep_);
  Clear();
  return ret;
}

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasTimedPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_TIMED_PUT) != 0;
}

bool WriteBatch::HasPutEntity() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT_ENTITY) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}

bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // Skip tag byte
  input->remove_prefix(1);
  if (cf_record) {
    // Skip column_family bytes
    uint32_t cf;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }
  // Extract key
  return GetLengthPrefixedSlice(input, key);
}

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}

Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid,
                                uint64_t* write_unix_time) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeColumnFamilyBlobIndex:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
    // This indicates that the prepared batch is also persisted in the db.
    // This is used in WritePreparedTxn
    case kTypeBeginPersistedPrepareXID:
    // This is used in WriteUnpreparedTxn
    case kTypeBeginUnprepareXID:
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXIDAndTimestamp:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad commit timestamp");
      }
      FALLTHROUGH_INTENDED;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    case kTypeColumnFamilyWideColumnEntity:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch PutEntity");
      }
      FALLTHROUGH_INTENDED;
    case kTypeWideColumnEntity:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch PutEntity");
      }
      break;
    case kTypeColumnFamilyValuePreferredSeqno:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch TimedPut");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValuePreferredSeqno: {
      Slice packed_value;
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, &packed_value)) {
        return Status::Corruption("bad WriteBatch TimedPut");
      }
      if (write_unix_time) {
        std::tie(*value, *write_unix_time) =
            ParsePackedValueWithWriteTime(packed_value);
      } else {
        // Caller doesn't want to unpack write_unix_time, so keep it packed in
        // the value.
        *value = packed_value;
      }
      break;
    }
    default:
      return Status::Corruption(
          "unknown WriteBatch tag",
          std::to_string(static_cast<unsigned int>(*tag)));
  }
  return Status::OK();
}

Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }
  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     rep_.size());
}
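
// Example: a minimal Handler that tallies Puts and Deletes, sketching typical
// Iterate() usage (CountingHandler is illustrative, not a type in this file):
//
//   struct CountingHandler : public WriteBatch::Handler {
//     size_t puts = 0;
//     size_t deletes = 0;
//     Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
//                  const Slice& /*value*/) override {
//       ++puts;
//       return Status::OK();
//     }
//     Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
//       ++deletes;
//       return Status::OK();
//     }
//   };
//
//   CountingHandler h;
//   Status s = batch.Iterate(&h);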
Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }
  assert(begin <= end);
  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  bool whole_batch =
      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

  Slice key, value, blob, xid;
  uint64_t write_unix_time = 0;

  // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops
  // from serving as batch boundary markers; otherwise we would mis-count the
  // number of batches. We do that by checking whether the accumulated batch
  // is empty before seeing the next Noop.
  bool empty_batch = true;
  uint32_t found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
  bool last_was_try_again = false;
  bool handler_continue = true;
  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      last_was_try_again = false;
      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid, &write_unix_time);
      if (!s.ok()) {
        return s;
      }
    } else {
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit() ==
            WriteBatch::Handler::OptionState::kDisabled) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare() ==
            WriteBatch::Handler::OptionState::kEnabled) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit() ==
            WriteBatch::Handler::OptionState::kEnabled) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        s = handler->MarkBeginPrepare(true /* unprepared */);
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit() ==
            WriteBatch::Handler::OptionState::kEnabled) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (handler->WriteBeforePrepare() ==
            WriteBatch::Handler::OptionState::kDisabled) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        s = handler->MarkEndPrepare(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        s = handler->MarkCommit(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeCommitXIDAndTimestamp:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        // key stores the commit timestamp.
        assert(!key.empty());
        s = handler->MarkCommitWithTimestamp(xid, key);
        if (LIKELY(s.ok())) {
          empty_batch = true;
        }
        break;
      case kTypeRollbackXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        s = handler->MarkRollback(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeNoop:
        s = handler->MarkNoop(empty_batch);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeWideColumnEntity:
      case kTypeColumnFamilyWideColumnEntity:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT_ENTITY));
        s = handler->PutEntityCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          ++found;
        }
        break;
      case kTypeValuePreferredSeqno:
      case kTypeColumnFamilyValuePreferredSeqno:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_TIMED_PUT));
        s = handler->TimedPutCF(column_family, key, value, write_unix_time);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          ++found;
        }
        break;
      default:
        return Status::Corruption(
            "unknown WriteBatch tag",
            std::to_string(static_cast<unsigned int>(tag)));
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && whole_batch &&
      found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

void WriteBatchInternal::SetAsLatestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(b->rep_.data(), seq);
}

size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}
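
// The accessors above encode the batch header described at the top of this
// file: bytes [0, 8) hold the sequence number (fixed64) and bytes [8, 12)
// hold the record count (fixed32), so the first record starts at
// WriteBatchInternal::kHeader. For example:
//
//   WriteBatch b;
//   WriteBatchInternal::SetSequence(&b, 100);
//   WriteBatchInternal::SetCount(&b, 2);
//   assert(WriteBatchInternal::Sequence(&b) == 100);
//   assert(WriteBatchInternal::Count(&b) == 2);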
void WriteBatchInternal::SetDefaultColumnFamilyTimestampSize(
    WriteBatch* wb, size_t default_cf_ts_sz) {
  wb->default_cf_ts_sz_ = default_cf_ts_sz;
}

std::tuple<Status, uint32_t, size_t>
WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(
    WriteBatch* b, ColumnFamilyHandle* column_family) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  size_t ts_sz = 0;
  Status s;
  if (column_family) {
    const Comparator* const ucmp = column_family->GetComparator();
    if (ucmp) {
      ts_sz = ucmp->timestamp_size();
      if (0 == cf_id && b->default_cf_ts_sz_ != ts_sz) {
        s = Status::InvalidArgument("Default cf timestamp size mismatch");
      }
    }
    auto* cfd =
        static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
    if (cfd && cfd->ioptions().disallow_memtable_writes) {
      s = Status::InvalidArgument(
          "This column family has disallow_memtable_writes=true");
    }
  } else if (b->default_cf_ts_sz_ > 0) {
    ts_sz = b->default_cf_ts_sz_;
  }
  return std::make_tuple(s, cf_id, ts_sz);
}

namespace {
Status CheckColumnFamilyTimestampSize(ColumnFamilyHandle* column_family,
                                      const Slice& ts) {
  if (!column_family) {
    return Status::InvalidArgument("column family handle cannot be null");
  }
  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t cf_ts_sz = ucmp->timestamp_size();
  if (0 == cf_ts_sz) {
    return Status::InvalidArgument("timestamp disabled");
  }
  if (cf_ts_sz != ts.size()) {
    return Status::InvalidArgument("timestamp size mismatch");
  }
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (cfd && cfd->ioptions().disallow_memtable_writes) {
    return Status::InvalidArgument(
        "This column family has disallow_memtable_writes=true");
  }
  return Status::OK();
}
}  // anonymous namespace

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // Technically the optype could've been `kTypeColumnFamilyValue` with the
    // CF ID encoded in the `WriteBatch`. That distinction is unimportant
    // however since we verify CF ID is correct, as well as all other fields
    // (a missing/extra encoded CF ID would corrupt another field). It is
    // convenient to consolidate on `kTypeValue` here as that is what will be
    // inserted into memtable.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeValue)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatchInternal::TimedPut(WriteBatch* b, uint32_t column_family_id,
                                    const Slice& key, const Slice& value,
                                    uint64_t write_unix_time) {
  if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("value is too large");
  }
  if (std::numeric_limits<uint64_t>::max() == write_unix_time) {
    return WriteBatchInternal::Put(b, column_family_id, key, value);
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValuePreferredSeqno));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValuePreferredSeqno));
    PutVarint32(&b->rep_, column_family_id);
  }
  std::string value_buf;
  Slice packed_value =
      PackValueAndWriteTime(value, write_unix_time, &value_buf);
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, packed_value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_TIMED_PUT,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in other internal functions for why we don't need to
    // differentiate between `kTypeValuePreferredSeqno` and
    // `kTypeColumnFamilyValuePreferredSeqno` here.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, packed_value, kTypeValuePreferredSeqno)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::Put(this, cf_id, key, value);
  } else {
    needs_in_place_update_ts_ = true;
    has_key_with_ts_ = true;
    std::string dummy_ts(ts_sz, '\0');
    std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
    s = WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2),
                                SliceParts(&value, 1));
  }

  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts_sz);
  }
  return s;
}

Status WriteBatch::TimedPut(ColumnFamilyHandle* column_family, const Slice& key,
                            const Slice& value, uint64_t write_unix_time) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  } else if (ts_sz != 0) {
    return Status::NotSupported(
        "TimedPut is not supported in combination with user-defined "
        "timestamps.");
  }
  return WriteBatchInternal::TimedPut(this, cf_id, key, value, write_unix_time);
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& ts, const Slice& value) {
  Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  has_key_with_ts_ = true;
  assert(column_family);
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{key, ts}};
  s = WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2),
                              SliceParts(&value, 1));
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts.size());
  }
  return s;
}
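
// Example: with a column family whose comparator reports an 8-byte
// timestamp_size(), the timestamp-aware Put expects `ts` to be exactly that
// size; any mismatch returns InvalidArgument. A sketch (`cf_handle` is
// assumed to be such a column family):
//
//   std::string ts(sizeof(uint64_t), '\0');
//   EncodeFixed64(ts.data(), 123 /* user-defined timestamp */);
//   Status s = batch.Put(cf_handle, "key", ts, "value");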
Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
                                                 const SliceParts& value) {
  size_t total_key_bytes = 0;
  for (int i = 0; i < key.num_parts; ++i) {
    total_key_bytes += key.parts[i].size();
  }
  if (total_key_bytes >= size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }

  size_t total_value_bytes = 0;
  for (int i = 0; i < value.num_parts; ++i) {
    total_value_bytes += value.parts[i].size();
  }
  if (total_value_bytes >= size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("value is too large");
  }
  return Status::OK();
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeValue)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (ts_sz == 0) {
    s = WriteBatchInternal::Put(this, cf_id, key, value);
    if (s.ok()) {
      MaybeTrackTimestampSize(cf_id, ts_sz);
    }
    return s;
  }

  return Status::InvalidArgument(
      "Cannot call this method on column family enabling timestamp");
}

Status WriteBatchInternal::PutEntity(WriteBatch* b, uint32_t column_family_id,
                                     const Slice& key,
                                     const WideColumns& columns) {
  assert(b);

  if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }

  WideColumns sorted_columns(columns);
  WideColumnsHelper::SortColumns(sorted_columns);

  std::string entity;
  const Status s = WideColumnSerialization::Serialize(sorted_columns, entity);
  if (!s.ok()) {
    return s;
  }

  if (entity.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("wide column entity is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeWideColumnEntity));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyWideColumnEntity));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, entity);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_PUT_ENTITY,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, entity, kTypeWideColumnEntity)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::PutEntity(ColumnFamilyHandle* column_family,
                             const Slice& key, const WideColumns& columns) {
  if (!column_family) {
    return Status::InvalidArgument(
        "Cannot call this method without a column family handle");
  }

  Status s;
  uint32_t cf_id = 0;
  size_t ts_sz = 0;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (ts_sz) {
    return Status::InvalidArgument(
        "Cannot call this method on column family enabling timestamp");
  }

  return WriteBatchInternal::PutEntity(this, cf_id, key, columns);
}

Status WriteBatch::PutEntity(const Slice& key,
                             const AttributeGroups& attribute_groups) {
  if (attribute_groups.empty()) {
    return Status::InvalidArgument(
        "Cannot call this method with empty attribute groups");
  }
  Status s;
  for (const AttributeGroup& ag : attribute_groups) {
    s = PutEntity(ag.column_family(), key, ag.columns());
    if (!s.ok()) {
      return s;
    }
  }
  return s;
}
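
// Example: PutEntity stores one key with multiple named columns; the columns
// are sorted by name before serialization (see SortColumns above). A sketch,
// where `cf_handle` is a column family without user-defined timestamps:
//
//   WideColumns columns{{"name", "alice"}, {"age", "30"}};
//   Status s = batch.PutEntity(cf_handle, "user1", columns);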
Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}

ValueType WriteBatchInternal::GetBeginPrepareType(bool write_after_commit,
                                                  bool unprepared_batch) {
  return write_after_commit
             ? kTypeBeginPrepareXID
             : (unprepared_batch ? kTypeBeginUnprepareXID
                                 : kTypeBeginPersistedPrepareXID);
}

Status WriteBatchInternal::InsertBeginPrepare(WriteBatch* b,
                                              bool write_after_commit,
                                              bool unprepared_batch) {
  b->rep_.push_back(static_cast<char>(
      GetBeginPrepareType(write_after_commit, unprepared_batch)));
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::InsertEndPrepare(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(
      GetBeginPrepareType(write_after_commit, unprepared_batch));
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return WriteBatchInternal::InsertEndPrepare(b, xid);
}
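
// Example: after MarkEndPrepare, a two-phase-commit batch produced by this
// code is laid out as follows (offsets per the 12-byte header):
//
//   [sequence: fixed64][count: fixed32]   // bytes 0..11
//   [begin-prepare marker]                // byte 12, the rewritten Noop
//   [operation records...]                // Put/Delete/Merge/...
//   [kTypeEndPrepareXID][xid varstring]   // appended by InsertEndPrepare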
Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkCommitWithTimestamp(WriteBatch* b,
                                                   const Slice& xid,
                                                   const Slice& commit_ts) {
  assert(!commit_ts.empty());
  b->rep_.push_back(static_cast<char>(kTypeCommitXIDAndTimestamp));
  PutLengthPrefixedSlice(&b->rep_, commit_ts);
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, "" /* value */, kTypeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::Delete(this, cf_id, key);
  } else {
    needs_in_place_update_ts_ = true;
    has_key_with_ts_ = true;
    std::string dummy_ts(ts_sz, '\0');
    std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
    s = WriteBatchInternal::Delete(this, cf_id,
                                   SliceParts(key_with_ts.data(), 2));
  }
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts_sz);
  }
  return s;
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key,
                          const Slice& ts) {
  Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  assert(column_family);
  has_key_with_ts_ = true;
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{key, ts}};
  s = WriteBatchInternal::Delete(this, cf_id,
                                 SliceParts(key_with_ts.data(), 2));
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts.size());
  }
  return s;
}

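// Example of the timestamp-suffixed form above (hypothetical caller; `cfh`
// must belong to a column family whose comparator has a nonzero
// timestamp_size(), and `ts` must match that size):
//   WriteBatch batch;
//   Status s = batch.Delete(cfh, "key", ts);  // stored internally as key + ts
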
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key,
                        SliceParts(nullptr /* _parts */, 0 /* _num_parts */),
                        kTypeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::Delete(this, cf_id, key);
    if (s.ok()) {
      MaybeTrackTimestampSize(cf_id, ts_sz);
    }
    return s;
  }

  return Status::InvalidArgument(
      "Cannot call this method on a column family with timestamp enabled");
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, "" /* value */, kTypeSingleDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::SingleDelete(this, cf_id, key);
  } else {
    needs_in_place_update_ts_ = true;
    has_key_with_ts_ = true;
    std::string dummy_ts(ts_sz, '\0');
    std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
    s = WriteBatchInternal::SingleDelete(this, cf_id,
                                         SliceParts(key_with_ts.data(), 2));
  }
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts_sz);
  }
  return s;
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& ts) {
  Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  has_key_with_ts_ = true;
  assert(column_family);
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{key, ts}};
  s = WriteBatchInternal::SingleDelete(this, cf_id,
                                       SliceParts(key_with_ts.data(), 2));
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts.size());
  }
  return s;
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key,
                        SliceParts(nullptr /* _parts */,
                                   0 /* _num_parts */) /* value */,
                        kTypeSingleDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::SingleDelete(this, cf_id, key);
    if (s.ok()) {
      MaybeTrackTimestampSize(cf_id, ts_sz);
    }
    return s;
  }

  return Status::InvalidArgument(
      "Cannot call this method on a column family with timestamp enabled");
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(begin_key, end_key, kTypeRangeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
  } else {
    needs_in_place_update_ts_ = true;
    has_key_with_ts_ = true;
    std::string dummy_ts(ts_sz, '\0');
    std::array<Slice, 2> begin_key_with_ts{{begin_key, dummy_ts}};
    std::array<Slice, 2> end_key_with_ts{{end_key, dummy_ts}};
    s = WriteBatchInternal::DeleteRange(this, cf_id,
                                        SliceParts(begin_key_with_ts.data(), 2),
                                        SliceParts(end_key_with_ts.data(), 2));
  }
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts_sz);
  }
  return s;
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key,
                               const Slice& ts) {
  Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  assert(column_family);
  has_key_with_ts_ = true;
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{begin_key, ts}};
  std::array<Slice, 2> end_key_with_ts{{end_key, ts}};
  s = WriteBatchInternal::DeleteRange(this, cf_id,
                                      SliceParts(key_with_ts.data(), 2),
                                      SliceParts(end_key_with_ts.data(), 2));
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts.size());
  }
  return s;
}

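// Example (hypothetical caller): deleting the range ["a", "b") at timestamp
// `ts` appends one range-deletion record whose begin and end keys are both
// suffixed with the same `ts`:
//   Status s = batch.DeleteRange(cfh, "a", "b", ts);
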
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(begin_key, end_key, kTypeRangeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
    if (s.ok()) {
      MaybeTrackTimestampSize(cf_id, ts_sz);
    }
    return s;
  }

  return Status::InvalidArgument(
      "Cannot call this method on a column family with timestamp enabled");
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeMerge)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::Merge(this, cf_id, key, value);
  } else {
    needs_in_place_update_ts_ = true;
    has_key_with_ts_ = true;
    std::string dummy_ts(ts_sz, '\0');
    std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
    s = WriteBatchInternal::Merge(
        this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1));
  }
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts_sz);
  }
  return s;
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& ts, const Slice& value) {
  Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  has_key_with_ts_ = true;
  assert(column_family);
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{key, ts}};
  s = WriteBatchInternal::Merge(this, cf_id, SliceParts(key_with_ts.data(), 2),
                                SliceParts(&value, 1));
  if (s.ok()) {
    MaybeTrackTimestampSize(cf_id, ts.size());
  }
  return s;
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeMerge)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);
  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    s = WriteBatchInternal::Merge(this, cf_id, key, value);
    if (s.ok()) {
      MaybeTrackTimestampSize(cf_id, ts_sz);
    }
    return s;
  }

  return Status::InvalidArgument(
      "Cannot call this method on a column family with timestamp enabled");
}

Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, value, kTypeBlobIndex)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

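// Note: `value` here is expected to be an encoded blob reference produced by
// the blob storage layer, not a user-supplied value; this is an internal
// entry point, and user code goes through the regular Put() path instead.
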
Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}

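// Log-data blobs are WAL-only payloads: note that PutLogData() above neither
// bumps Count() nor adds a protection info entry, and during iteration the
// record is routed to Handler::LogData() rather than applied to a memtable.
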
void WriteBatch::SetSavePoint() {
  if (save_points_ == nullptr) {
    save_points_.reset(new SavePoints());
  }
  // Record length and count of current batch of writes.
  save_points_->stack.push(SavePoint(
      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
}

Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  assert(savepoint.size <= rep_.size());
  assert(static_cast<uint32_t>(savepoint.count) <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    if (prot_info_ != nullptr) {
      prot_info_->entries_.resize(savepoint.count);
    }
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}

Status WriteBatch::PopSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }
  // Pop the most recent savepoint off the stack
  save_points_->stack.pop();
  return Status::OK();
}

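// Typical savepoint usage (an illustrative sketch, not code from this file;
// `cfh` and `abandon_write` are hypothetical):
//   WriteBatch batch;
//   batch.SetSavePoint();
//   batch.Put(cfh, "k", "v");
//   if (abandon_write) {
//     batch.RollbackToSavePoint();  // trims rep_ back to the recorded size
//   } else {
//     batch.PopSavePoint();  // keeps the writes, discards the savepoint
//   }
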
Status WriteBatch::UpdateTimestamps(
    const Slice& ts, std::function<size_t(uint32_t)> ts_sz_func) {
  TimestampUpdater<decltype(ts_sz_func)> ts_updater(prot_info_.get(),
                                                    std::move(ts_sz_func), ts);
  const Status s = Iterate(&ts_updater);
  if (s.ok()) {
    needs_in_place_update_ts_ = false;
  }
  return s;
}

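// `ts_sz_func` maps a column family id to the timestamp size expected for
// that column family, which lets the updater skip entries from column
// families without user-defined timestamps. A minimal caller might look like
// this (hypothetical, assuming every column family uses 8-byte timestamps):
//   Status s = batch.UpdateTimestamps(
//       commit_ts, [](uint32_t /*cf*/) { return sizeof(uint64_t); });
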
Status WriteBatch::VerifyChecksum() const {
  if (prot_info_ == nullptr) {
    return Status::OK();
  }
  Slice input(rep_.data() + WriteBatchInternal::kHeader,
              rep_.size() - WriteBatchInternal::kHeader);
  Slice key, value, blob, xid;
  char tag = 0;
  uint32_t column_family = 0;  // default
  Status s;
  size_t prot_info_idx = 0;
  bool checksum_protected = true;
  while (!input.empty() && prot_info_idx < prot_info_->entries_.size()) {
    // In case key/value/column_family are not updated by
    // ReadRecordFromWriteBatch
    key.clear();
    value.clear();
    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                 &blob, &xid, /*write_unix_time=*/nullptr);
    if (!s.ok()) {
      return s;
    }
    checksum_protected = true;
    // Write batch checksum uses op_type without ColumnFamily (e.g., if op_type
    // in the write batch is kTypeColumnFamilyValue, kTypeValue is used to
    // compute the checksum), and encodes column family id separately. See
    // comment in first `WriteBatchInternal::Put()` for more detail.
    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        tag = kTypeValue;
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        tag = kTypeDeletion;
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        tag = kTypeSingleDeletion;
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        tag = kTypeRangeDeletion;
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        tag = kTypeMerge;
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        tag = kTypeBlobIndex;
        break;
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
      case kTypeBeginPersistedPrepareXID:
      case kTypeBeginUnprepareXID:
      case kTypeDeletionWithTimestamp:
      case kTypeCommitXIDAndTimestamp:
        checksum_protected = false;
        break;
      case kTypeColumnFamilyWideColumnEntity:
      case kTypeWideColumnEntity:
        tag = kTypeWideColumnEntity;
        break;
      case kTypeColumnFamilyValuePreferredSeqno:
      case kTypeValuePreferredSeqno:
        tag = kTypeValuePreferredSeqno;
        break;
      default:
        return Status::Corruption(
            "unknown WriteBatch tag",
            std::to_string(static_cast<unsigned int>(tag)));
    }
    if (checksum_protected) {
      s = prot_info_->entries_[prot_info_idx++]
              .StripC(column_family)
              .StripKVO(key, value, static_cast<ValueType>(tag))
              .GetStatus();
      if (!s.ok()) {
        return s;
      }
    }
  }

  if (prot_info_idx != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  }
  assert(WriteBatchInternal::Count(this) == prot_info_->entries_.size());
  return Status::OK();
}

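// Note the invariant checked above: protection info entries correspond 1:1 to
// counted user operations, while marker records (noop, prepare / commit /
// rollback markers, log data) and the few types listed with them carry no
// checksum and are skipped via `checksum_protected = false`.
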
namespace {

class MemTableInserter : public WriteBatch::Handler {
  SequenceNumber sequence_;
  ColumnFamilyMemTables* const cf_mems_;
  FlushScheduler* const flush_scheduler_;
  TrimHistoryScheduler* const trim_history_scheduler_;
  const bool ignore_missing_column_families_;
  const uint64_t recovering_log_number_;
  // log number that all Memtables inserted into should reference
  uint64_t log_number_ref_;
  DBImpl* db_;
  const bool concurrent_memtable_writes_;
  bool post_info_created_;
  const WriteBatch::ProtectionInfo* prot_info_;
  size_t prot_info_idx_;
  bool* has_valid_writes_;
  // On some platforms, default-constructing a map in the Write() path is too
  // expensive because it allocates memory even when the map goes unused.
  // Make creation optional, but avoid the extra allocation that a
  // std::unique_ptr would incur.
  using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
  using PostMapType = aligned_storage<MemPostInfoMap>::type;
  PostMapType mem_post_info_map_;
  // current recovered transaction we are rebuilding (recovery)
  WriteBatch* rebuilding_trx_;
  SequenceNumber rebuilding_trx_seq_;
  // Increase seq number once per each write batch. Otherwise increase it once
  // per key.
  bool seq_per_batch_;
  // Whether the memtable write will be done only after the commit
  bool write_after_commit_;
  // Whether memtable write can be done before prepare
  bool write_before_prepare_;
  // Whether this batch was unprepared or not
  bool unprepared_batch_;
  using DupDetector = aligned_storage<DuplicateDetector>::type;
  DupDetector duplicate_detector_;
  bool dup_detector_on_;
  bool hint_per_batch_;
  bool hint_created_;
  // Hints for this batch
  using HintMap = std::unordered_map<MemTable*, void*>;
  using HintMapType = aligned_storage<HintMap>::type;
  HintMapType hint_;

  HintMap& GetHintMap() {
    assert(hint_per_batch_);
    if (!hint_created_) {
      new (&hint_) HintMap();
      hint_created_ = true;
    }
    return *reinterpret_cast<HintMap*>(&hint_);
  }

  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if (!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }

  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_detector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_detector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
        ->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }

  const ProtectionInfoKVOC64* NextProtectionInfo() {
    const ProtectionInfoKVOC64* res = nullptr;
    if (prot_info_ != nullptr) {
      assert(prot_info_idx_ < prot_info_->entries_.size());
      res = &prot_info_->entries_[prot_info_idx_];
      ++prot_info_idx_;
    }
    return res;
  }

  void DecrementProtectionInfoIdxForTryAgain() {
    if (prot_info_ != nullptr) {
      --prot_info_idx_;
    }
  }

  void ResetProtectionInfo() {
    prot_info_idx_ = 0;
    prot_info_ = nullptr;
  }

 protected:
  Handler::OptionState WriteBeforePrepare() const override {
    return write_before_prepare_ ? Handler::OptionState::kEnabled
                                 : Handler::OptionState::kDisabled;
  }
  Handler::OptionState WriteAfterCommit() const override {
    return write_after_commit_ ? Handler::OptionState::kEnabled
                               : Handler::OptionState::kDisabled;
  }

 public:
  // cf_mems should not be shared with concurrent inserters
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   TrimHistoryScheduler* trim_history_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   const WriteBatch::ProtectionInfo* prot_info,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true, bool hint_per_batch = false)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        trim_history_scheduler_(trim_history_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(static_cast_with_check<DBImpl>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        prot_info_(prot_info),
        prot_info_idx_(0),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        // Write after commit currently uses one seq per key (instead of per
        // batch). So seq_per_batch being false indicates write_after_commit
        // approach.
        write_after_commit_(!seq_per_batch),
        // WriteUnprepared can write WriteBatches per transaction, so
        // batch_per_txn being false indicates write_before_prepare.
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_detector_on_(false),
        hint_per_batch_(hint_per_batch),
        hint_created_(false) {
    assert(cf_mems_);
  }

  ~MemTableInserter() override {
    if (dup_detector_on_) {
      reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
          ->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_)->~MemPostInfoMap();
    }
    if (hint_created_) {
      for (auto iter : GetHintMap()) {
        delete[] reinterpret_cast<char*>(iter.second);
      }
      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
    }
    delete rebuilding_trx_;
  }

  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;

  // The batch seq is regularly restarted; in normal mode it is set when
  // MemTableInserter is constructed in the write thread, and in recovery mode
  // it is set when a batch, which is tagged with seq, is read from the WAL.
  // Within a sequenced batch, which could be a merge of multiple batches, we
  // have two policies to advance the seq: i) seq_per_key (default) and ii)
  // seq_per_batch. To implement the latter we need to mark the boundary
  // between the individual batches. The approach is this: 1) Use the
  // terminating markers to indicate the boundary (kTypeEndPrepareXID,
  // kTypeCommitXID, kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in
  // the absence of a natural boundary marker.
  void MaybeAdvanceSeq(bool batch_boundary = false) {
    if (batch_boundary == seq_per_batch_) {
      sequence_++;
    }
  }

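  // Worked example of the predicate above: with seq_per_batch_ == false
  // (seq-per-key), MaybeAdvanceSeq(false) after each successful user op bumps
  // the sequence and boundary markers do not; with seq_per_batch_ == true the
  // polarity flips, and only MaybeAdvanceSeq(true) at a batch boundary bumps
  // it.
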
  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
  void set_prot_info(const WriteBatch::ProtectionInfo* prot_info) {
    prot_info_ = prot_info;
    prot_info_idx_ = 0;
  }

  SequenceNumber sequence() const { return sequence_; }

  void PostProcess() {
    assert(concurrent_memtable_writes_);
    // If post info was not created there is nothing
    // to process and no need to create on demand
    if (post_info_created_) {
      for (auto& pair : GetPostMap()) {
        pair.first->BatchPostProcess(pair.second);
      }
    }
  }

  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance. Otherwise, it must be guaranteed that there
    // is no concurrent access
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    auto* current = cf_mems_->current();
    if (current && current->ioptions().disallow_memtable_writes) {
      *s = Status::InvalidArgument(
          "This column family has disallow_memtable_writes=true");
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      // This is true only in the recovery environment (recovering_log_number_
      // is always 0 in the non-recovery, regular write code-path).
      // * If recovering_log_number_ < cf_mems_->GetLogNumber(), the column
      //   family already contains updates from this log. We can't apply
      //   updates twice because of update-in-place or merge workloads --
      //   ignore the update.
      *s = Status::OK();
      return false;
    }
    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }
    if (log_number_ref_ > 0) {
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }
    return true;
  }

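  // Shared implementation behind PutCF/TimedPutCF/PutEntityCF below:
  // `value_type` selects the tag written to the memtable, and
  // `rebuild_txn_op` abstracts how the record is re-appended to
  // `rebuilding_trx_` when rebuilding a two-phase transaction during
  // recovery.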
  template <typename RebuildTxnOp>
  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
                   const Slice& value, ValueType value_type,
                   RebuildTxnOp rebuild_txn_op,
                   const ProtectionInfoKVOS64* kv_prot_info) {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return rebuild_txn_op(rebuilding_trx_, column_family_id, key, value);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status =
            rebuild_txn_op(rebuilding_trx_, column_family_id, key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    // inplace_update_support is inconsistent with snapshots, and therefore with
    // any kind of transactions including the ones that use seq_per_batch
    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
      ret_status =
          mem->Add(sequence_, value_type, key, value, kv_prot_info,
                   concurrent_memtable_writes_, get_post_process_info(mem),
                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    } else if (moptions->inplace_callback == nullptr ||
               value_type != kTypeValue) {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->Update(sequence_, value_type, key, value, kv_prot_info);
    } else {
      assert(!concurrent_memtable_writes_);
      assert(value_type == kTypeValue);
      ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info);
      if (ret_status.IsNotFound()) {
        // key not found in memtable. Do sst get, update, add
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        // TODO: plumb Env::IOActivity, Env::IOPriority
        ReadOptions ropts;
        // it's going to be overwritten for sure, so no point caching data block
        // containing the old version
        ropts.fill_cache = false;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        Status get_status = Status::NotSupported();
        if (db_ != nullptr && recovering_log_number_ == 0) {
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          // TODO (yanqin): fix when user-defined timestamp is enabled.
          get_status = db_->Get(ropts, cf_handle, key, &prev_value);
        }
        // Intentionally overwrites the `NotFound` in `ret_status`.
        if (!get_status.ok() && !get_status.IsNotFound()) {
          ret_status = get_status;
        } else {
          ret_status = Status::OK();
        }
        if (ret_status.ok()) {
          UpdateStatus update_status;
          char* prev_buffer = const_cast<char*>(prev_value.c_str());
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
          if (get_status.ok()) {
            update_status = moptions->inplace_callback(prev_buffer, &prev_size,
                                                       value, &merged_value);
          } else {
            update_status = moptions->inplace_callback(
                nullptr /* existing_value */, nullptr /* existing_value_size */,
                value, &merged_value);
          }
          if (update_status == UpdateStatus::UPDATED_INPLACE) {
            assert(get_status.ok());
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value,
                                           Slice(prev_buffer, prev_size));
              // prev_value is updated in-place with final value.
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    &updated_kv_prot_info);
            } else {
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          } else if (update_status == UpdateStatus::UPDATED) {
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value, merged_value);
              // merged_value contains the final value.
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(merged_value), &updated_kv_prot_info);
            } else {
              // merged_value contains the final value.
              ret_status =
                  mem->Add(sequence_, value_type, key, Slice(merged_value),
                           nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          }
        }
      }
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status =
          rebuild_txn_op(rebuilding_trx_, column_family_id, key, value);
    }
    return ret_status;
  }

  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    Status ret_status;
    auto rebuild_txn_op = [](WriteBatch* rebuilding_trx, uint32_t cf_id,
                             const Slice& k, const Slice& v) -> Status {
      return WriteBatchInternal::Put(rebuilding_trx, cf_id, k, v);
    };
    if (kv_prot_info != nullptr) {
      // Memtable needs seqno, doesn't need CF ID
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
                             rebuild_txn_op, &mem_kv_prot_info);
    } else {
      ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
                             rebuild_txn_op, nullptr /* kv_prot_info */);
    }
    // TODO: this assumes that if TryAgain status is returned to the caller,
    // the operation is actually tried again. The proper way to do this is to
    // pass a `try_again` parameter to the operation itself and decrement
    // prot_info_idx_ based on that.
    if (UNLIKELY(ret_status.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }
    return ret_status;
  }

  Status TimedPutCF(uint32_t column_family_id, const Slice& key,
                    const Slice& value, uint64_t unix_write_time) override {
    const auto* kv_prot_info = NextProtectionInfo();
    Status ret_status;
    std::string value_buf;
    Slice packed_value =
        PackValueAndWriteTime(value, unix_write_time, &value_buf);
    auto rebuild_txn_op = [](WriteBatch* /* rebuilding_trx */,
                             uint32_t /* cf_id */, const Slice& /* k */,
                             const Slice& /* v */) -> Status {
      return Status::NotSupported();
    };
    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = PutCFImpl(column_family_id, key, packed_value,
                             kTypeValuePreferredSeqno, rebuild_txn_op,
                             &mem_kv_prot_info);
    } else {
      ret_status = PutCFImpl(column_family_id, key, packed_value,
                             kTypeValuePreferredSeqno, rebuild_txn_op,
                             nullptr /* kv_prot_info */);
    }
    // TODO: this assumes that if TryAgain status is returned to the caller,
    // the operation is actually tried again. The proper way to do this is to
    // pass a `try_again` parameter to the operation itself and decrement
    // prot_info_idx_ based on that.
    if (UNLIKELY(ret_status.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }
    return ret_status;
  }

  Status PutEntityCF(uint32_t column_family_id, const Slice& key,
                     const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    Status s;
    auto rebuild_txn_op = [](WriteBatch* rebuilding_trx, uint32_t cf_id,
                             const Slice& k, Slice entity) -> Status {
      WideColumns columns;
      const Status st = WideColumnSerialization::Deserialize(entity, columns);
      if (!st.ok()) {
        return st;
      }
      return WriteBatchInternal::PutEntity(rebuilding_trx, cf_id, k, columns);
    };
    if (kv_prot_info) {
      // Memtable needs seqno, doesn't need CF ID
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity,
                    rebuild_txn_op, &mem_kv_prot_info);
    } else {
      s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity,
                    rebuild_txn_op,
                    /* kv_prot_info */ nullptr);
    }
    if (UNLIKELY(s.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }
    return s;
  }

  Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
                    const Slice& value, ValueType delete_type,
                    const ProtectionInfoKVOS64* kv_prot_info) {
    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    ret_status =
        mem->Add(sequence_, delete_type, key, value, kv_prot_info,
                 concurrent_memtable_writes_, get_post_process_info(mem),
                 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    return ret_status;
  }

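  // DeleteImpl above is the common memtable path for the delete flavors that
  // follow: `value` is empty for point and single deletes, and carries the
  // end key for range deletions.
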
  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id,
                                        key);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status =
            WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      if (UNLIKELY(ret_status.IsTryAgain())) {
        DecrementProtectionInfoIdxForTryAgain();
      }
      return ret_status;
    }

    ColumnFamilyData* cfd = cf_mems_->current();
    assert(!cfd || cfd->user_comparator());
    const size_t ts_sz = (cfd && cfd->user_comparator())
                             ? cfd->user_comparator()->timestamp_size()
                             : 0;
    const ValueType delete_type =
        (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      mem_kv_prot_info.UpdateO(kTypeDeletion, delete_type);
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              nullptr /* kv_prot_info */);
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status =
          WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }
    return ret_status;
  }

  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                              column_family_id, key);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                      column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      if (UNLIKELY(ret_status.IsTryAgain())) {
        DecrementProtectionInfoIdxForTryAgain();
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, nullptr /* kv_prot_info */);
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                    column_family_id, key);
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }
    return ret_status;
  }

  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
                       const Slice& end_key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                             begin_key, end_key);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::DeleteRange(
            rebuilding_trx_, column_family_id, begin_key, end_key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      if (UNLIKELY(ret_status.IsTryAgain())) {
        DecrementProtectionInfoIdxForTryAgain();
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (db_ != nullptr) {
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      auto* cfd =
          static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
      if (!cfd->is_delete_range_supported()) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        return Status::NotSupported(
            std::string("CF " + cfd->GetName() +
                        " reports it does not support DeleteRange"));
      }
      int cmp =
          cfd->user_comparator()->CompareWithoutTimestamp(begin_key, end_key);
      if (cmp > 0) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        // It's an empty range where the endpoints appear mistaken. Don't
        // bother applying it to the DB, and return an error to the user.
        return Status::InvalidArgument("end key comes before start key");
      } else if (cmp == 0) {
        // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
        ret_status.PermitUncheckedError();
        // It's an empty range. Don't bother applying it to the DB.
        return Status::OK();
      }
    }

    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, begin_key, end_key,
                              kTypeRangeDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, begin_key, end_key,
                              kTypeRangeDeletion, nullptr /* kv_prot_info */);
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::DeleteRange(
          rebuilding_trx_, column_family_id, begin_key, end_key);
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }
    return ret_status;
  }

  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
                                       value);
      // else insert the values to the memtable right away
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence no need for insert but we still
        // need to keep track of the keys for upcoming rollback/commit.
        // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
        ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
                                               column_family_id, key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundary */);
      }
      if (UNLIKELY(ret_status.IsTryAgain())) {
        DecrementProtectionInfoIdxForTryAgain();
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    if (moptions->merge_operator == nullptr) {
      return Status::InvalidArgument(
          "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
    }

    bool perform_merge = false;
    assert(!concurrent_memtable_writes_ ||
           moptions->max_successive_merges == 0);

    // If we pass DB through and options.max_successive_merges is hit
    // during recovery, Get() will be issued which will try to acquire
    // DB mutex and cause deadlock, as DB mutex is already held.
    // So we disable merge in recovery
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      assert(!concurrent_memtable_writes_);
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable. Limit the count to the threshold for
      // triggering merge to prevent unnecessary counting overhead.
      size_t num_merges = mem->CountSuccessiveMergeEntries(
          lkey, moptions->max_successive_merges /* limit */);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }
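
    // At this point `perform_merge` is true iff the key already has at least
    // `max_successive_merges` merge operands at the head of the memtable.
    // For example, with max_successive_merges == 2, the third consecutive
    // Merge() for a key is collapsed below into a single full-merge result,
    // which bounds the length of the operand chain a reader must traverse.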
    if (perform_merge) {
      // 1) Get the existing value. Use the wide column APIs to make sure we
      // don't lose any columns in the process.
      PinnableWideColumns existing;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;

      // TODO: plumb Env::IOActivity, Env::IOPriority
      ReadOptions read_options;
      if (!moptions->strict_max_successive_merges) {
        // Blocking the write path with read I/O is typically unacceptable, so
        // only do this merge when the operands are all found in memory.
        read_options.read_tier = kBlockCacheTier;
      }
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }

      Status get_status =
          db_->GetEntity(read_options, cf_handle, key, &existing);
      if (!get_status.ok()) {
        // Failed to read a key we know exists. Store the delta in memtable.
        perform_merge = false;
      } else {
        // 2) Apply this merge
        auto merge_operator = moptions->merge_operator;
        assert(merge_operator);

        const auto& columns = existing.columns();

        Status merge_status;
        std::string new_value;
        ValueType new_value_type;

        if (WideColumnsHelper::HasDefaultColumnOnly(columns)) {
          // `op_failure_scope` (an output parameter) is not provided (set to
          // nullptr) since a failure must be propagated regardless of its
          // value.
          merge_status = MergeHelper::TimedFullMerge(
              merge_operator, key, MergeHelper::kPlainBaseValue,
              WideColumnsHelper::GetDefaultColumn(columns), {value},
              moptions->info_log, moptions->statistics,
              SystemClock::Default().get(),
              /* update_num_ops_stats */ false, /* op_failure_scope */ nullptr,
              &new_value, /* result_operand */ nullptr, &new_value_type);
        } else {
          // `op_failure_scope` (an output parameter) is not provided (set to
          // nullptr) since a failure must be propagated regardless of its
          // value.
          merge_status = MergeHelper::TimedFullMerge(
              merge_operator, key, MergeHelper::kWideBaseValue, columns,
              {value}, moptions->info_log, moptions->statistics,
              SystemClock::Default().get(),
              /* update_num_ops_stats */ false, /* op_failure_scope */ nullptr,
              &new_value, /* result_operand */ nullptr, &new_value_type);
        }

        if (!merge_status.ok()) {
          // Failed to merge!
          // Store the delta in memtable
          perform_merge = false;
        } else {
          // 3) Add value to memtable
          assert(!concurrent_memtable_writes_);
          assert(new_value_type == kTypeValue ||
                 new_value_type == kTypeWideColumnEntity);

          if (kv_prot_info != nullptr) {
            auto merged_kv_prot_info =
                kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
            merged_kv_prot_info.UpdateV(value, new_value);
            merged_kv_prot_info.UpdateO(kTypeMerge, new_value_type);
            ret_status = mem->Add(sequence_, new_value_type, key, new_value,
                                  &merged_kv_prot_info);
          } else {
            ret_status = mem->Add(sequence_, new_value_type, key, new_value,
                                  nullptr /* kv_prot_info */);
          }
        }
      }
    }

    if (!perform_merge) {
      assert(ret_status.ok());
      // Add merge operand to memtable
      if (kv_prot_info != nullptr) {
        auto mem_kv_prot_info =
            kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
        ret_status =
            mem->Add(sequence_, kTypeMerge, key, value, &mem_kv_prot_info,
                     concurrent_memtable_writes_, get_post_process_info(mem));
      } else {
        ret_status = mem->Add(
            sequence_, kTypeMerge, key, value, nullptr /* kv_prot_info */,
            concurrent_memtable_writes_, get_post_process_info(mem));
      }
    }

    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // optimize for non-recovery mode
    // If `ret_status` is `TryAgain` then the next (successful) try will add
    // the key to the rebuilding transaction object. If `ret_status` is
    // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
    // away. So we only need to add to it when `ret_status.ok()`.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
      ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
                                             key, value);
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }
    return ret_status;
  }
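
  // Handles kTypeBlobIndex entries, i.e. values that point into blob files.
  // These are inserted like regular Puts except for the value type;
  // rebuilding a transaction from blob index entries is not supported, hence
  // the NotSupported rebuild callback below.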
  Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
                        const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    Status ret_status;
    auto rebuild_txn_op = [](WriteBatch* /* rebuilding_trx */,
                             uint32_t /* cf_id */, const Slice& /* k */,
                             const Slice& /* v */) -> Status {
      return Status::NotSupported();
    };
    if (kv_prot_info != nullptr) {
      // Memtable needs seqno, doesn't need CF ID
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      // Same as PutCF except for value type.
      ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                             rebuild_txn_op, &mem_kv_prot_info);
    } else {
      ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                             rebuild_txn_op, nullptr /* kv_prot_info */);
    }
    if (UNLIKELY(ret_status.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }
    return ret_status;
  }

  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        // MarkFlushScheduled only returns true if we are the one that
        // should take action, so no need to dedup further
        flush_scheduler_->ScheduleWork(cfd);
      }
    }
    // check if memtable_list size exceeds max_write_buffer_size_to_maintain
    if (trim_history_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd);
      const size_t size_to_maintain = static_cast<size_t>(
          cfd->ioptions().max_write_buffer_size_to_maintain);
      if (size_to_maintain > 0) {
        MemTableList* const imm = cfd->imm();
        assert(imm);
        if (imm->HasHistory()) {
          const MemTable* const mem = cfd->mem();
          assert(mem);
          if (mem->MemoryAllocatedBytes() +
                      imm->MemoryAllocatedBytesExcludingLast() >=
                  size_to_maintain &&
              imm->MarkTrimHistoryNeeded()) {
            trim_history_scheduler_->ScheduleWork(cfd);
          }
        }
      }
    }
  }
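
  // 2PC marker handlers. During recovery, a prepared section in the WAL has
  // the shape
  //   BeginPrepare, <operations...>, EndPrepare(xid), ..., Commit(xid)
  // The handlers below rebuild a hollow transaction from each prepared
  // section and then apply or drop it when the matching Commit/Rollback
  // marker is encountered.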
  // The write batch handler calls MarkBeginPrepare with unprepare set to true
  // if it encounters the kTypeBeginUnprepareXID marker.
  Status MarkBeginPrepare(bool unprepare) override {
    assert(rebuilding_trx_ == nullptr);
    assert(db_);

    if (recovering_log_number_ != 0) {
      db_->mutex()->AssertHeld();
      // during recovery we rebuild a hollow transaction
      // from all encountered prepare sections of the wal
      if (db_->allow_2pc() == false) {
        return Status::NotSupported(
            "WAL contains prepared transactions. Open with "
            "TransactionDB::Open().");
      }

      // we are now iterating through a prepared section
      rebuilding_trx_ = new WriteBatch();
      rebuilding_trx_seq_ = sequence_;
      // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
      // unprepared_batch_ should be false because it is false by default, and
      // gets reset to false in MarkEndPrepare.
      assert(!unprepared_batch_);
      unprepared_batch_ = unprepare;

      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    }

    return Status::OK();
  }

  Status MarkEndPrepare(const Slice& name) override {
    assert(db_);
    assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));

    if (recovering_log_number_ != 0) {
      db_->mutex()->AssertHeld();
      assert(db_->allow_2pc());
      size_t batch_cnt =
          write_after_commit_
              ? 0  // 0 will disable further checks
              : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
      db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
                                      rebuilding_trx_, rebuilding_trx_seq_,
                                      batch_cnt, unprepared_batch_);
      unprepared_batch_ = false;
      rebuilding_trx_ = nullptr;
    } else {
      assert(rebuilding_trx_ == nullptr);
    }

    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return Status::OK();
  }

  Status MarkNoop(bool empty_batch) override {
    if (recovering_log_number_ != 0) {
      db_->mutex()->AssertHeld();
    }
    // A hack in pessimistic transactions could result in a noop at the start
    // of the write batch; it should be ignored.
    if (!empty_batch) {
      // In the absence of Prepare markers, a kTypeNoop tag indicates the end
      // of a batch. This happens when a write batch commits while skipping
      // the prepare phase.
      const bool batch_boundary = true;
      MaybeAdvanceSeq(batch_boundary);
    }
    return Status::OK();
  }

  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // We must hold db mutex in recovery.
      db_->mutex()->AssertHeld();
      // in recovery when we encounter a commit marker
      // we look up this transaction in our set of rebuilt transactions
      // and commit.
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the prepared section may have
      // been released in the last incarnation because the
      // data was flushed to L0
      if (trx != nullptr) {
        // at this point individual CF lognumbers will prevent
        // duplicate re-insertion of values.
        assert(log_number_ref_ == 0);
        if (write_after_commit_) {
          // write_after_commit_ can only have one batch in trx.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // all inserts must reference this trx log number
          log_number_ref_ = batch_info.log_number_;
          ResetProtectionInfo();
          s = batch_info.batch_->Iterate(this);
          log_number_ref_ = 0;
        }
        // else the values are already inserted before the commit

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // When writes are not delayed until commit, there is no disconnect
      // between a memtable write and the WAL that supports it. So the commit
      // need not reference any log as the only log to which it depends.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    if (UNLIKELY(s.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }

    return s;
  }

  Status MarkCommitWithTimestamp(const Slice& name,
                                 const Slice& commit_ts) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // In recovery, db mutex must be held.
      db_->mutex()->AssertHeld();
      // in recovery when we encounter a commit marker
      // we look up this transaction in our set of rebuilt transactions
      // and commit.
      auto trx = db_->GetRecoveredTransaction(name.ToString());
      // the log containing the prepared section may have
      // been released in the last incarnation because the
      // data was flushed to L0
      if (trx) {
        // at this point individual CF lognumbers will prevent
        // duplicate re-insertion of values.
        assert(0 == log_number_ref_);
        if (write_after_commit_) {
          // write_after_commit_ can only have one batch in trx.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // all inserts must reference this trx log number
          log_number_ref_ = batch_info.log_number_;
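          // Stamp `commit_ts` into every user key of the recovered batch.
          // The callback supplies each column family's timestamp size so
          // UpdateTimestamps knows how many trailing key bytes to overwrite.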
          s = batch_info.batch_->UpdateTimestamps(
              commit_ts, [this](uint32_t cf) {
                assert(db_);
                VersionSet* const vset = db_->GetVersionSet();
                assert(vset);
                ColumnFamilySet* const cf_set = vset->GetColumnFamilySet();
                assert(cf_set);
                ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf);
                assert(cfd);
                const auto* const ucmp = cfd->user_comparator();
                assert(ucmp);
                return ucmp->timestamp_size();
              });
          if (s.ok()) {
            ResetProtectionInfo();
            s = batch_info.batch_->Iterate(this);
            log_number_ref_ = 0;
          }
        }
        // else the values are already inserted before the commit

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // When writes are not delayed until commit, there is no disconnect
      // between a memtable write and the WAL that supports it. So the commit
      // need not reference any log as the only log to which it depends.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    constexpr bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    if (UNLIKELY(s.IsTryAgain())) {
      DecrementProtectionInfoIdxForTryAgain();
    }

    return s;
  }

  Status MarkRollback(const Slice& name) override {
    assert(db_);

    if (recovering_log_number_ != 0) {
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the transaction's prep section
      // may have been released in the previous incarnation
      // because we knew it had been rolled back
      if (trx != nullptr) {
        db_->DeleteRecoveredTransaction(name.ToString());
      }
    } else {
      // in non-recovery mode we simply ignore this tag
    }

    const bool batch_boundary = true;
    MaybeAdvanceSeq(batch_boundary);

    return Status::OK();
  }

 private:
  MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
    if (!concurrent_memtable_writes_) {
      // No need to batch counters locally if we don't use concurrent mode.
      return nullptr;
    }
    return &GetPostMap()[mem];
  }
};

}  // anonymous namespace

// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables have been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache.
Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      /*concurrent_memtable_writes=*/false, nullptr /* prot_info */,
      nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
  for (auto w : write_group) {
    if (w->CallbackFailed()) {
      continue;
    }
    w->sequence = inserter.sequence();
    if (!w->ShouldWriteToMemtable()) {
      // In seq_per_batch_ mode this advances the seq by one.
      inserter.MaybeAdvanceSeq(true);
      continue;
    }
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    inserter.set_prot_info(w->batch->prot_info_.get());
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      return w->status;
    }
    assert(!seq_per_batch || w->batch_cnt != 0);
    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
  }
  return Status::OK();
}
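
// Same as above but for a single writer. Unlike the write-group overload,
// this one supports concurrent memtable writes; in that mode, per-memtable
// counters are accumulated locally and folded back in PostProcess().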
Status WriteBatchInternal::InsertInto(
    WriteThread::Writer* writer, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
    bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
  (void)batch_cnt;
#endif
  assert(writer->ShouldWriteToMemtable());
  MemTableInserter inserter(sequence, memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, nullptr /* prot_info */,
                            nullptr /*has_valid_writes*/, seq_per_batch,
                            batch_per_txn, hint_per_batch);
  SetSequence(writer->batch, sequence);
  inserter.set_log_number_ref(writer->log_ref);
  inserter.set_prot_info(writer->batch->prot_info_.get());
  Status s = writer->batch->Iterate(&inserter);
  assert(!seq_per_batch || batch_cnt != 0);
  assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, SequenceNumber* next_seq,
    bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, batch->prot_info_.get(),
                            has_valid_writes, seq_per_batch, batch_per_txn);
  Status s = batch->Iterate(&inserter);
  if (next_seq != nullptr) {
    *next_seq = inserter.sequence();
  }
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

namespace {

// This class updates protection info for a WriteBatch.
class ProtectionInfoUpdater : public WriteBatch::Handler {
 public:
  explicit ProtectionInfoUpdater(WriteBatch::ProtectionInfo* prot_info)
      : prot_info_(prot_info) {}

  ~ProtectionInfoUpdater() override = default;

  Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
    return UpdateProtInfo(cf, key, val, kTypeValue);
  }

  Status TimedPutCF(uint32_t cf, const Slice& key, const Slice& val,
                    uint64_t unix_write_time) override {
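    // Protection covers the value with the 64-bit write time appended,
    // mirroring how kTypeValuePreferredSeqno entries pack the two fields
    // together in the batch representation.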
    std::string encoded_write_time;
    PutFixed64(&encoded_write_time, unix_write_time);
    std::array<Slice, 2> value_with_time{{val, encoded_write_time}};
    SliceParts packed_value(value_with_time.data(), 2);
    return UpdateProtInfo(cf, key, packed_value, kTypeValuePreferredSeqno);
  }

  Status PutEntityCF(uint32_t cf, const Slice& key,
                     const Slice& entity) override {
    return UpdateProtInfo(cf, key, entity, kTypeWideColumnEntity);
  }

  Status DeleteCF(uint32_t cf, const Slice& key) override {
    return UpdateProtInfo(cf, key, "", kTypeDeletion);
  }

  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
    return UpdateProtInfo(cf, key, "", kTypeSingleDeletion);
  }

  Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
                       const Slice& end_key) override {
    return UpdateProtInfo(cf, begin_key, end_key, kTypeRangeDeletion);
  }

  Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
    return UpdateProtInfo(cf, key, val, kTypeMerge);
  }

  Status PutBlobIndexCF(uint32_t cf, const Slice& key,
                        const Slice& val) override {
    return UpdateProtInfo(cf, key, val, kTypeBlobIndex);
  }

  Status MarkBeginPrepare(bool /* unprepare */) override {
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice& /* xid */) override {
    return Status::OK();
  }

  Status MarkCommit(const Slice& /* xid */) override { return Status::OK(); }

  Status MarkCommitWithTimestamp(const Slice& /* xid */,
                                 const Slice& /* ts */) override {
    return Status::OK();
  }

  Status MarkRollback(const Slice& /* xid */) override { return Status::OK(); }

  Status MarkNoop(bool /* empty_batch */) override { return Status::OK(); }

 private:
  Status UpdateProtInfo(uint32_t cf, const Slice& key, const Slice& val,
                        const ValueType op_type) {
    if (prot_info_) {
      prot_info_->entries_.emplace_back(
          ProtectionInfo64().ProtectKVO(key, val, op_type).ProtectC(cf));
    }
    return Status::OK();
  }

  Status UpdateProtInfo(uint32_t cf, const Slice& key, const SliceParts& val,
                        const ValueType op_type) {
    if (prot_info_) {
      prot_info_->entries_.emplace_back(
          ProtectionInfo64()
              .ProtectKVO(SliceParts(&key, 1), val, op_type)
              .ProtectC(cf));
    }
    return Status::OK();
  }

  // No copy or move.
  ProtectionInfoUpdater(const ProtectionInfoUpdater&) = delete;
  ProtectionInfoUpdater(ProtectionInfoUpdater&&) = delete;
  ProtectionInfoUpdater& operator=(const ProtectionInfoUpdater&) = delete;
  ProtectionInfoUpdater& operator=(ProtectionInfoUpdater&&) = delete;

  WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
};

}  // anonymous namespace

Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  assert(b->prot_info_ == nullptr);

  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}
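
// Appends src's operations (and protection info, if any) to dst. When
// `wal_only` is true and src has a WAL termination point set, only the
// prefix of src up to that point is copied.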
Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  assert(dst->Count() == 0 ||
         (dst->prot_info_ == nullptr) == (src->prot_info_ == nullptr));
  if ((src->prot_info_ != nullptr &&
       src->prot_info_->entries_.size() != src->Count()) ||
      (dst->prot_info_ != nullptr &&
       dst->prot_info_->entries_.size() != dst->Count())) {
    return Status::Corruption(
        "Write batch has inconsistent count and number of checksums");
  }

  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  if (src->prot_info_ != nullptr) {
    if (dst->prot_info_ == nullptr) {
      dst->prot_info_.reset(new WriteBatch::ProtectionInfo());
    }
    std::copy(src->prot_info_->entries_.begin(),
              src->prot_info_->entries_.begin() + src_count,
              std::back_inserter(dst->prot_info_->entries_));
  } else if (dst->prot_info_ != nullptr) {
    // dst has empty prot_info->entries
    // In this special case, we allow a write batch without prot_info to
    // be appended to a write batch with empty prot_info
    dst->prot_info_ = nullptr;
  }
  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}
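
// Size of the concatenation of two batches: each side contributes its bytes,
// but the fixed kHeader prefix (sequence number and count) is only counted
// once when both sides are non-empty.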
size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
                                            size_t rightByteSize) {
  if (leftByteSize == 0 || rightByteSize == 0) {
    return leftByteSize + rightByteSize;
  } else {
    return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
  }
}
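
// Enables (bytes_per_key == 8) or disables (bytes_per_key == 0) per-key
// protection info on `wb`. When enabling, the per-key entries are derived
// from the batch contents, so if `checksum` is non-null the contents are
// also verified against that expected XXH3-64 hash.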
Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb,
                                                size_t bytes_per_key,
                                                uint64_t* checksum) {
  if (bytes_per_key == 0) {
    if (wb->prot_info_ != nullptr) {
      wb->prot_info_.reset();
      return Status::OK();
    } else {
      // Already not protected.
      return Status::OK();
    }
  } else if (bytes_per_key == 8) {
    if (wb->prot_info_ == nullptr) {
      wb->prot_info_.reset(new WriteBatch::ProtectionInfo());
      ProtectionInfoUpdater prot_info_updater(wb->prot_info_.get());
      Status s = wb->Iterate(&prot_info_updater);
      if (s.ok() && checksum != nullptr) {
        uint64_t expected_hash = XXH3_64bits(wb->rep_.data(), wb->rep_.size());
        if (expected_hash != *checksum) {
          return Status::Corruption("Write batch content corrupted.");
        }
      }
      return s;
    } else {
      // Already protected.
      return Status::OK();
    }
  }
  return Status::NotSupported(
      "WriteBatch protection info must be zero or eight bytes/key");
}

}  // namespace ROCKSDB_NAMESPACE