get_context.cc 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "table/get_context.h"
  6. #include "db/blob//blob_fetcher.h"
  7. #include "db/merge_helper.h"
  8. #include "db/pinned_iterators_manager.h"
  9. #include "db/read_callback.h"
  10. #include "db/wide/wide_column_serialization.h"
  11. #include "monitoring/file_read_sample.h"
  12. #include "monitoring/perf_context_imp.h"
  13. #include "monitoring/statistics_impl.h"
  14. #include "rocksdb/merge_operator.h"
  15. #include "rocksdb/statistics.h"
  16. #include "rocksdb/system_clock.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. GetContext::GetContext(
  19. const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
  20. Statistics* statistics, GetState init_state, const Slice& user_key,
  21. PinnableSlice* pinnable_val, PinnableWideColumns* columns,
  22. std::string* timestamp, bool* value_found, MergeContext* merge_context,
  23. bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
  24. SystemClock* clock, SequenceNumber* seq,
  25. PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
  26. bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
  27. : ucmp_(ucmp),
  28. merge_operator_(merge_operator),
  29. logger_(logger),
  30. statistics_(statistics),
  31. state_(init_state),
  32. user_key_(user_key),
  33. pinnable_val_(pinnable_val),
  34. columns_(columns),
  35. timestamp_(timestamp),
  36. value_found_(value_found),
  37. merge_context_(merge_context),
  38. max_covering_tombstone_seq_(_max_covering_tombstone_seq),
  39. clock_(clock),
  40. seq_(seq),
  41. replay_log_(nullptr),
  42. pinned_iters_mgr_(_pinned_iters_mgr),
  43. callback_(callback),
  44. do_merge_(do_merge),
  45. is_blob_index_(is_blob_index),
  46. tracing_get_id_(tracing_get_id),
  47. blob_fetcher_(blob_fetcher) {
  48. if (seq_) {
  49. *seq_ = kMaxSequenceNumber;
  50. }
  51. sample_ = should_sample_file_read();
  52. }
  53. GetContext::GetContext(const Comparator* ucmp,
  54. const MergeOperator* merge_operator, Logger* logger,
  55. Statistics* statistics, GetState init_state,
  56. const Slice& user_key, PinnableSlice* pinnable_val,
  57. PinnableWideColumns* columns, bool* value_found,
  58. MergeContext* merge_context, bool do_merge,
  59. SequenceNumber* _max_covering_tombstone_seq,
  60. SystemClock* clock, SequenceNumber* seq,
  61. PinnedIteratorsManager* _pinned_iters_mgr,
  62. ReadCallback* callback, bool* is_blob_index,
  63. uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
  64. : GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
  65. pinnable_val, columns, /*timestamp=*/nullptr, value_found,
  66. merge_context, do_merge, _max_covering_tombstone_seq, clock,
  67. seq, _pinned_iters_mgr, callback, is_blob_index,
  68. tracing_get_id, blob_fetcher) {}
  69. void GetContext::appendToReplayLog(ValueType type, Slice value, Slice ts) {
  70. if (replay_log_) {
  71. if (replay_log_->empty()) {
  72. // Optimization: in the common case of only one operation in the
  73. // log, we allocate the exact amount of space needed.
  74. replay_log_->reserve(1 + VarintLength(value.size()) + value.size());
  75. }
  76. replay_log_->push_back(type);
  77. PutLengthPrefixedSlice(replay_log_, value);
  78. // If cf enables ts, there should always be a ts following each value
  79. if (ucmp_->timestamp_size() > 0) {
  80. assert(ts.size() == ucmp_->timestamp_size());
  81. PutLengthPrefixedSlice(replay_log_, ts);
  82. }
  83. }
  84. }
  85. // Called from TableCache::Get and Table::Get when file/block in which
  86. // key may exist are not there in TableCache/BlockCache respectively. In this
  87. // case we can't guarantee that key does not exist and are not permitted to do
  88. // IO to be certain.Set the status=kFound and value_found=false to let the
  89. // caller know that key may exist but is not there in memory
  90. void GetContext::MarkKeyMayExist() {
  91. state_ = kFound;
  92. if (value_found_ != nullptr) {
  93. *value_found_ = false;
  94. }
  95. }
  96. void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
  97. assert(state_ == kNotFound);
  98. assert(ucmp_->timestamp_size() == 0);
  99. appendToReplayLog(kTypeValue, value, Slice());
  100. state_ = kFound;
  101. if (LIKELY(pinnable_val_ != nullptr)) {
  102. pinnable_val_->PinSelf(value);
  103. }
  104. }
  105. void GetContext::ReportCounters() {
  106. if (get_context_stats_.num_cache_hit > 0) {
  107. RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
  108. }
  109. if (get_context_stats_.num_cache_index_hit > 0) {
  110. RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
  111. get_context_stats_.num_cache_index_hit);
  112. }
  113. if (get_context_stats_.num_cache_data_hit > 0) {
  114. RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
  115. get_context_stats_.num_cache_data_hit);
  116. }
  117. if (get_context_stats_.num_cache_filter_hit > 0) {
  118. RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
  119. get_context_stats_.num_cache_filter_hit);
  120. }
  121. if (get_context_stats_.num_cache_compression_dict_hit > 0) {
  122. RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
  123. get_context_stats_.num_cache_compression_dict_hit);
  124. }
  125. if (get_context_stats_.num_cache_index_miss > 0) {
  126. RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
  127. get_context_stats_.num_cache_index_miss);
  128. }
  129. if (get_context_stats_.num_cache_filter_miss > 0) {
  130. RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
  131. get_context_stats_.num_cache_filter_miss);
  132. }
  133. if (get_context_stats_.num_cache_data_miss > 0) {
  134. RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
  135. get_context_stats_.num_cache_data_miss);
  136. }
  137. if (get_context_stats_.num_cache_compression_dict_miss > 0) {
  138. RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
  139. get_context_stats_.num_cache_compression_dict_miss);
  140. }
  141. if (get_context_stats_.num_cache_bytes_read > 0) {
  142. RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
  143. get_context_stats_.num_cache_bytes_read);
  144. }
  145. if (get_context_stats_.num_cache_miss > 0) {
  146. RecordTick(statistics_, BLOCK_CACHE_MISS,
  147. get_context_stats_.num_cache_miss);
  148. }
  149. if (get_context_stats_.num_cache_add > 0) {
  150. RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
  151. }
  152. if (get_context_stats_.num_cache_add_redundant > 0) {
  153. RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
  154. get_context_stats_.num_cache_add_redundant);
  155. }
  156. if (get_context_stats_.num_cache_bytes_write > 0) {
  157. RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
  158. get_context_stats_.num_cache_bytes_write);
  159. }
  160. if (get_context_stats_.num_cache_index_add > 0) {
  161. RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
  162. get_context_stats_.num_cache_index_add);
  163. }
  164. if (get_context_stats_.num_cache_index_add_redundant > 0) {
  165. RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
  166. get_context_stats_.num_cache_index_add_redundant);
  167. }
  168. if (get_context_stats_.num_cache_index_bytes_insert > 0) {
  169. RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
  170. get_context_stats_.num_cache_index_bytes_insert);
  171. }
  172. if (get_context_stats_.num_cache_data_add > 0) {
  173. RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
  174. get_context_stats_.num_cache_data_add);
  175. }
  176. if (get_context_stats_.num_cache_data_add_redundant > 0) {
  177. RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
  178. get_context_stats_.num_cache_data_add_redundant);
  179. }
  180. if (get_context_stats_.num_cache_data_bytes_insert > 0) {
  181. RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
  182. get_context_stats_.num_cache_data_bytes_insert);
  183. }
  184. if (get_context_stats_.num_cache_filter_add > 0) {
  185. RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
  186. get_context_stats_.num_cache_filter_add);
  187. }
  188. if (get_context_stats_.num_cache_filter_add_redundant > 0) {
  189. RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
  190. get_context_stats_.num_cache_filter_add_redundant);
  191. }
  192. if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
  193. RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
  194. get_context_stats_.num_cache_filter_bytes_insert);
  195. }
  196. if (get_context_stats_.num_cache_compression_dict_add > 0) {
  197. RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
  198. get_context_stats_.num_cache_compression_dict_add);
  199. }
  200. if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
  201. RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
  202. get_context_stats_.num_cache_compression_dict_add_redundant);
  203. }
  204. if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
  205. RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
  206. get_context_stats_.num_cache_compression_dict_bytes_insert);
  207. }
  208. }
  209. bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
  210. const Slice& value, bool* matched,
  211. Status* read_status, Cleanable* value_pinner) {
  212. assert(matched);
  213. assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
  214. merge_context_ != nullptr);
  215. if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
  216. *matched = true;
  217. // If the value is not in the snapshot, skip it
  218. if (!CheckCallback(parsed_key.sequence)) {
  219. return true; // to continue to the next seq
  220. }
  221. if (seq_ != nullptr) {
  222. // Set the sequence number if it is uninitialized
  223. if (*seq_ == kMaxSequenceNumber) {
  224. *seq_ = parsed_key.sequence;
  225. }
  226. if (max_covering_tombstone_seq_) {
  227. *seq_ = std::max(*seq_, *max_covering_tombstone_seq_);
  228. }
  229. }
  230. size_t ts_sz = ucmp_->timestamp_size();
  231. Slice ts;
  232. if (ts_sz > 0) {
  233. // ensure always have ts if cf enables ts.
  234. ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
  235. if (timestamp_ != nullptr) {
  236. if (!timestamp_->empty()) {
  237. assert(ts_sz == timestamp_->size());
  238. // `timestamp` can be set before `SaveValue` is ever called
  239. // when max_covering_tombstone_seq_ was set.
  240. // If this key has a higher sequence number than range tombstone,
  241. // then timestamp should be updated. `ts_from_rangetombstone_` is
  242. // set to false afterwards so that only the key with highest seqno
  243. // updates the timestamp.
  244. if (ts_from_rangetombstone_) {
  245. assert(max_covering_tombstone_seq_);
  246. if (parsed_key.sequence > *max_covering_tombstone_seq_) {
  247. timestamp_->assign(ts.data(), ts.size());
  248. ts_from_rangetombstone_ = false;
  249. }
  250. }
  251. }
  252. // TODO optimize for small size ts
  253. const std::string kMaxTs(ts_sz, '\xff');
  254. if (timestamp_->empty() ||
  255. ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
  256. timestamp_->assign(ts.data(), ts.size());
  257. }
  258. }
  259. }
  260. appendToReplayLog(parsed_key.type, value, ts);
  261. auto type = parsed_key.type;
  262. Slice unpacked_value = value;
  263. // Key matches. Process it
  264. if ((type == kTypeValue || type == kTypeValuePreferredSeqno ||
  265. type == kTypeMerge || type == kTypeBlobIndex ||
  266. type == kTypeWideColumnEntity || type == kTypeDeletion ||
  267. type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) &&
  268. max_covering_tombstone_seq_ != nullptr &&
  269. *max_covering_tombstone_seq_ > parsed_key.sequence) {
  270. // Note that deletion types are also considered, this is for the case
  271. // when we need to return timestamp to user. If a range tombstone has a
  272. // higher seqno than point tombstone, its timestamp should be returned.
  273. type = kTypeRangeDeletion;
  274. }
  275. switch (type) {
  276. case kTypeValue:
  277. case kTypeValuePreferredSeqno:
  278. case kTypeBlobIndex:
  279. case kTypeWideColumnEntity:
  280. assert(state_ == kNotFound || state_ == kMerge);
  281. if (type == kTypeValuePreferredSeqno) {
  282. unpacked_value = ParsePackedValueForValue(value);
  283. }
  284. if (type == kTypeBlobIndex) {
  285. if (is_blob_index_ == nullptr) {
  286. // Blob value not supported. Stop.
  287. state_ = kUnexpectedBlobIndex;
  288. return false;
  289. }
  290. }
  291. if (is_blob_index_ != nullptr) {
  292. *is_blob_index_ = (type == kTypeBlobIndex);
  293. }
  294. if (kNotFound == state_) {
  295. state_ = kFound;
  296. if (do_merge_) {
  297. if (type == kTypeBlobIndex && ucmp_->timestamp_size() != 0) {
  298. ukey_with_ts_found_.PinSelf(parsed_key.user_key);
  299. }
  300. if (LIKELY(pinnable_val_ != nullptr)) {
  301. Slice value_to_use = unpacked_value;
  302. if (type == kTypeWideColumnEntity) {
  303. Slice value_copy = unpacked_value;
  304. if (!WideColumnSerialization::GetValueOfDefaultColumn(
  305. value_copy, value_to_use)
  306. .ok()) {
  307. state_ = kCorrupt;
  308. return false;
  309. }
  310. }
  311. if (LIKELY(value_pinner != nullptr)) {
  312. // If the backing resources for the value are provided, pin them
  313. pinnable_val_->PinSlice(value_to_use, value_pinner);
  314. } else {
  315. TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
  316. this);
  317. // Otherwise copy the value
  318. pinnable_val_->PinSelf(value_to_use);
  319. }
  320. } else if (columns_ != nullptr) {
  321. if (type == kTypeWideColumnEntity) {
  322. if (!columns_->SetWideColumnValue(unpacked_value, value_pinner)
  323. .ok()) {
  324. state_ = kCorrupt;
  325. return false;
  326. }
  327. } else {
  328. columns_->SetPlainValue(unpacked_value, value_pinner);
  329. }
  330. }
  331. } else {
  332. // It means this function is called as part of DB GetMergeOperands
  333. // API and the current value should be part of
  334. // merge_context_->operand_list
  335. if (type == kTypeBlobIndex) {
  336. PinnableSlice pin_val;
  337. if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
  338. read_status) == false) {
  339. return false;
  340. }
  341. Slice blob_value(pin_val);
  342. push_operand(blob_value, nullptr);
  343. } else if (type == kTypeWideColumnEntity) {
  344. Slice value_copy = unpacked_value;
  345. Slice value_of_default;
  346. if (!WideColumnSerialization::GetValueOfDefaultColumn(
  347. value_copy, value_of_default)
  348. .ok()) {
  349. state_ = kCorrupt;
  350. return false;
  351. }
  352. push_operand(value_of_default, value_pinner);
  353. } else {
  354. assert(type == kTypeValue || type == kTypeValuePreferredSeqno);
  355. push_operand(unpacked_value, value_pinner);
  356. }
  357. }
  358. } else if (kMerge == state_) {
  359. assert(merge_operator_ != nullptr);
  360. if (type == kTypeBlobIndex) {
  361. PinnableSlice pin_val;
  362. if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
  363. read_status) == false) {
  364. return false;
  365. }
  366. Slice blob_value(pin_val);
  367. state_ = kFound;
  368. if (do_merge_) {
  369. MergeWithPlainBaseValue(blob_value);
  370. } else {
  371. // It means this function is called as part of DB GetMergeOperands
  372. // API and the current value should be part of
  373. // merge_context_->operand_list
  374. push_operand(blob_value, nullptr);
  375. }
  376. } else if (type == kTypeWideColumnEntity) {
  377. state_ = kFound;
  378. if (do_merge_) {
  379. MergeWithWideColumnBaseValue(unpacked_value);
  380. } else {
  381. // It means this function is called as part of DB GetMergeOperands
  382. // API and the current value should be part of
  383. // merge_context_->operand_list
  384. Slice value_copy = unpacked_value;
  385. Slice value_of_default;
  386. if (!WideColumnSerialization::GetValueOfDefaultColumn(
  387. value_copy, value_of_default)
  388. .ok()) {
  389. state_ = kCorrupt;
  390. return false;
  391. }
  392. push_operand(value_of_default, value_pinner);
  393. }
  394. } else {
  395. assert(type == kTypeValue || type == kTypeValuePreferredSeqno);
  396. state_ = kFound;
  397. if (do_merge_) {
  398. MergeWithPlainBaseValue(unpacked_value);
  399. } else {
  400. // It means this function is called as part of DB GetMergeOperands
  401. // API and the current value should be part of
  402. // merge_context_->operand_list
  403. push_operand(unpacked_value, value_pinner);
  404. }
  405. }
  406. }
  407. return false;
  408. case kTypeDeletion:
  409. case kTypeDeletionWithTimestamp:
  410. case kTypeSingleDeletion:
  411. case kTypeRangeDeletion:
  412. // TODO(noetzli): Verify correctness once merge of single-deletes
  413. // is supported
  414. assert(state_ == kNotFound || state_ == kMerge);
  415. if (kNotFound == state_) {
  416. state_ = kDeleted;
  417. } else if (kMerge == state_) {
  418. state_ = kFound;
  419. if (do_merge_) {
  420. MergeWithNoBaseValue();
  421. }
  422. // If do_merge_ = false then the current value shouldn't be part of
  423. // merge_context_->operand_list
  424. }
  425. return false;
  426. case kTypeMerge:
  427. assert(state_ == kNotFound || state_ == kMerge);
  428. state_ = kMerge;
  429. // value_pinner is not set from plain_table_reader.cc for example.
  430. push_operand(value, value_pinner);
  431. PERF_COUNTER_ADD(internal_merge_point_lookup_count, 1);
  432. if (do_merge_ && merge_operator_ != nullptr &&
  433. merge_operator_->ShouldMerge(
  434. merge_context_->GetOperandsDirectionBackward())) {
  435. state_ = kFound;
  436. MergeWithNoBaseValue();
  437. return false;
  438. }
  439. if (merge_context_->get_merge_operands_options != nullptr &&
  440. merge_context_->get_merge_operands_options->continue_cb !=
  441. nullptr &&
  442. !merge_context_->get_merge_operands_options->continue_cb(value)) {
  443. state_ = kFound;
  444. return false;
  445. }
  446. return true;
  447. default:
  448. assert(false);
  449. break;
  450. }
  451. }
  452. // state_ could be Corrupt, merge or notfound
  453. return false;
  454. }
  455. void GetContext::PostprocessMerge(const Status& merge_status) {
  456. if (!merge_status.ok()) {
  457. if (merge_status.subcode() == Status::SubCode::kMergeOperatorFailed) {
  458. state_ = kMergeOperatorFailed;
  459. } else {
  460. state_ = kCorrupt;
  461. }
  462. return;
  463. }
  464. if (LIKELY(pinnable_val_ != nullptr)) {
  465. pinnable_val_->PinSelf();
  466. }
  467. }
  468. void GetContext::MergeWithNoBaseValue() {
  469. assert(do_merge_);
  470. assert(pinnable_val_ || columns_);
  471. assert(!pinnable_val_ || !columns_);
  472. // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  473. // since a failure must be propagated regardless of its value.
  474. const Status s = MergeHelper::TimedFullMerge(
  475. merge_operator_, user_key_, MergeHelper::kNoBaseValue,
  476. merge_context_->GetOperands(), logger_, statistics_, clock_,
  477. /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
  478. pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
  479. PostprocessMerge(s);
  480. }
  481. void GetContext::MergeWithPlainBaseValue(const Slice& value) {
  482. assert(do_merge_);
  483. assert(pinnable_val_ || columns_);
  484. assert(!pinnable_val_ || !columns_);
  485. // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  486. // since a failure must be propagated regardless of its value.
  487. const Status s = MergeHelper::TimedFullMerge(
  488. merge_operator_, user_key_, MergeHelper::kPlainBaseValue, value,
  489. merge_context_->GetOperands(), logger_, statistics_, clock_,
  490. /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
  491. pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
  492. PostprocessMerge(s);
  493. }
  494. void GetContext::MergeWithWideColumnBaseValue(const Slice& entity) {
  495. assert(do_merge_);
  496. assert(pinnable_val_ || columns_);
  497. assert(!pinnable_val_ || !columns_);
  498. // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  499. // since a failure must be propagated regardless of its value.
  500. const Status s = MergeHelper::TimedFullMerge(
  501. merge_operator_, user_key_, MergeHelper::kWideBaseValue, entity,
  502. merge_context_->GetOperands(), logger_, statistics_, clock_,
  503. /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
  504. pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
  505. PostprocessMerge(s);
  506. }
  507. bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index,
  508. PinnableSlice* blob_value, Status* read_status) {
  509. constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
  510. constexpr uint64_t* bytes_read = nullptr;
  511. *read_status = blob_fetcher_->FetchBlob(user_key, blob_index, prefetch_buffer,
  512. blob_value, bytes_read);
  513. if (!read_status->ok()) {
  514. if (read_status->IsIncomplete()) {
  515. // FIXME: this code is not covered by unit tests
  516. MarkKeyMayExist();
  517. return false;
  518. }
  519. state_ = kCorrupt;
  520. return false;
  521. }
  522. *is_blob_index_ = false;
  523. return true;
  524. }
  525. void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
  526. // TODO(yanqin) preserve timestamps information in merge_context
  527. if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
  528. value_pinner != nullptr) {
  529. value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
  530. merge_context_->PushOperand(value, true /*value_pinned*/);
  531. } else {
  532. merge_context_->PushOperand(value, false);
  533. }
  534. }
  535. Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
  536. GetContext* get_context, Cleanable* value_pinner,
  537. SequenceNumber seq_no) {
  538. Slice s = replay_log;
  539. Slice ts;
  540. size_t ts_sz = get_context->TimestampSize();
  541. bool ret = false;
  542. while (s.size()) {
  543. auto type = static_cast<ValueType>(*s.data());
  544. s.remove_prefix(1);
  545. Slice value;
  546. ret = GetLengthPrefixedSlice(&s, &value);
  547. assert(ret);
  548. bool dont_care __attribute__((__unused__));
  549. // Use a copy to prevent modifying user_key. Modification of user_key
  550. // could result to potential cache miss.
  551. std::string user_key_str = user_key.ToString();
  552. ParsedInternalKey ikey = ParsedInternalKey(user_key_str, seq_no, type);
  553. // If ts enabled for current cf, there will always be ts appended after each
  554. // piece of value.
  555. if (ts_sz > 0) {
  556. ret = GetLengthPrefixedSlice(&s, &ts);
  557. assert(ts_sz == ts.size());
  558. assert(ret);
  559. ikey.SetTimestamp(ts);
  560. }
  561. (void)ret;
  562. Status read_status;
  563. get_context->SaveValue(ikey, value, &dont_care, &read_status, value_pinner);
  564. if (!read_status.ok()) {
  565. return read_status;
  566. }
  567. }
  568. return Status::OK();
  569. }
  570. } // namespace ROCKSDB_NAMESPACE