plain_table_key_coding.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "table/plain/plain_table_key_coding.h"
  6. #include <algorithm>
  7. #include <string>
  8. #include "db/dbformat.h"
  9. #include "file/writable_file_writer.h"
  10. #include "table/plain/plain_table_factory.h"
  11. #include "table/plain/plain_table_reader.h"
  12. namespace ROCKSDB_NAMESPACE {
  13. enum PlainTableEntryType : unsigned char {
  14. kFullKey = 0,
  15. kPrefixFromPreviousKey = 1,
  16. kKeySuffix = 2,
  17. };
  18. namespace {
  19. // Control byte:
  20. // First two bits indicate type of entry
  21. // Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes
  22. // are used. key_size-0x3F will be encoded as a variint32 after this bytes.
  23. const unsigned char kSizeInlineLimit = 0x3F;
  24. // Return 0 for error
  25. size_t EncodeSize(PlainTableEntryType type, uint32_t key_size,
  26. char* out_buffer) {
  27. out_buffer[0] = type << 6;
  28. if (key_size < static_cast<uint32_t>(kSizeInlineLimit)) {
  29. // size inlined
  30. out_buffer[0] |= static_cast<char>(key_size);
  31. return 1;
  32. } else {
  33. out_buffer[0] |= kSizeInlineLimit;
  34. char* ptr = EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit);
  35. return ptr - out_buffer;
  36. }
  37. }
  38. } // namespace
  39. // Fill bytes_read with number of bytes read.
  40. inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
  41. PlainTableEntryType* entry_type,
  42. uint32_t* key_size,
  43. uint32_t* bytes_read) {
  44. Slice next_byte_slice;
  45. bool success = file_reader_.Read(start_offset, 1, &next_byte_slice);
  46. if (!success) {
  47. return file_reader_.status();
  48. }
  49. *entry_type = static_cast<PlainTableEntryType>(
  50. (static_cast<unsigned char>(next_byte_slice[0]) & ~kSizeInlineLimit) >>
  51. 6);
  52. char inline_key_size = next_byte_slice[0] & kSizeInlineLimit;
  53. if (inline_key_size < kSizeInlineLimit) {
  54. *key_size = inline_key_size;
  55. *bytes_read = 1;
  56. return Status::OK();
  57. } else {
  58. uint32_t extra_size;
  59. uint32_t tmp_bytes_read;
  60. success = file_reader_.ReadVarint32(start_offset + 1, &extra_size,
  61. &tmp_bytes_read);
  62. if (!success) {
  63. return file_reader_.status();
  64. }
  65. assert(tmp_bytes_read > 0);
  66. *key_size = kSizeInlineLimit + extra_size;
  67. *bytes_read = tmp_bytes_read + 1;
  68. return Status::OK();
  69. }
  70. }
  71. IOStatus PlainTableKeyEncoder::AppendKey(const Slice& key,
  72. WritableFileWriter* file,
  73. uint64_t* offset, char* meta_bytes_buf,
  74. size_t* meta_bytes_buf_size) {
  75. ParsedInternalKey parsed_key;
  76. Status pik_status =
  77. ParseInternalKey(key, &parsed_key, false /* log_err_key */); // TODO
  78. if (!pik_status.ok()) {
  79. return IOStatus::Corruption(pik_status.getState());
  80. }
  81. Slice key_to_write = key; // Portion of internal key to write out.
  82. uint32_t user_key_size = static_cast<uint32_t>(key.size() - 8);
  83. const IOOptions opts;
  84. if (encoding_type_ == kPlain) {
  85. if (fixed_user_key_len_ == kPlainTableVariableLength) {
  86. // Write key length
  87. char key_size_buf[5]; // tmp buffer for key size as varint32
  88. char* ptr = EncodeVarint32(key_size_buf, user_key_size);
  89. assert(ptr <= key_size_buf + sizeof(key_size_buf));
  90. auto len = ptr - key_size_buf;
  91. IOStatus io_s = file->Append(opts, Slice(key_size_buf, len));
  92. if (!io_s.ok()) {
  93. return io_s;
  94. }
  95. *offset += len;
  96. }
  97. } else {
  98. assert(encoding_type_ == kPrefix);
  99. char size_bytes[12];
  100. size_t size_bytes_pos = 0;
  101. Slice prefix =
  102. prefix_extractor_->Transform(Slice(key.data(), user_key_size));
  103. if (key_count_for_prefix_ == 0 || prefix != pre_prefix_.GetUserKey() ||
  104. key_count_for_prefix_ % index_sparseness_ == 0) {
  105. key_count_for_prefix_ = 1;
  106. pre_prefix_.SetUserKey(prefix);
  107. size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes);
  108. IOStatus io_s = file->Append(opts, Slice(size_bytes, size_bytes_pos));
  109. if (!io_s.ok()) {
  110. return io_s;
  111. }
  112. *offset += size_bytes_pos;
  113. } else {
  114. key_count_for_prefix_++;
  115. if (key_count_for_prefix_ == 2) {
  116. // For second key within a prefix, need to encode prefix length
  117. size_bytes_pos +=
  118. EncodeSize(kPrefixFromPreviousKey,
  119. static_cast<uint32_t>(pre_prefix_.GetUserKey().size()),
  120. size_bytes + size_bytes_pos);
  121. }
  122. uint32_t prefix_len =
  123. static_cast<uint32_t>(pre_prefix_.GetUserKey().size());
  124. size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len,
  125. size_bytes + size_bytes_pos);
  126. IOStatus io_s = file->Append(opts, Slice(size_bytes, size_bytes_pos));
  127. if (!io_s.ok()) {
  128. return io_s;
  129. }
  130. *offset += size_bytes_pos;
  131. key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
  132. }
  133. }
  134. // Encode full key
  135. // For value size as varint32 (up to 5 bytes).
  136. // If the row is of value type with seqId 0, flush the special flag together
  137. // in this buffer to safe one file append call, which takes 1 byte.
  138. if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
  139. IOStatus io_s =
  140. file->Append(opts, Slice(key_to_write.data(), key_to_write.size() - 8));
  141. if (!io_s.ok()) {
  142. return io_s;
  143. }
  144. *offset += key_to_write.size() - 8;
  145. meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
  146. *meta_bytes_buf_size += 1;
  147. } else {
  148. IOStatus io_s = file->Append(opts, key_to_write);
  149. if (!io_s.ok()) {
  150. return io_s;
  151. }
  152. *offset += key_to_write.size();
  153. }
  154. return IOStatus::OK();
  155. }
  156. Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,
  157. uint32_t len) {
  158. assert(file_offset + len <= file_info_->data_end_offset);
  159. return Slice(buffer->buf.get() + (file_offset - buffer->buf_start_offset),
  160. len);
  161. }
  162. bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
  163. Slice* out) {
  164. const uint32_t kPrefetchSize = 256u;
  165. // Try to read from buffers.
  166. for (uint32_t i = 0; i < num_buf_; i++) {
  167. Buffer* buffer = buffers_[num_buf_ - 1 - i].get();
  168. if (file_offset >= buffer->buf_start_offset &&
  169. file_offset + len <= buffer->buf_start_offset + buffer->buf_len) {
  170. *out = GetFromBuffer(buffer, file_offset, len);
  171. return true;
  172. }
  173. }
  174. Buffer* new_buffer;
  175. // Data needed is not in any of the buffer. Allocate a new buffer.
  176. if (num_buf_ < buffers_.size()) {
  177. // Add a new buffer
  178. new_buffer = new Buffer();
  179. buffers_[num_buf_++].reset(new_buffer);
  180. } else {
  181. // Now simply replace the last buffer. Can improve the placement policy
  182. // if needed.
  183. new_buffer = buffers_[num_buf_ - 1].get();
  184. }
  185. assert(file_offset + len <= file_info_->data_end_offset);
  186. uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
  187. std::max(kPrefetchSize, len));
  188. if (size_to_read > new_buffer->buf_capacity) {
  189. new_buffer->buf.reset(new char[size_to_read]);
  190. new_buffer->buf_capacity = size_to_read;
  191. new_buffer->buf_len = 0;
  192. }
  193. Slice read_result;
  194. // TODO: rate limit plain table reads.
  195. Status s =
  196. file_info_->file->Read(IOOptions(), file_offset, size_to_read,
  197. &read_result, new_buffer->buf.get(), nullptr);
  198. if (!s.ok()) {
  199. status_ = s;
  200. return false;
  201. }
  202. new_buffer->buf_start_offset = file_offset;
  203. new_buffer->buf_len = size_to_read;
  204. *out = GetFromBuffer(new_buffer, file_offset, len);
  205. return true;
  206. }
  207. inline bool PlainTableFileReader::ReadVarint32(uint32_t offset, uint32_t* out,
  208. uint32_t* bytes_read) {
  209. if (file_info_->is_mmap_mode) {
  210. const char* start = file_info_->file_data.data() + offset;
  211. const char* limit =
  212. file_info_->file_data.data() + file_info_->data_end_offset;
  213. const char* key_ptr = GetVarint32Ptr(start, limit, out);
  214. assert(key_ptr != nullptr);
  215. *bytes_read = static_cast<uint32_t>(key_ptr - start);
  216. return true;
  217. } else {
  218. return ReadVarint32NonMmap(offset, out, bytes_read);
  219. }
  220. }
  221. bool PlainTableFileReader::ReadVarint32NonMmap(uint32_t offset, uint32_t* out,
  222. uint32_t* bytes_read) {
  223. const char* start;
  224. const char* limit;
  225. const uint32_t kMaxVarInt32Size = 6u;
  226. uint32_t bytes_to_read =
  227. std::min(file_info_->data_end_offset - offset, kMaxVarInt32Size);
  228. Slice bytes;
  229. if (!Read(offset, bytes_to_read, &bytes)) {
  230. return false;
  231. }
  232. start = bytes.data();
  233. limit = bytes.data() + bytes.size();
  234. const char* key_ptr = GetVarint32Ptr(start, limit, out);
  235. *bytes_read =
  236. (key_ptr != nullptr) ? static_cast<uint32_t>(key_ptr - start) : 0;
  237. return true;
  238. }
  239. Status PlainTableKeyDecoder::ReadInternalKey(
  240. uint32_t file_offset, uint32_t user_key_size, ParsedInternalKey* parsed_key,
  241. uint32_t* bytes_read, bool* internal_key_valid, Slice* internal_key) {
  242. Slice tmp_slice;
  243. bool success = file_reader_.Read(file_offset, user_key_size + 1, &tmp_slice);
  244. if (!success) {
  245. return file_reader_.status();
  246. }
  247. if (tmp_slice[user_key_size] == PlainTableFactory::kValueTypeSeqId0) {
  248. // Special encoding for the row with seqID=0
  249. parsed_key->user_key = Slice(tmp_slice.data(), user_key_size);
  250. parsed_key->sequence = 0;
  251. parsed_key->type = kTypeValue;
  252. *bytes_read += user_key_size + 1;
  253. *internal_key_valid = false;
  254. } else {
  255. success = file_reader_.Read(file_offset, user_key_size + 8, internal_key);
  256. if (!success) {
  257. return file_reader_.status();
  258. }
  259. *internal_key_valid = true;
  260. Status pik_status = ParseInternalKey(*internal_key, parsed_key,
  261. false /* log_err_key */); // TODO
  262. if (!pik_status.ok()) {
  263. return Status::Corruption(
  264. Slice("Corrupted key found during next key read. "),
  265. pik_status.getState());
  266. }
  267. *bytes_read += user_key_size + 8;
  268. }
  269. return Status::OK();
  270. }
  271. Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
  272. ParsedInternalKey* parsed_key,
  273. Slice* internal_key,
  274. uint32_t* bytes_read,
  275. bool* /*seekable*/) {
  276. uint32_t user_key_size = 0;
  277. Status s;
  278. if (fixed_user_key_len_ != kPlainTableVariableLength) {
  279. user_key_size = fixed_user_key_len_;
  280. } else {
  281. uint32_t tmp_size = 0;
  282. uint32_t tmp_read;
  283. bool success =
  284. file_reader_.ReadVarint32(start_offset, &tmp_size, &tmp_read);
  285. if (!success) {
  286. return file_reader_.status();
  287. }
  288. assert(tmp_read > 0);
  289. user_key_size = tmp_size;
  290. *bytes_read = tmp_read;
  291. }
  292. // dummy initial value to avoid compiler complain
  293. bool decoded_internal_key_valid = true;
  294. Slice decoded_internal_key;
  295. s = ReadInternalKey(start_offset + *bytes_read, user_key_size, parsed_key,
  296. bytes_read, &decoded_internal_key_valid,
  297. &decoded_internal_key);
  298. if (!s.ok()) {
  299. return s;
  300. }
  301. if (!file_reader_.file_info()->is_mmap_mode) {
  302. cur_key_.SetInternalKey(*parsed_key);
  303. parsed_key->user_key =
  304. Slice(cur_key_.GetInternalKey().data(), user_key_size);
  305. if (internal_key != nullptr) {
  306. *internal_key = cur_key_.GetInternalKey();
  307. }
  308. } else if (internal_key != nullptr) {
  309. if (decoded_internal_key_valid) {
  310. *internal_key = decoded_internal_key;
  311. } else {
  312. // Need to copy out the internal key
  313. cur_key_.SetInternalKey(*parsed_key);
  314. *internal_key = cur_key_.GetInternalKey();
  315. }
  316. }
  317. return Status::OK();
  318. }
  319. Status PlainTableKeyDecoder::NextPrefixEncodingKey(
  320. uint32_t start_offset, ParsedInternalKey* parsed_key, Slice* internal_key,
  321. uint32_t* bytes_read, bool* seekable) {
  322. PlainTableEntryType entry_type;
  323. bool expect_suffix = false;
  324. Status s;
  325. do {
  326. uint32_t size = 0;
  327. // dummy initial value to avoid compiler complain
  328. bool decoded_internal_key_valid = true;
  329. uint32_t my_bytes_read = 0;
  330. s = DecodeSize(start_offset + *bytes_read, &entry_type, &size,
  331. &my_bytes_read);
  332. if (!s.ok()) {
  333. return s;
  334. }
  335. if (my_bytes_read == 0) {
  336. return Status::Corruption("Unexpected EOF when reading size of the key");
  337. }
  338. *bytes_read += my_bytes_read;
  339. switch (entry_type) {
  340. case kFullKey: {
  341. expect_suffix = false;
  342. Slice decoded_internal_key;
  343. s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
  344. bytes_read, &decoded_internal_key_valid,
  345. &decoded_internal_key);
  346. if (!s.ok()) {
  347. return s;
  348. }
  349. if (!file_reader_.file_info()->is_mmap_mode ||
  350. (internal_key != nullptr && !decoded_internal_key_valid)) {
  351. // In non-mmap mode, always need to make a copy of keys returned to
  352. // users, because after reading value for the key, the key might
  353. // be invalid.
  354. cur_key_.SetInternalKey(*parsed_key);
  355. saved_user_key_ = cur_key_.GetUserKey();
  356. if (!file_reader_.file_info()->is_mmap_mode) {
  357. parsed_key->user_key =
  358. Slice(cur_key_.GetInternalKey().data(), size);
  359. }
  360. if (internal_key != nullptr) {
  361. *internal_key = cur_key_.GetInternalKey();
  362. }
  363. } else {
  364. if (internal_key != nullptr) {
  365. *internal_key = decoded_internal_key;
  366. }
  367. saved_user_key_ = parsed_key->user_key;
  368. }
  369. break;
  370. }
  371. case kPrefixFromPreviousKey: {
  372. if (seekable != nullptr) {
  373. *seekable = false;
  374. }
  375. prefix_len_ = size;
  376. assert(prefix_extractor_ == nullptr ||
  377. prefix_extractor_->Transform(saved_user_key_).size() ==
  378. prefix_len_);
  379. // Need read another size flag for suffix
  380. expect_suffix = true;
  381. break;
  382. }
  383. case kKeySuffix: {
  384. expect_suffix = false;
  385. if (seekable != nullptr) {
  386. *seekable = false;
  387. }
  388. Slice tmp_slice;
  389. s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
  390. bytes_read, &decoded_internal_key_valid,
  391. &tmp_slice);
  392. if (!s.ok()) {
  393. return s;
  394. }
  395. if (!file_reader_.file_info()->is_mmap_mode) {
  396. // In non-mmap mode, we need to make a copy of keys returned to
  397. // users, because after reading value for the key, the key might
  398. // be invalid.
  399. // saved_user_key_ points to cur_key_. We are making a copy of
  400. // the prefix part to another string, and construct the current
  401. // key from the prefix part and the suffix part back to cur_key_.
  402. std::string tmp =
  403. Slice(saved_user_key_.data(), prefix_len_).ToString();
  404. cur_key_.Reserve(prefix_len_ + size);
  405. cur_key_.SetInternalKey(tmp, *parsed_key);
  406. parsed_key->user_key =
  407. Slice(cur_key_.GetInternalKey().data(), prefix_len_ + size);
  408. saved_user_key_ = cur_key_.GetUserKey();
  409. } else {
  410. cur_key_.Reserve(prefix_len_ + size);
  411. cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_),
  412. *parsed_key);
  413. }
  414. parsed_key->user_key = cur_key_.GetUserKey();
  415. if (internal_key != nullptr) {
  416. *internal_key = cur_key_.GetInternalKey();
  417. }
  418. break;
  419. }
  420. default:
  421. return Status::Corruption("Un-identified size flag.");
  422. }
  423. } while (expect_suffix); // Another round if suffix is expected.
  424. return Status::OK();
  425. }
  426. Status PlainTableKeyDecoder::NextKey(uint32_t start_offset,
  427. ParsedInternalKey* parsed_key,
  428. Slice* internal_key, Slice* value,
  429. uint32_t* bytes_read, bool* seekable) {
  430. assert(value != nullptr);
  431. Status s = NextKeyNoValue(start_offset, parsed_key, internal_key, bytes_read,
  432. seekable);
  433. if (s.ok()) {
  434. assert(bytes_read != nullptr);
  435. uint32_t value_size;
  436. uint32_t value_size_bytes;
  437. bool success = file_reader_.ReadVarint32(start_offset + *bytes_read,
  438. &value_size, &value_size_bytes);
  439. if (!success) {
  440. return file_reader_.status();
  441. }
  442. if (value_size_bytes == 0) {
  443. return Status::Corruption(
  444. "Unexpected EOF when reading the next value's size.");
  445. }
  446. *bytes_read += value_size_bytes;
  447. success = file_reader_.Read(start_offset + *bytes_read, value_size, value);
  448. if (!success) {
  449. return file_reader_.status();
  450. }
  451. *bytes_read += value_size;
  452. }
  453. return s;
  454. }
  455. Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
  456. ParsedInternalKey* parsed_key,
  457. Slice* internal_key,
  458. uint32_t* bytes_read,
  459. bool* seekable) {
  460. *bytes_read = 0;
  461. if (seekable != nullptr) {
  462. *seekable = true;
  463. }
  464. if (encoding_type_ == kPlain) {
  465. return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
  466. bytes_read, seekable);
  467. } else {
  468. assert(encoding_type_ == kPrefix);
  469. return NextPrefixEncodingKey(start_offset, parsed_key, internal_key,
  470. bytes_read, seekable);
  471. }
  472. }
  473. } // namespace ROCKSDB_NAMESPACE