write_batch.cc
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. //
  10. // WriteBatch::rep_ :=
  11. //    sequence: fixed64
  12. //    count: fixed32
  13. //    data: record[count]
  14. // record :=
  15. //    kTypeValue varstring varstring
  16. //    kTypeDeletion varstring
  17. //    kTypeSingleDeletion varstring
  18. //    kTypeRangeDeletion varstring varstring
  19. //    kTypeMerge varstring varstring
  20. //    kTypeColumnFamilyValue varint32 varstring varstring
  21. //    kTypeColumnFamilyDeletion varint32 varstring
  22. //    kTypeColumnFamilySingleDeletion varint32 varstring
  23. //    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
  24. //    kTypeColumnFamilyMerge varint32 varstring varstring
  25. //    kTypeBeginPrepareXID varstring
  26. //    kTypeEndPrepareXID
  27. //    kTypeCommitXID varstring
  28. //    kTypeRollbackXID varstring
  29. //    kTypeBeginPersistedPrepareXID varstring
  30. //    kTypeBeginUnprepareXID varstring
  31. //    kTypeNoop
  32. // varstring :=
  33. //    len: varint32
  34. //    data: uint8[len]
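// Illustrative example (derived from the layout above, not part of the
// original comment): a batch holding a single Put("key", "val") in the
// default column family is laid out as
//   [ sequence: 8 bytes ][ count = 1: 4 bytes ]
//   [ kTypeValue tag: 1 byte ][ varint32 len 3 ]"key"[ varint32 len 3 ]"val"
// i.e. a 12-byte header followed by one record.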
  35. #include "rocksdb/write_batch.h"
  36. #include <map>
  37. #include <stack>
  38. #include <stdexcept>
  39. #include <type_traits>
  40. #include <unordered_map>
  41. #include <vector>
  42. #include "db/column_family.h"
  43. #include "db/db_impl/db_impl.h"
  44. #include "db/dbformat.h"
  45. #include "db/flush_scheduler.h"
  46. #include "db/memtable.h"
  47. #include "db/merge_context.h"
  48. #include "db/snapshot_impl.h"
  49. #include "db/trim_history_scheduler.h"
  50. #include "db/write_batch_internal.h"
  51. #include "monitoring/perf_context_imp.h"
  52. #include "monitoring/statistics.h"
  53. #include "rocksdb/merge_operator.h"
  54. #include "util/autovector.h"
  55. #include "util/cast_util.h"
  56. #include "util/coding.h"
  57. #include "util/duplicate_detector.h"
  58. #include "util/string_util.h"
  59. #include "util/util.h"
  60. namespace ROCKSDB_NAMESPACE {
  61. // anon namespace for file-local types
  62. namespace {
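// ContentFlags records which kinds of entries a batch contains. DEFERRED
// means the flags have not been computed yet; ComputeContentFlags() derives
// them lazily by iterating the batch with the BatchContentClassifier below.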
  63. enum ContentFlags : uint32_t {
  64. DEFERRED = 1 << 0,
  65. HAS_PUT = 1 << 1,
  66. HAS_DELETE = 1 << 2,
  67. HAS_SINGLE_DELETE = 1 << 3,
  68. HAS_MERGE = 1 << 4,
  69. HAS_BEGIN_PREPARE = 1 << 5,
  70. HAS_END_PREPARE = 1 << 6,
  71. HAS_COMMIT = 1 << 7,
  72. HAS_ROLLBACK = 1 << 8,
  73. HAS_DELETE_RANGE = 1 << 9,
  74. HAS_BLOB_INDEX = 1 << 10,
  75. HAS_BEGIN_UNPREPARE = 1 << 11,
  76. };
  77. struct BatchContentClassifier : public WriteBatch::Handler {
  78. uint32_t content_flags = 0;
  79. Status PutCF(uint32_t, const Slice&, const Slice&) override {
  80. content_flags |= ContentFlags::HAS_PUT;
  81. return Status::OK();
  82. }
  83. Status DeleteCF(uint32_t, const Slice&) override {
  84. content_flags |= ContentFlags::HAS_DELETE;
  85. return Status::OK();
  86. }
  87. Status SingleDeleteCF(uint32_t, const Slice&) override {
  88. content_flags |= ContentFlags::HAS_SINGLE_DELETE;
  89. return Status::OK();
  90. }
  91. Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
  92. content_flags |= ContentFlags::HAS_DELETE_RANGE;
  93. return Status::OK();
  94. }
  95. Status MergeCF(uint32_t, const Slice&, const Slice&) override {
  96. content_flags |= ContentFlags::HAS_MERGE;
  97. return Status::OK();
  98. }
  99. Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
  100. content_flags |= ContentFlags::HAS_BLOB_INDEX;
  101. return Status::OK();
  102. }
  103. Status MarkBeginPrepare(bool unprepare) override {
  104. content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
  105. if (unprepare) {
  106. content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
  107. }
  108. return Status::OK();
  109. }
  110. Status MarkEndPrepare(const Slice&) override {
  111. content_flags |= ContentFlags::HAS_END_PREPARE;
  112. return Status::OK();
  113. }
  114. Status MarkCommit(const Slice&) override {
  115. content_flags |= ContentFlags::HAS_COMMIT;
  116. return Status::OK();
  117. }
  118. Status MarkRollback(const Slice&) override {
  119. content_flags |= ContentFlags::HAS_ROLLBACK;
  120. return Status::OK();
  121. }
  122. };
  123. class TimestampAssigner : public WriteBatch::Handler {
  124. public:
  125. explicit TimestampAssigner(const Slice& ts)
  126. : timestamp_(ts), timestamps_(kEmptyTimestampList) {}
  127. explicit TimestampAssigner(const std::vector<Slice>& ts_list)
  128. : timestamps_(ts_list) {
  129. SanityCheck();
  130. }
  131. ~TimestampAssigner() override {}
  132. Status PutCF(uint32_t, const Slice& key, const Slice&) override {
  133. AssignTimestamp(key);
  134. ++idx_;
  135. return Status::OK();
  136. }
  137. Status DeleteCF(uint32_t, const Slice& key) override {
  138. AssignTimestamp(key);
  139. ++idx_;
  140. return Status::OK();
  141. }
  142. Status SingleDeleteCF(uint32_t, const Slice& key) override {
  143. AssignTimestamp(key);
  144. ++idx_;
  145. return Status::OK();
  146. }
  147. Status DeleteRangeCF(uint32_t, const Slice& begin_key,
  148. const Slice& end_key) override {
  149. AssignTimestamp(begin_key);
  150. AssignTimestamp(end_key);
  151. ++idx_;
  152. return Status::OK();
  153. }
  154. Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
  155. AssignTimestamp(key);
  156. ++idx_;
  157. return Status::OK();
  158. }
  159. Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
  160. // TODO (yanqin): support blob db in the future.
  161. return Status::OK();
  162. }
  163. Status MarkBeginPrepare(bool) override {
  164. // TODO (yanqin): support in the future.
  165. return Status::OK();
  166. }
  167. Status MarkEndPrepare(const Slice&) override {
  168. // TODO (yanqin): support in the future.
  169. return Status::OK();
  170. }
  171. Status MarkCommit(const Slice&) override {
  172. // TODO (yanqin): support in the future.
  173. return Status::OK();
  174. }
  175. Status MarkRollback(const Slice&) override {
  176. // TODO (yanqin): support in the future.
  177. return Status::OK();
  178. }
  179. private:
  180. void SanityCheck() const {
  181. assert(!timestamps_.empty());
  182. #ifndef NDEBUG
  183. const size_t ts_sz = timestamps_[0].size();
  184. for (size_t i = 1; i != timestamps_.size(); ++i) {
  185. assert(ts_sz == timestamps_[i].size());
  186. }
  187. #endif // !NDEBUG
  188. }
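// Overwrites the trailing timestamp bytes of the key in place. This assumes
// the key was encoded with timestamp_size_ bytes of padding reserved after
// the user key (see WriteBatchInternal::Put below).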
  189. void AssignTimestamp(const Slice& key) {
  190. assert(timestamps_.empty() || idx_ < timestamps_.size());
  191. const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
  192. size_t ts_sz = ts.size();
  193. char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
  194. memcpy(ptr, ts.data(), ts_sz);
  195. }
  196. static const std::vector<Slice> kEmptyTimestampList;
  197. const Slice timestamp_;
  198. const std::vector<Slice>& timestamps_;
  199. size_t idx_ = 0;
  200. // No copy or move.
  201. TimestampAssigner(const TimestampAssigner&) = delete;
  202. TimestampAssigner(TimestampAssigner&&) = delete;
  203. TimestampAssigner& operator=(const TimestampAssigner&) = delete;
  204. TimestampAssigner& operator=(TimestampAssigner&&) = delete;
  205. };
  206. const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;
  207. } // anon namespace
  208. struct SavePoints {
  209. std::stack<SavePoint, autovector<SavePoint>> stack;
  210. };
  211. WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
  212. : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
  213. rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
  214. ? reserved_bytes
  215. : WriteBatchInternal::kHeader);
  216. rep_.resize(WriteBatchInternal::kHeader);
  217. }
  218. WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
  219. : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
  220. rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
  221. reserved_bytes : WriteBatchInternal::kHeader);
  222. rep_.resize(WriteBatchInternal::kHeader);
  223. }
  224. WriteBatch::WriteBatch(const std::string& rep)
  225. : content_flags_(ContentFlags::DEFERRED),
  226. max_bytes_(0),
  227. rep_(rep),
  228. timestamp_size_(0) {}
  229. WriteBatch::WriteBatch(std::string&& rep)
  230. : content_flags_(ContentFlags::DEFERRED),
  231. max_bytes_(0),
  232. rep_(std::move(rep)),
  233. timestamp_size_(0) {}
  234. WriteBatch::WriteBatch(const WriteBatch& src)
  235. : wal_term_point_(src.wal_term_point_),
  236. content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
  237. max_bytes_(src.max_bytes_),
  238. rep_(src.rep_),
  239. timestamp_size_(src.timestamp_size_) {
  240. if (src.save_points_ != nullptr) {
  241. save_points_.reset(new SavePoints());
  242. save_points_->stack = src.save_points_->stack;
  243. }
  244. }
  245. WriteBatch::WriteBatch(WriteBatch&& src) noexcept
  246. : save_points_(std::move(src.save_points_)),
  247. wal_term_point_(std::move(src.wal_term_point_)),
  248. content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
  249. max_bytes_(src.max_bytes_),
  250. rep_(std::move(src.rep_)),
  251. timestamp_size_(src.timestamp_size_) {}
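// The copy and move assignment operators below reuse the corresponding
// constructors by destroying the current object in place and rebuilding it
// with placement new.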
  252. WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  253. if (&src != this) {
  254. this->~WriteBatch();
  255. new (this) WriteBatch(src);
  256. }
  257. return *this;
  258. }
  259. WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  260. if (&src != this) {
  261. this->~WriteBatch();
  262. new (this) WriteBatch(std::move(src));
  263. }
  264. return *this;
  265. }
  266. WriteBatch::~WriteBatch() { }
  267. WriteBatch::Handler::~Handler() { }
  268. void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  269. // If the user has not specified something to do with blobs, then we ignore
  270. // them.
  271. }
  272. bool WriteBatch::Handler::Continue() {
  273. return true;
  274. }
  275. void WriteBatch::Clear() {
  276. rep_.clear();
  277. rep_.resize(WriteBatchInternal::kHeader);
  278. content_flags_.store(0, std::memory_order_relaxed);
  279. if (save_points_ != nullptr) {
  280. while (!save_points_->stack.empty()) {
  281. save_points_->stack.pop();
  282. }
  283. }
  284. wal_term_point_.clear();
  285. }
  286. uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }
  287. uint32_t WriteBatch::ComputeContentFlags() const {
  288. auto rv = content_flags_.load(std::memory_order_relaxed);
  289. if ((rv & ContentFlags::DEFERRED) != 0) {
  290. BatchContentClassifier classifier;
  291. Iterate(&classifier);
  292. rv = classifier.content_flags;
  293. // this method is conceptually const, because it is performing a lazy
  294. // computation that doesn't affect the abstract state of the batch.
  295. // content_flags_ is marked mutable so that we can perform the
  296. // following assignment
  297. content_flags_.store(rv, std::memory_order_relaxed);
  298. }
  299. return rv;
  300. }
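// Snapshot the batch's current size, entry count and content flags. As the
// name suggests, callers treat this point as the end of the portion of the
// batch that should be written to the WAL.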
  301. void WriteBatch::MarkWalTerminationPoint() {
  302. wal_term_point_.size = GetDataSize();
  303. wal_term_point_.count = Count();
  304. wal_term_point_.content_flags = content_flags_;
  305. }
  306. bool WriteBatch::HasPut() const {
  307. return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
  308. }
  309. bool WriteBatch::HasDelete() const {
  310. return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
  311. }
  312. bool WriteBatch::HasSingleDelete() const {
  313. return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
  314. }
  315. bool WriteBatch::HasDeleteRange() const {
  316. return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
  317. }
  318. bool WriteBatch::HasMerge() const {
  319. return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
  320. }
  321. bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  322. assert(input != nullptr && key != nullptr);
  323. // Skip tag byte
  324. input->remove_prefix(1);
  325. if (cf_record) {
  326. // Skip column_family bytes
  327. uint32_t cf;
  328. if (!GetVarint32(input, &cf)) {
  329. return false;
  330. }
  331. }
  332. // Extract key
  333. return GetLengthPrefixedSlice(input, key);
  334. }
  335. bool WriteBatch::HasBeginPrepare() const {
  336. return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
  337. }
  338. bool WriteBatch::HasEndPrepare() const {
  339. return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
  340. }
  341. bool WriteBatch::HasCommit() const {
  342. return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
  343. }
  344. bool WriteBatch::HasRollback() const {
  345. return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
  346. }
  347. Status ReadRecordFromWriteBatch(Slice* input, char* tag,
  348. uint32_t* column_family, Slice* key,
  349. Slice* value, Slice* blob, Slice* xid) {
  350. assert(key != nullptr && value != nullptr);
  351. *tag = (*input)[0];
  352. input->remove_prefix(1);
  353. *column_family = 0; // default
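// Each kTypeColumnFamily* case below first decodes the varint32 column
// family id and then falls through to the matching default-column-family
// case, which decodes the remainder of the record.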
  354. switch (*tag) {
  355. case kTypeColumnFamilyValue:
  356. if (!GetVarint32(input, column_family)) {
  357. return Status::Corruption("bad WriteBatch Put");
  358. }
  359. FALLTHROUGH_INTENDED;
  360. case kTypeValue:
  361. if (!GetLengthPrefixedSlice(input, key) ||
  362. !GetLengthPrefixedSlice(input, value)) {
  363. return Status::Corruption("bad WriteBatch Put");
  364. }
  365. break;
  366. case kTypeColumnFamilyDeletion:
  367. case kTypeColumnFamilySingleDeletion:
  368. if (!GetVarint32(input, column_family)) {
  369. return Status::Corruption("bad WriteBatch Delete");
  370. }
  371. FALLTHROUGH_INTENDED;
  372. case kTypeDeletion:
  373. case kTypeSingleDeletion:
  374. if (!GetLengthPrefixedSlice(input, key)) {
  375. return Status::Corruption("bad WriteBatch Delete");
  376. }
  377. break;
  378. case kTypeColumnFamilyRangeDeletion:
  379. if (!GetVarint32(input, column_family)) {
  380. return Status::Corruption("bad WriteBatch DeleteRange");
  381. }
  382. FALLTHROUGH_INTENDED;
  383. case kTypeRangeDeletion:
  384. // for range delete, "key" is begin_key, "value" is end_key
  385. if (!GetLengthPrefixedSlice(input, key) ||
  386. !GetLengthPrefixedSlice(input, value)) {
  387. return Status::Corruption("bad WriteBatch DeleteRange");
  388. }
  389. break;
  390. case kTypeColumnFamilyMerge:
  391. if (!GetVarint32(input, column_family)) {
  392. return Status::Corruption("bad WriteBatch Merge");
  393. }
  394. FALLTHROUGH_INTENDED;
  395. case kTypeMerge:
  396. if (!GetLengthPrefixedSlice(input, key) ||
  397. !GetLengthPrefixedSlice(input, value)) {
  398. return Status::Corruption("bad WriteBatch Merge");
  399. }
  400. break;
  401. case kTypeColumnFamilyBlobIndex:
  402. if (!GetVarint32(input, column_family)) {
  403. return Status::Corruption("bad WriteBatch BlobIndex");
  404. }
  405. FALLTHROUGH_INTENDED;
  406. case kTypeBlobIndex:
  407. if (!GetLengthPrefixedSlice(input, key) ||
  408. !GetLengthPrefixedSlice(input, value)) {
  409. return Status::Corruption("bad WriteBatch BlobIndex");
  410. }
  411. break;
  412. case kTypeLogData:
  413. assert(blob != nullptr);
  414. if (!GetLengthPrefixedSlice(input, blob)) {
  415. return Status::Corruption("bad WriteBatch Blob");
  416. }
  417. break;
  418. case kTypeNoop:
  419. case kTypeBeginPrepareXID:
  420. // This indicates that the prepared batch is also persisted in the db.
  421. // This is used in WritePreparedTxn
  422. case kTypeBeginPersistedPrepareXID:
  423. // This is used in WriteUnpreparedTxn
  424. case kTypeBeginUnprepareXID:
  425. break;
  426. case kTypeEndPrepareXID:
  427. if (!GetLengthPrefixedSlice(input, xid)) {
  428. return Status::Corruption("bad EndPrepare XID");
  429. }
  430. break;
  431. case kTypeCommitXID:
  432. if (!GetLengthPrefixedSlice(input, xid)) {
  433. return Status::Corruption("bad Commit XID");
  434. }
  435. break;
  436. case kTypeRollbackXID:
  437. if (!GetLengthPrefixedSlice(input, xid)) {
  438. return Status::Corruption("bad Rollback XID");
  439. }
  440. break;
  441. default:
  442. return Status::Corruption("unknown WriteBatch tag");
  443. }
  444. return Status::OK();
  445. }
  446. Status WriteBatch::Iterate(Handler* handler) const {
  447. if (rep_.size() < WriteBatchInternal::kHeader) {
  448. return Status::Corruption("malformed WriteBatch (too small)");
  449. }
  450. return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
  451. rep_.size());
  452. }
  453. Status WriteBatchInternal::Iterate(const WriteBatch* wb,
  454. WriteBatch::Handler* handler, size_t begin,
  455. size_t end) {
  456. if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
  457. return Status::Corruption("Invalid start/end bounds for Iterate");
  458. }
  459. assert(begin <= end);
  460. Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  461. bool whole_batch =
  462. (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());
  463. Slice key, value, blob, xid;
  464. // Sometimes a sub-batch starts with a Noop. Such a Noop must not be treated
  465. // as a batch boundary marker, otherwise we would mis-count the number of
  466. // sub-batches. We detect this case by checking whether the accumulated batch
  467. // is still empty when the next Noop is seen.
  468. bool empty_batch = true;
  469. uint32_t found = 0;
  470. Status s;
  471. char tag = 0;
  472. uint32_t column_family = 0; // default
  473. bool last_was_try_again = false;
  474. bool handler_continue = true;
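// A handler may return Status::TryAgain() (MemTableInserter does this when a
// memtable add fails because the key+seq pair already exists, which can only
// happen in seq_per_batch mode); in that case the same record is
// re-dispatched on the next loop iteration instead of decoding a new one.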
  475. while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
  476. handler_continue = handler->Continue();
  477. if (!handler_continue) {
  478. break;
  479. }
  480. if (LIKELY(!s.IsTryAgain())) {
  481. last_was_try_again = false;
  482. tag = 0;
  483. column_family = 0; // default
  484. s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
  485. &blob, &xid);
  486. if (!s.ok()) {
  487. return s;
  488. }
  489. } else {
  490. assert(s.IsTryAgain());
  491. assert(!last_was_try_again); // to detect infinite loop bugs
  492. if (UNLIKELY(last_was_try_again)) {
  493. return Status::Corruption(
  494. "two consecutive TryAgain in WriteBatch handler; this is either a "
  495. "software bug or data corruption.");
  496. }
  497. last_was_try_again = true;
  498. s = Status::OK();
  499. }
  500. switch (tag) {
  501. case kTypeColumnFamilyValue:
  502. case kTypeValue:
  503. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  504. (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
  505. s = handler->PutCF(column_family, key, value);
  506. if (LIKELY(s.ok())) {
  507. empty_batch = false;
  508. found++;
  509. }
  510. break;
  511. case kTypeColumnFamilyDeletion:
  512. case kTypeDeletion:
  513. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  514. (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
  515. s = handler->DeleteCF(column_family, key);
  516. if (LIKELY(s.ok())) {
  517. empty_batch = false;
  518. found++;
  519. }
  520. break;
  521. case kTypeColumnFamilySingleDeletion:
  522. case kTypeSingleDeletion:
  523. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  524. (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
  525. s = handler->SingleDeleteCF(column_family, key);
  526. if (LIKELY(s.ok())) {
  527. empty_batch = false;
  528. found++;
  529. }
  530. break;
  531. case kTypeColumnFamilyRangeDeletion:
  532. case kTypeRangeDeletion:
  533. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  534. (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
  535. s = handler->DeleteRangeCF(column_family, key, value);
  536. if (LIKELY(s.ok())) {
  537. empty_batch = false;
  538. found++;
  539. }
  540. break;
  541. case kTypeColumnFamilyMerge:
  542. case kTypeMerge:
  543. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  544. (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
  545. s = handler->MergeCF(column_family, key, value);
  546. if (LIKELY(s.ok())) {
  547. empty_batch = false;
  548. found++;
  549. }
  550. break;
  551. case kTypeColumnFamilyBlobIndex:
  552. case kTypeBlobIndex:
  553. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  554. (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
  555. s = handler->PutBlobIndexCF(column_family, key, value);
  556. if (LIKELY(s.ok())) {
  557. found++;
  558. }
  559. break;
  560. case kTypeLogData:
  561. handler->LogData(blob);
  562. // A batch might have nothing but LogData. It is still a batch.
  563. empty_batch = false;
  564. break;
  565. case kTypeBeginPrepareXID:
  566. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  567. (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
  568. handler->MarkBeginPrepare();
  569. empty_batch = false;
  570. if (!handler->WriteAfterCommit()) {
  571. s = Status::NotSupported(
  572. "WriteCommitted txn tag when write_after_commit_ is disabled (in "
  573. "WritePrepared/WriteUnprepared mode). If it is not due to "
  574. "corruption, the WAL must be emptied before changing the "
  575. "WritePolicy.");
  576. }
  577. if (handler->WriteBeforePrepare()) {
  578. s = Status::NotSupported(
  579. "WriteCommitted txn tag when write_before_prepare_ is enabled "
  580. "(in WriteUnprepared mode). If it is not due to corruption, the "
  581. "WAL must be emptied before changing the WritePolicy.");
  582. }
  583. break;
  584. case kTypeBeginPersistedPrepareXID:
  585. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  586. (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
  587. handler->MarkBeginPrepare();
  588. empty_batch = false;
  589. if (handler->WriteAfterCommit()) {
  590. s = Status::NotSupported(
  591. "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
  592. "is enabled (in default WriteCommitted mode). If it is not due "
  593. "to corruption, the WAL must be emptied before changing the "
  594. "WritePolicy.");
  595. }
  596. break;
  597. case kTypeBeginUnprepareXID:
  598. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  599. (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
  600. handler->MarkBeginPrepare(true /* unprepared */);
  601. empty_batch = false;
  602. if (handler->WriteAfterCommit()) {
  603. s = Status::NotSupported(
  604. "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
  605. "default WriteCommitted mode). If it is not due to corruption, "
  606. "the WAL must be emptied before changing the WritePolicy.");
  607. }
  608. if (!handler->WriteBeforePrepare()) {
  609. s = Status::NotSupported(
  610. "WriteUnprepared txn tag when write_before_prepare_ is disabled "
  611. "(in WriteCommitted/WritePrepared mode). If it is not due to "
  612. "corruption, the WAL must be emptied before changing the "
  613. "WritePolicy.");
  614. }
  615. break;
  616. case kTypeEndPrepareXID:
  617. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  618. (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
  619. handler->MarkEndPrepare(xid);
  620. empty_batch = true;
  621. break;
  622. case kTypeCommitXID:
  623. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  624. (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
  625. handler->MarkCommit(xid);
  626. empty_batch = true;
  627. break;
  628. case kTypeRollbackXID:
  629. assert(wb->content_flags_.load(std::memory_order_relaxed) &
  630. (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
  631. handler->MarkRollback(xid);
  632. empty_batch = true;
  633. break;
  634. case kTypeNoop:
  635. handler->MarkNoop(empty_batch);
  636. empty_batch = true;
  637. break;
  638. default:
  639. return Status::Corruption("unknown WriteBatch tag");
  640. }
  641. }
  642. if (!s.ok()) {
  643. return s;
  644. }
  645. if (handler_continue && whole_batch &&
  646. found != WriteBatchInternal::Count(wb)) {
  647. return Status::Corruption("WriteBatch has wrong count");
  648. } else {
  649. return Status::OK();
  650. }
  651. }
  652. bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  653. return b->is_latest_persistent_state_;
  654. }
  655. void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
  656. b->is_latest_persistent_state_ = true;
  657. }
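// The 12-byte header of rep_: bytes 0..7 hold the sequence number (fixed64)
// and bytes 8..11 hold the entry count (fixed32). The accessors below read
// and write those fields directly.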
  658. uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  659. return DecodeFixed32(b->rep_.data() + 8);
  660. }
  661. void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  662. EncodeFixed32(&b->rep_[8], n);
  663. }
  664. SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  665. return SequenceNumber(DecodeFixed64(b->rep_.data()));
  666. }
  667. void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  668. EncodeFixed64(&b->rep_[0], seq);
  669. }
  670. size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  671. return WriteBatchInternal::kHeader;
  672. }
  673. Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
  674. const Slice& key, const Slice& value) {
  675. if (key.size() > size_t{port::kMaxUint32}) {
  676. return Status::InvalidArgument("key is too large");
  677. }
  678. if (value.size() > size_t{port::kMaxUint32}) {
  679. return Status::InvalidArgument("value is too large");
  680. }
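// LocalSavePoint (declared in write_batch_internal.h) remembers the batch
// state on construction; save.commit() rolls the append back and fails if
// the write would push the batch past max_bytes_.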
  681. LocalSavePoint save(b);
  682. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  683. if (column_family_id == 0) {
  684. b->rep_.push_back(static_cast<char>(kTypeValue));
  685. } else {
  686. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
  687. PutVarint32(&b->rep_, column_family_id);
  688. }
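// If the batch was created with a timestamp size, reserve timestamp_size_
// zero bytes after the user key; TimestampAssigner later overwrites this
// padding in place via AssignTimestamp()/AssignTimestamps().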
  689. if (0 == b->timestamp_size_) {
  690. PutLengthPrefixedSlice(&b->rep_, key);
  691. } else {
  692. PutVarint32(&b->rep_,
  693. static_cast<uint32_t>(key.size() + b->timestamp_size_));
  694. b->rep_.append(key.data(), key.size());
  695. b->rep_.append(b->timestamp_size_, '\0');
  696. }
  697. PutLengthPrefixedSlice(&b->rep_, value);
  698. b->content_flags_.store(
  699. b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
  700. std::memory_order_relaxed);
  701. return save.commit();
  702. }
  703. Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
  704. const Slice& value) {
  705. return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
  706. value);
  707. }
  708. Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
  709. const SliceParts& value) {
  710. size_t total_key_bytes = 0;
  711. for (int i = 0; i < key.num_parts; ++i) {
  712. total_key_bytes += key.parts[i].size();
  713. }
  714. if (total_key_bytes >= size_t{port::kMaxUint32}) {
  715. return Status::InvalidArgument("key is too large");
  716. }
  717. size_t total_value_bytes = 0;
  718. for (int i = 0; i < value.num_parts; ++i) {
  719. total_value_bytes += value.parts[i].size();
  720. }
  721. if (total_value_bytes >= size_t{port::kMaxUint32}) {
  722. return Status::InvalidArgument("value is too large");
  723. }
  724. return Status::OK();
  725. }
  726. Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
  727. const SliceParts& key, const SliceParts& value) {
  728. Status s = CheckSlicePartsLength(key, value);
  729. if (!s.ok()) {
  730. return s;
  731. }
  732. LocalSavePoint save(b);
  733. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  734. if (column_family_id == 0) {
  735. b->rep_.push_back(static_cast<char>(kTypeValue));
  736. } else {
  737. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
  738. PutVarint32(&b->rep_, column_family_id);
  739. }
  740. if (0 == b->timestamp_size_) {
  741. PutLengthPrefixedSliceParts(&b->rep_, key);
  742. } else {
  743. PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
  744. }
  745. PutLengthPrefixedSliceParts(&b->rep_, value);
  746. b->content_flags_.store(
  747. b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
  748. std::memory_order_relaxed);
  749. return save.commit();
  750. }
  751. Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
  752. const SliceParts& value) {
  753. return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
  754. value);
  755. }
  756. Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  757. b->rep_.push_back(static_cast<char>(kTypeNoop));
  758. return Status::OK();
  759. }
  760. Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
  761. bool write_after_commit,
  762. bool unprepared_batch) {
  763. // a manually constructed batch can only contain one prepare section
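// rep_[12] is the tag of the first record, immediately after the 12-byte
// header (8-byte sequence + 4-byte count); for a prepare section it must be
// the kTypeNoop typically written by WriteBatchInternal::InsertNoop().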
  764. assert(b->rep_[12] == static_cast<char>(kTypeNoop));
  765. // all savepoints up to this point are cleared
  766. if (b->save_points_ != nullptr) {
  767. while (!b->save_points_->stack.empty()) {
  768. b->save_points_->stack.pop();
  769. }
  770. }
  771. // rewrite noop as begin marker
  772. b->rep_[12] = static_cast<char>(
  773. write_after_commit ? kTypeBeginPrepareXID
  774. : (unprepared_batch ? kTypeBeginUnprepareXID
  775. : kTypeBeginPersistedPrepareXID));
  776. b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  777. PutLengthPrefixedSlice(&b->rep_, xid);
  778. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  779. ContentFlags::HAS_END_PREPARE |
  780. ContentFlags::HAS_BEGIN_PREPARE,
  781. std::memory_order_relaxed);
  782. if (unprepared_batch) {
  783. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  784. ContentFlags::HAS_BEGIN_UNPREPARE,
  785. std::memory_order_relaxed);
  786. }
  787. return Status::OK();
  788. }
  789. Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  790. b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  791. PutLengthPrefixedSlice(&b->rep_, xid);
  792. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  793. ContentFlags::HAS_COMMIT,
  794. std::memory_order_relaxed);
  795. return Status::OK();
  796. }
  797. Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  798. b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  799. PutLengthPrefixedSlice(&b->rep_, xid);
  800. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  801. ContentFlags::HAS_ROLLBACK,
  802. std::memory_order_relaxed);
  803. return Status::OK();
  804. }
  805. Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
  806. const Slice& key) {
  807. LocalSavePoint save(b);
  808. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  809. if (column_family_id == 0) {
  810. b->rep_.push_back(static_cast<char>(kTypeDeletion));
  811. } else {
  812. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
  813. PutVarint32(&b->rep_, column_family_id);
  814. }
  815. PutLengthPrefixedSlice(&b->rep_, key);
  816. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  817. ContentFlags::HAS_DELETE,
  818. std::memory_order_relaxed);
  819. return save.commit();
  820. }
  821. Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  822. return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
  823. key);
  824. }
  825. Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
  826. const SliceParts& key) {
  827. LocalSavePoint save(b);
  828. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  829. if (column_family_id == 0) {
  830. b->rep_.push_back(static_cast<char>(kTypeDeletion));
  831. } else {
  832. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
  833. PutVarint32(&b->rep_, column_family_id);
  834. }
  835. PutLengthPrefixedSliceParts(&b->rep_, key);
  836. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  837. ContentFlags::HAS_DELETE,
  838. std::memory_order_relaxed);
  839. return save.commit();
  840. }
  841. Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
  842. const SliceParts& key) {
  843. return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
  844. key);
  845. }
  846. Status WriteBatchInternal::SingleDelete(WriteBatch* b,
  847. uint32_t column_family_id,
  848. const Slice& key) {
  849. LocalSavePoint save(b);
  850. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  851. if (column_family_id == 0) {
  852. b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  853. } else {
  854. b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
  855. PutVarint32(&b->rep_, column_family_id);
  856. }
  857. PutLengthPrefixedSlice(&b->rep_, key);
  858. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  859. ContentFlags::HAS_SINGLE_DELETE,
  860. std::memory_order_relaxed);
  861. return save.commit();
  862. }
  863. Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
  864. const Slice& key) {
  865. return WriteBatchInternal::SingleDelete(
  866. this, GetColumnFamilyID(column_family), key);
  867. }
  868. Status WriteBatchInternal::SingleDelete(WriteBatch* b,
  869. uint32_t column_family_id,
  870. const SliceParts& key) {
  871. LocalSavePoint save(b);
  872. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  873. if (column_family_id == 0) {
  874. b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  875. } else {
  876. b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
  877. PutVarint32(&b->rep_, column_family_id);
  878. }
  879. PutLengthPrefixedSliceParts(&b->rep_, key);
  880. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  881. ContentFlags::HAS_SINGLE_DELETE,
  882. std::memory_order_relaxed);
  883. return save.commit();
  884. }
  885. Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
  886. const SliceParts& key) {
  887. return WriteBatchInternal::SingleDelete(
  888. this, GetColumnFamilyID(column_family), key);
  889. }
  890. Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
  891. const Slice& begin_key,
  892. const Slice& end_key) {
  893. LocalSavePoint save(b);
  894. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  895. if (column_family_id == 0) {
  896. b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  897. } else {
  898. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
  899. PutVarint32(&b->rep_, column_family_id);
  900. }
  901. PutLengthPrefixedSlice(&b->rep_, begin_key);
  902. PutLengthPrefixedSlice(&b->rep_, end_key);
  903. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  904. ContentFlags::HAS_DELETE_RANGE,
  905. std::memory_order_relaxed);
  906. return save.commit();
  907. }
  908. Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
  909. const Slice& begin_key, const Slice& end_key) {
  910. return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
  911. begin_key, end_key);
  912. }
  913. Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
  914. const SliceParts& begin_key,
  915. const SliceParts& end_key) {
  916. LocalSavePoint save(b);
  917. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  918. if (column_family_id == 0) {
  919. b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  920. } else {
  921. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
  922. PutVarint32(&b->rep_, column_family_id);
  923. }
  924. PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  925. PutLengthPrefixedSliceParts(&b->rep_, end_key);
  926. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  927. ContentFlags::HAS_DELETE_RANGE,
  928. std::memory_order_relaxed);
  929. return save.commit();
  930. }
  931. Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
  932. const SliceParts& begin_key,
  933. const SliceParts& end_key) {
  934. return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
  935. begin_key, end_key);
  936. }
  937. Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
  938. const Slice& key, const Slice& value) {
  939. if (key.size() > size_t{port::kMaxUint32}) {
  940. return Status::InvalidArgument("key is too large");
  941. }
  942. if (value.size() > size_t{port::kMaxUint32}) {
  943. return Status::InvalidArgument("value is too large");
  944. }
  945. LocalSavePoint save(b);
  946. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  947. if (column_family_id == 0) {
  948. b->rep_.push_back(static_cast<char>(kTypeMerge));
  949. } else {
  950. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
  951. PutVarint32(&b->rep_, column_family_id);
  952. }
  953. PutLengthPrefixedSlice(&b->rep_, key);
  954. PutLengthPrefixedSlice(&b->rep_, value);
  955. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  956. ContentFlags::HAS_MERGE,
  957. std::memory_order_relaxed);
  958. return save.commit();
  959. }
  960. Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
  961. const Slice& value) {
  962. return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
  963. value);
  964. }
  965. Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
  966. const SliceParts& key,
  967. const SliceParts& value) {
  968. Status s = CheckSlicePartsLength(key, value);
  969. if (!s.ok()) {
  970. return s;
  971. }
  972. LocalSavePoint save(b);
  973. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  974. if (column_family_id == 0) {
  975. b->rep_.push_back(static_cast<char>(kTypeMerge));
  976. } else {
  977. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
  978. PutVarint32(&b->rep_, column_family_id);
  979. }
  980. PutLengthPrefixedSliceParts(&b->rep_, key);
  981. PutLengthPrefixedSliceParts(&b->rep_, value);
  982. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  983. ContentFlags::HAS_MERGE,
  984. std::memory_order_relaxed);
  985. return save.commit();
  986. }
  987. Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
  988. const SliceParts& key, const SliceParts& value) {
  989. return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
  990. value);
  991. }
  992. Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
  993. uint32_t column_family_id,
  994. const Slice& key, const Slice& value) {
  995. LocalSavePoint save(b);
  996. WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  997. if (column_family_id == 0) {
  998. b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  999. } else {
  1000. b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
  1001. PutVarint32(&b->rep_, column_family_id);
  1002. }
  1003. PutLengthPrefixedSlice(&b->rep_, key);
  1004. PutLengthPrefixedSlice(&b->rep_, value);
  1005. b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
  1006. ContentFlags::HAS_BLOB_INDEX,
  1007. std::memory_order_relaxed);
  1008. return save.commit();
  1009. }
  1010. Status WriteBatch::PutLogData(const Slice& blob) {
  1011. LocalSavePoint save(this);
  1012. rep_.push_back(static_cast<char>(kTypeLogData));
  1013. PutLengthPrefixedSlice(&rep_, blob);
  1014. return save.commit();
  1015. }
  1016. void WriteBatch::SetSavePoint() {
  1017. if (save_points_ == nullptr) {
  1018. save_points_.reset(new SavePoints());
  1019. }
  1020. // Record length and count of current batch of writes.
  1021. save_points_->stack.push(SavePoint(
  1022. GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
  1023. }
  1024. Status WriteBatch::RollbackToSavePoint() {
  1025. if (save_points_ == nullptr || save_points_->stack.size() == 0) {
  1026. return Status::NotFound();
  1027. }
  1028. // Pop the most recent savepoint off the stack
  1029. SavePoint savepoint = save_points_->stack.top();
  1030. save_points_->stack.pop();
  1031. assert(savepoint.size <= rep_.size());
  1032. assert(static_cast<uint32_t>(savepoint.count) <= Count());
  1033. if (savepoint.size == rep_.size()) {
  1034. // No changes to rollback
  1035. } else if (savepoint.size == 0) {
  1036. // Rollback everything
  1037. Clear();
  1038. } else {
  1039. rep_.resize(savepoint.size);
  1040. WriteBatchInternal::SetCount(this, savepoint.count);
  1041. content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  1042. }
  1043. return Status::OK();
  1044. }
  1045. Status WriteBatch::PopSavePoint() {
  1046. if (save_points_ == nullptr || save_points_->stack.size() == 0) {
  1047. return Status::NotFound();
  1048. }
  1049. // Pop the most recent savepoint off the stack
  1050. save_points_->stack.pop();
  1051. return Status::OK();
  1052. }
  1053. Status WriteBatch::AssignTimestamp(const Slice& ts) {
  1054. TimestampAssigner ts_assigner(ts);
  1055. return Iterate(&ts_assigner);
  1056. }
  1057. Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
  1058. TimestampAssigner ts_assigner(ts_list);
  1059. return Iterate(&ts_assigner);
  1060. }
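// MemTableInserter is the WriteBatch::Handler that applies the entries of a
// batch to the memtables of their respective column families, both in the
// normal write path and when replaying the WAL during recovery.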
  1061. class MemTableInserter : public WriteBatch::Handler {
  1062. SequenceNumber sequence_;
  1063. ColumnFamilyMemTables* const cf_mems_;
  1064. FlushScheduler* const flush_scheduler_;
  1065. TrimHistoryScheduler* const trim_history_scheduler_;
  1066. const bool ignore_missing_column_families_;
  1067. const uint64_t recovering_log_number_;
  1068. // log number that all Memtables inserted into should reference
  1069. uint64_t log_number_ref_;
  1070. DBImpl* db_;
  1071. const bool concurrent_memtable_writes_;
  1072. bool post_info_created_;
  1073. bool* has_valid_writes_;
  1074. // On some (!) platforms, default-constructing a map is too expensive in
  1075. // the Write() path because it performs memory allocations even when the
  1076. // map is never used.
  1077. // Make creation optional, but without incurring the additional
  1078. // allocation of a std::unique_ptr.
  1079. using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
  1080. using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
  1081. PostMapType mem_post_info_map_;
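// The map is created lazily with placement new into this raw storage (see
// GetPostMap()) and destroyed explicitly in ~MemTableInserter().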
  1082. // current recovered transaction we are rebuilding (recovery)
  1083. WriteBatch* rebuilding_trx_;
  1084. SequenceNumber rebuilding_trx_seq_;
  1085. // Increase seq number once per each write batch. Otherwise increase it once
  1086. // per key.
  1087. bool seq_per_batch_;
  1088. // Whether the memtable write will be done only after the commit
  1089. bool write_after_commit_;
  1090. // Whether memtable write can be done before prepare
  1091. bool write_before_prepare_;
  1092. // Whether this batch was unprepared or not
  1093. bool unprepared_batch_;
  1094. using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
  1095. DupDetector duplicate_detector_;
  1096. bool dup_dectector_on_;
  1097. bool hint_per_batch_;
  1098. bool hint_created_;
  1099. // Hints for this batch
  1100. using HintMap = std::unordered_map<MemTable*, void*>;
  1101. using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
  1102. HintMapType hint_;
  1103. HintMap& GetHintMap() {
  1104. assert(hint_per_batch_);
  1105. if (!hint_created_) {
  1106. new (&hint_) HintMap();
  1107. hint_created_ = true;
  1108. }
  1109. return *reinterpret_cast<HintMap*>(&hint_);
  1110. }
  1111. MemPostInfoMap& GetPostMap() {
  1112. assert(concurrent_memtable_writes_);
  1113. if (!post_info_created_) {
  1114. new (&mem_post_info_map_) MemPostInfoMap();
  1115. post_info_created_ = true;
  1116. }
  1117. return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  1118. }
  1119. bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
  1120. assert(!write_after_commit_);
  1121. assert(rebuilding_trx_ != nullptr);
  1122. if (!dup_dectector_on_) {
  1123. new (&duplicate_detector_) DuplicateDetector(db_);
  1124. dup_dectector_on_ = true;
  1125. }
  1126. return reinterpret_cast<DuplicateDetector*>
  1127. (&duplicate_detector_)->IsDuplicateKeySeq(column_family_id, key, sequence_);
  1128. }
  1129. protected:
  1130. bool WriteBeforePrepare() const override { return write_before_prepare_; }
  1131. bool WriteAfterCommit() const override { return write_after_commit_; }
  1132. public:
  1133. // cf_mems should not be shared with concurrent inserters
  1134. MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
  1135. FlushScheduler* flush_scheduler,
  1136. TrimHistoryScheduler* trim_history_scheduler,
  1137. bool ignore_missing_column_families,
  1138. uint64_t recovering_log_number, DB* db,
  1139. bool concurrent_memtable_writes,
  1140. bool* has_valid_writes = nullptr, bool seq_per_batch = false,
  1141. bool batch_per_txn = true, bool hint_per_batch = false)
  1142. : sequence_(_sequence),
  1143. cf_mems_(cf_mems),
  1144. flush_scheduler_(flush_scheduler),
  1145. trim_history_scheduler_(trim_history_scheduler),
  1146. ignore_missing_column_families_(ignore_missing_column_families),
  1147. recovering_log_number_(recovering_log_number),
  1148. log_number_ref_(0),
  1149. db_(static_cast_with_check<DBImpl, DB>(db)),
  1150. concurrent_memtable_writes_(concurrent_memtable_writes),
  1151. post_info_created_(false),
  1152. has_valid_writes_(has_valid_writes),
  1153. rebuilding_trx_(nullptr),
  1154. rebuilding_trx_seq_(0),
  1155. seq_per_batch_(seq_per_batch),
  1156. // Write after commit currently uses one seq per key (instead of per
  1157. // batch). So seq_per_batch being false indicates write_after_commit
  1158. // approach.
  1159. write_after_commit_(!seq_per_batch),
  1160. // WriteUnprepared can write WriteBatches per transaction, so
  1161. // batch_per_txn being false indicates write_before_prepare.
  1162. write_before_prepare_(!batch_per_txn),
  1163. unprepared_batch_(false),
  1164. duplicate_detector_(),
  1165. dup_dectector_on_(false),
  1166. hint_per_batch_(hint_per_batch),
  1167. hint_created_(false) {
  1168. assert(cf_mems_);
  1169. }
  1170. ~MemTableInserter() override {
  1171. if (dup_dectector_on_) {
  1172. reinterpret_cast<DuplicateDetector*>
  1173. (&duplicate_detector_)->~DuplicateDetector();
  1174. }
  1175. if (post_info_created_) {
  1176. reinterpret_cast<MemPostInfoMap*>
  1177. (&mem_post_info_map_)->~MemPostInfoMap();
  1178. }
  1179. if (hint_created_) {
  1180. for (auto iter : GetHintMap()) {
  1181. delete[] reinterpret_cast<char*>(iter.second);
  1182. }
  1183. reinterpret_cast<HintMap*>(&hint_)->~HintMap();
  1184. }
  1185. delete rebuilding_trx_;
  1186. }
  1187. MemTableInserter(const MemTableInserter&) = delete;
  1188. MemTableInserter& operator=(const MemTableInserter&) = delete;
  1189. // The batch seq is regularly restarted. In normal mode it is set when
  1190. // MemTableInserter is constructed in the write thread and in recovery mode it
  1191. // is set when a batch, which is tagged with seq, is read from the WAL.
  1192. // Within a sequenced batch, which could be a merge of multiple batches, we
  1193. // have two policies to advance the seq: i) seq_per_key (default) and ii)
  1194. // seq_per_batch. To implement the latter we need to mark the boundary between
  1195. // the individual batches. The approach is this: 1) Use the terminating
  1196. // markers to indicate the boundary (kTypeEndPrepareXID, kTypeCommitXID,
  1197. // kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in the absence of a
  1198. // natural boundary marker.
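// With seq_per_batch_ == false (one seq per key), the sequence advances
// after every applied key and stays put at batch boundaries; with
// seq_per_batch_ == true it advances only at batch boundaries. Hence the
// equality check below.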
  1199. void MaybeAdvanceSeq(bool batch_boundry = false) {
  1200. if (batch_boundry == seq_per_batch_) {
  1201. sequence_++;
  1202. }
  1203. }
  1204. void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
  1205. SequenceNumber sequence() const { return sequence_; }
  1206. void PostProcess() {
  1207. assert(concurrent_memtable_writes_);
  1208. // If post info was not created there is nothing
  1209. // to process and no need to create on demand
  1210. if (post_info_created_) {
  1211. for (auto& pair : GetPostMap()) {
  1212. pair.first->BatchPostProcess(pair.second);
  1213. }
  1214. }
  1215. }
  1216. bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
  1217. // If we are in a concurrent mode, it is the caller's responsibility
  1218. // to clone the original ColumnFamilyMemTables so that each thread
  1219. // has its own instance. Otherwise, it must be guaranteed that there
  1220. // is no concurrent access
  1221. bool found = cf_mems_->Seek(column_family_id);
  1222. if (!found) {
  1223. if (ignore_missing_column_families_) {
  1224. *s = Status::OK();
  1225. } else {
  1226. *s = Status::InvalidArgument(
  1227. "Invalid column family specified in write batch");
  1228. }
  1229. return false;
  1230. }
  1231. if (recovering_log_number_ != 0 &&
  1232. recovering_log_number_ < cf_mems_->GetLogNumber()) {
  1233. // This is true only in recovery environment (recovering_log_number_ is
  1234. // always 0 in
  1235. // non-recovery, regular write code-path)
  1236. // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that
  1237. // column
  1238. // family already contains updates from this log. We can't apply updates
  1239. // twice because of update-in-place or merge workloads -- ignore the
  1240. // update
  1241. *s = Status::OK();
  1242. return false;
  1243. }
  1244. if (has_valid_writes_ != nullptr) {
  1245. *has_valid_writes_ = true;
  1246. }
  1247. if (log_number_ref_ > 0) {
  1248. cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
  1249. }
  1250. return true;
  1251. }

  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
                   const Slice& value, ValueType value_type) {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
      return Status::OK();
      // else insert the values to the memtable right away
    }

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundry = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for an upcoming
        // rollback/commit.
        WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundry);
      return seek_status;
    }

    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    // inplace_update_support is inconsistent with snapshots, and therefore
    // with any kind of transactions, including the ones that use seq_per_batch
    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
      bool mem_res =
          mem->Add(sequence_, value_type, key, value,
                   concurrent_memtable_writes_, get_post_process_info(mem),
                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
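      // Add() returning false means this exact (key, seq) already exists in
      // the memtable, i.e. a duplicate key within the current sub-batch when
      // seq_per_batch is in use; TryAgain lets the iteration retry the entry
      // under the next sub-batch's sequence number.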
      if (UNLIKELY(!mem_res)) {
        assert(seq_per_batch_);
        ret_status = Status::TryAgain("key+seq exists");
        const bool BATCH_BOUNDRY = true;
        MaybeAdvanceSeq(BATCH_BOUNDRY);
      }
    } else if (moptions->inplace_callback == nullptr) {
      assert(!concurrent_memtable_writes_);
      mem->Update(sequence_, key, value);
    } else {
      assert(!concurrent_memtable_writes_);
      if (mem->UpdateCallback(sequence_, key, value)) {
      } else {
        // Key not found in memtable: do an SST get, update, then add.
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        ReadOptions ropts;
        // It is going to be overwritten for sure, so there is no point caching
        // the data block containing the old version.
        ropts.fill_cache = false;
        ropts.snapshot = &read_from_snapshot;
        std::string prev_value;
        std::string merged_value;
        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        Status s = Status::NotSupported();
        if (db_ != nullptr && recovering_log_number_ == 0) {
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          s = db_->Get(ropts, cf_handle, key, &prev_value);
        }
        char* prev_buffer = const_cast<char*>(prev_value.c_str());
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
                                                 s.ok() ? &prev_size : nullptr,
                                                 value, &merged_value);
        if (status == UpdateStatus::UPDATED_INPLACE) {
          // prev_value is updated in-place with the final value.
          bool mem_res __attribute__((__unused__));
          mem_res = mem->Add(sequence_, value_type, key,
                             Slice(prev_buffer, prev_size));
          assert(mem_res);
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        } else if (status == UpdateStatus::UPDATED) {
          // merged_value contains the final value.
          bool mem_res __attribute__((__unused__));
          mem_res = mem->Add(sequence_, value_type, key, Slice(merged_value));
          assert(mem_res);
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        }
      }
    }
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain, let the next try add the key to the
      // rebuilding transaction object.
      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
    }
    // Since all Puts are logged in transaction logs (if enabled), always bump
    // the sequence number, even if the update eventually fails and does not
    // result in a memtable add/update.
    MaybeAdvanceSeq();
    CheckMemtableFull();
    return ret_status;
  }

  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override {
    return PutCFImpl(column_family_id, key, value, kTypeValue);
  }

  Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
                    const Slice& value, ValueType delete_type) {
    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    bool mem_res =
        mem->Add(sequence_, delete_type, key, value,
                 concurrent_memtable_writes_, get_post_process_info(mem),
                 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    if (UNLIKELY(!mem_res)) {
      assert(seq_per_batch_);
      ret_status = Status::TryAgain("key+seq exists");
      const bool BATCH_BOUNDRY = true;
      MaybeAdvanceSeq(BATCH_BOUNDRY);
    }
    MaybeAdvanceSeq();
    CheckMemtableFull();
    return ret_status;
  }

  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
      return Status::OK();
      // else insert the values to the memtable right away
    }

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundry = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for an upcoming
        // rollback/commit.
        WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundry);
      return seek_status;
    }

    auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain, let the next try add the key to the
      // rebuilding transaction object.
      WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
    }
    return ret_status;
  }

  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
      return Status::OK();
      // else insert the values to the memtable right away
    }

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundry = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for an upcoming
        // rollback/commit.
        WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
                                         key);
        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundry);
      return seek_status;
    }

    auto ret_status =
        DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain, let the next try add the key to the
      // rebuilding transaction object.
      WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
    }
    return ret_status;
  }

  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
                       const Slice& end_key) override {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                      begin_key, end_key);
      return Status::OK();
      // else insert the values to the memtable right away
    }

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundry = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for an upcoming
        // rollback/commit.
        WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                        begin_key, end_key);
        // TODO(myabandeh): when transactional DeleteRange support is added,
        // check if end_key must also be added.
        batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key);
      }
      MaybeAdvanceSeq(batch_boundry);
      return seek_status;
    }

    if (db_ != nullptr) {
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
      if (!cfd->is_delete_range_supported()) {
        return Status::NotSupported(
            std::string("DeleteRange not supported for table type ") +
            cfd->ioptions()->table_factory->Name() + " in CF " +
            cfd->GetName());
      }
    }

    auto ret_status =
        DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain, let the next try add the key to the
      // rebuilding transaction object.
      WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                      begin_key, end_key);
    }
    return ret_status;
  }

  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    // optimize for non-recovery mode
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
      return Status::OK();
      // else insert the values to the memtable right away
    }

    Status seek_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
      bool batch_boundry = false;
      if (rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The CF is probably flushed and hence there is no need for insert,
        // but we still need to keep track of the keys for an upcoming
        // rollback/commit.
        WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
                                  value);
        batch_boundry = IsDuplicateKeySeq(column_family_id, key);
      }
      MaybeAdvanceSeq(batch_boundry);
      return seek_status;
    }

    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    bool perform_merge = false;
    assert(!concurrent_memtable_writes_ ||
           moptions->max_successive_merges == 0);
    // If we pass the DB through and options.max_successive_merges is hit
    // during recovery, a Get() will be issued, which will try to acquire the
    // DB mutex and cause a deadlock, as the DB mutex is already held. So we
    // disable merge in recovery.
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      assert(!concurrent_memtable_writes_);
      LookupKey lkey(key, sequence_);
      // Count the number of successive merges at the head of the key in the
      // memtable.
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }
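
    // For example, with max_successive_merges == 2, a third consecutive Merge
    // on a key finds two merge entries already at the head of the memtable,
    // so perform_merge becomes true and the merged result is stored below as
    // a regular value instead of yet another merge operand.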
    if (perform_merge) {
      // 1) Get the existing value.
      std::string get_value;
      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      db_->Get(read_options, cf_handle, key, &get_value);
      Slice get_value_slice = Slice(get_value);

      // 2) Apply this merge.
      auto merge_operator = moptions->merge_operator;
      assert(merge_operator);
      std::string new_value;
      Status merge_status = MergeHelper::TimedFullMerge(
          merge_operator, key, &get_value_slice, {value}, &new_value,
          moptions->info_log, moptions->statistics, Env::Default());
      if (!merge_status.ok()) {
        // Failed to merge! Store the delta in the memtable instead.
        perform_merge = false;
      } else {
        // 3) Add the merged value to the memtable.
        assert(!concurrent_memtable_writes_);
        bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
        if (UNLIKELY(!mem_res)) {
          assert(seq_per_batch_);
          ret_status = Status::TryAgain("key+seq exists");
          const bool BATCH_BOUNDRY = true;
          MaybeAdvanceSeq(BATCH_BOUNDRY);
        }
      }
    }

    if (!perform_merge) {
      // Add the merge operand to the memtable.
      bool mem_res =
          mem->Add(sequence_, kTypeMerge, key, value,
                   concurrent_memtable_writes_, get_post_process_info(mem));
      if (UNLIKELY(!mem_res)) {
        assert(seq_per_batch_);
        ret_status = Status::TryAgain("key+seq exists");
        const bool BATCH_BOUNDRY = true;
        MaybeAdvanceSeq(BATCH_BOUNDRY);
      }
    }

    // optimize for non-recovery mode
    if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      // If ret_status is TryAgain, let the next try add the key to the
      // rebuilding transaction object.
      WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
    }
    MaybeAdvanceSeq();
    CheckMemtableFull();
    return ret_status;
  }

  Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
                        const Slice& value) override {
    // Same as PutCF except for the value type.
    return PutCFImpl(column_family_id, key, value, kTypeBlobIndex);
  }

  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        // MarkFlushScheduled only returns true if we are the one that
        // should take action, so no need to dedup further.
        flush_scheduler_->ScheduleWork(cfd);
      }
    }
    // Check whether the memtable_list size exceeds
    // max_write_buffer_size_to_maintain.
    if (trim_history_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd);
      assert(cfd->ioptions());
      const size_t size_to_maintain = static_cast<size_t>(
          cfd->ioptions()->max_write_buffer_size_to_maintain);
      if (size_to_maintain > 0) {
        MemTableList* const imm = cfd->imm();
        assert(imm);
        if (imm->HasHistory()) {
          const MemTable* const mem = cfd->mem();
          assert(mem);
          if (mem->ApproximateMemoryUsageFast() +
                      imm->ApproximateMemoryUsageExcludingLast() >=
                  size_to_maintain &&
              imm->MarkTrimHistoryNeeded()) {
            trim_history_scheduler_->ScheduleWork(cfd);
          }
        }
      }
    }
  }

  // The write batch handler calls MarkBeginPrepare with unprepare set to true
  // if it encounters the kTypeBeginUnprepareXID marker.
  Status MarkBeginPrepare(bool unprepare) override {
    assert(rebuilding_trx_ == nullptr);
    assert(db_);

    if (recovering_log_number_ != 0) {
      // During recovery we rebuild a hollow transaction from all encountered
      // prepare sections of the WAL.
      if (db_->allow_2pc() == false) {
        return Status::NotSupported(
            "WAL contains prepared transactions. Open with "
            "TransactionDB::Open().");
      }

      // We are now iterating through a prepared section.
      rebuilding_trx_ = new WriteBatch();
      rebuilding_trx_seq_ = sequence_;
      // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
      // unprepared_batch_ should be false because it is false by default, and
      // gets reset to false in MarkEndPrepare.
      assert(!unprepared_batch_);
      unprepared_batch_ = unprepare;

      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    }

    return Status::OK();
  }

  Status MarkEndPrepare(const Slice& name) override {
    assert(db_);
    assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));

    if (recovering_log_number_ != 0) {
      assert(db_->allow_2pc());
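      // With seq_per_batch (write-prepared/unprepared), the sequence advanced
      // once per sub-batch while this prepared section was replayed, so the
      // distance from the section's starting seq gives the sub-batch count;
      // write-committed mode passes 0 to disable that bookkeeping.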
      size_t batch_cnt =
          write_after_commit_
              ? 0  // 0 will disable further checks
              : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
      db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
                                      rebuilding_trx_, rebuilding_trx_seq_,
                                      batch_cnt, unprepared_batch_);
      unprepared_batch_ = false;
      rebuilding_trx_ = nullptr;
    } else {
      assert(rebuilding_trx_ == nullptr);
    }

    const bool batch_boundry = true;
    MaybeAdvanceSeq(batch_boundry);
    return Status::OK();
  }

  Status MarkNoop(bool empty_batch) override {
    // A hack in pessimistic transactions could result in a noop at the start
    // of the write batch; it should be ignored.
    if (!empty_batch) {
      // In the absence of Prepare markers, a kTypeNoop tag indicates the end
      // of a batch. This happens when a write batch commits while skipping the
      // prepare phase.
      const bool batch_boundry = true;
      MaybeAdvanceSeq(batch_boundry);
    }
    return Status::OK();
  }

  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // In recovery, when we encounter a commit marker we look this
      // transaction up in our set of rebuilt transactions and commit it.
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // The log containing the prepared section may have been released in the
      // last incarnation because the data was flushed to L0.
      if (trx != nullptr) {
        // At this point individual CF lognumbers will prevent
        // duplicate re-insertion of values.
        assert(log_number_ref_ == 0);
        if (write_after_commit_) {
          // write_after_commit_ can only have one batch in trx.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // All inserts must reference this trx log number.
          log_number_ref_ = batch_info.log_number_;
          s = batch_info.batch_->Iterate(this);
          log_number_ref_ = 0;
        }
        // else the values are already inserted before the commit

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // When writes are not delayed until commit, there is no disconnect
      // between a memtable write and the WAL that supports it, so the commit
      // marker does not need to reference the log containing the prepared
      // data.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    const bool batch_boundry = true;
    MaybeAdvanceSeq(batch_boundry);

    return s;
  }

  Status MarkRollback(const Slice& name) override {
    assert(db_);

    if (recovering_log_number_ != 0) {
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // The log containing the transaction's prep section may have been
      // released in the previous incarnation because we knew it had been
      // rolled back.
      if (trx != nullptr) {
        db_->DeleteRecoveredTransaction(name.ToString());
      }
    } else {
      // in non-recovery we simply ignore this tag
    }

    const bool batch_boundry = true;
    MaybeAdvanceSeq(batch_boundry);

    return Status::OK();
  }

 private:
  MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
    if (!concurrent_memtable_writes_) {
      // No need to batch counters locally if we don't use concurrent mode.
      return nullptr;
    }
    return &GetPostMap()[mem];
  }
};

// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables have been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache.
Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
      batch_per_txn);
  for (auto w : write_group) {
    if (w->CallbackFailed()) {
      continue;
    }
    w->sequence = inserter.sequence();
    if (!w->ShouldWriteToMemtable()) {
      // In seq_per_batch_ mode this advances the seq by one.
      inserter.MaybeAdvanceSeq(true);
      continue;
    }
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      return w->status;
    }
    assert(!seq_per_batch || w->batch_cnt != 0);
    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
  }
  return Status::OK();
}
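
// Single-writer variant: processes one WriteThread::Writer's batch, e.g. when
// writers insert into the memtables in parallel, each with its own cloned
// ColumnFamilyMemTables; it also applies the locally batched per-memtable
// counters afterwards when concurrent_memtable_writes is set.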
Status WriteBatchInternal::InsertInto(
    WriteThread::Writer* writer, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
    bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
  (void)batch_cnt;
#endif
  assert(writer->ShouldWriteToMemtable());
  MemTableInserter inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, log_number, db,
      concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
      batch_per_txn, hint_per_batch);
  SetSequence(writer->batch, sequence);
  inserter.set_log_number_ref(writer->log_ref);
  Status s = writer->batch->Iterate(&inserter);
  assert(!seq_per_batch || batch_cnt != 0);
  assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}
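
// Batch-only variant with no WriteThread bookkeeping, e.g. for replaying a
// batch read from the WAL during recovery; the caller can learn the next
// sequence number and whether any valid writes were seen via the optional
// output parameters.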
Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, SequenceNumber* next_seq,
    bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            trim_history_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, has_valid_writes,
                            seq_per_batch, batch_per_txn);
  Status s = batch->Iterate(&inserter);
  if (next_seq != nullptr) {
    *next_seq = inserter.sequence();
  }
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}

Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}
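
// Appends src's records to dst. When wal_only is set and src has a WAL
// termination point recorded (presumably via MarkWalTerminationPoint()), only
// the prefix of src up to that save point is copied.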
Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}
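
// Serialized size after appending two batches: each non-empty batch carries a
// kHeader-byte header (8-byte sequence number plus 4-byte count, i.e. 12
// bytes) and Append() drops src's header, so e.g. batches of 20 and 30 bytes
// combine to 20 + 30 - 12 = 38 bytes.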
size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
                                            size_t rightByteSize) {
  if (leftByteSize == 0 || rightByteSize == 0) {
    return leftByteSize + rightByteSize;
  } else {
    return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
  }
}

}  // namespace ROCKSDB_NAMESPACE