plain_table_key_coding.cc 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "table/plain/plain_table_key_coding.h"
  7. #include <algorithm>
  8. #include <string>
  9. #include "db/dbformat.h"
  10. #include "file/writable_file_writer.h"
  11. #include "table/plain/plain_table_factory.h"
  12. #include "table/plain/plain_table_reader.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. enum PlainTableEntryType : unsigned char {
  15. kFullKey = 0,
  16. kPrefixFromPreviousKey = 1,
  17. kKeySuffix = 2,
  18. };
  19. namespace {
  20. // Control byte:
  21. // First two bits indicate type of entry
  22. // Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes
  23. // are used. key_size-0x3F will be encoded as a variint32 after this bytes.
  24. const unsigned char kSizeInlineLimit = 0x3F;
  25. // Return 0 for error
  26. size_t EncodeSize(PlainTableEntryType type, uint32_t key_size,
  27. char* out_buffer) {
  28. out_buffer[0] = type << 6;
  29. if (key_size < static_cast<uint32_t>(kSizeInlineLimit)) {
  30. // size inlined
  31. out_buffer[0] |= static_cast<char>(key_size);
  32. return 1;
  33. } else {
  34. out_buffer[0] |= kSizeInlineLimit;
  35. char* ptr = EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit);
  36. return ptr - out_buffer;
  37. }
  38. }
  39. } // namespace
  40. // Fill bytes_read with number of bytes read.
  41. inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
  42. PlainTableEntryType* entry_type,
  43. uint32_t* key_size,
  44. uint32_t* bytes_read) {
  45. Slice next_byte_slice;
  46. bool success = file_reader_.Read(start_offset, 1, &next_byte_slice);
  47. if (!success) {
  48. return file_reader_.status();
  49. }
  50. *entry_type = static_cast<PlainTableEntryType>(
  51. (static_cast<unsigned char>(next_byte_slice[0]) & ~kSizeInlineLimit) >>
  52. 6);
  53. char inline_key_size = next_byte_slice[0] & kSizeInlineLimit;
  54. if (inline_key_size < kSizeInlineLimit) {
  55. *key_size = inline_key_size;
  56. *bytes_read = 1;
  57. return Status::OK();
  58. } else {
  59. uint32_t extra_size;
  60. uint32_t tmp_bytes_read;
  61. success = file_reader_.ReadVarint32(start_offset + 1, &extra_size,
  62. &tmp_bytes_read);
  63. if (!success) {
  64. return file_reader_.status();
  65. }
  66. assert(tmp_bytes_read > 0);
  67. *key_size = kSizeInlineLimit + extra_size;
  68. *bytes_read = tmp_bytes_read + 1;
  69. return Status::OK();
  70. }
  71. }
  72. Status PlainTableKeyEncoder::AppendKey(const Slice& key,
  73. WritableFileWriter* file,
  74. uint64_t* offset, char* meta_bytes_buf,
  75. size_t* meta_bytes_buf_size) {
  76. ParsedInternalKey parsed_key;
  77. if (!ParseInternalKey(key, &parsed_key)) {
  78. return Status::Corruption(Slice());
  79. }
  80. Slice key_to_write = key; // Portion of internal key to write out.
  81. uint32_t user_key_size = static_cast<uint32_t>(key.size() - 8);
  82. if (encoding_type_ == kPlain) {
  83. if (fixed_user_key_len_ == kPlainTableVariableLength) {
  84. // Write key length
  85. char key_size_buf[5]; // tmp buffer for key size as varint32
  86. char* ptr = EncodeVarint32(key_size_buf, user_key_size);
  87. assert(ptr <= key_size_buf + sizeof(key_size_buf));
  88. auto len = ptr - key_size_buf;
  89. Status s = file->Append(Slice(key_size_buf, len));
  90. if (!s.ok()) {
  91. return s;
  92. }
  93. *offset += len;
  94. }
  95. } else {
  96. assert(encoding_type_ == kPrefix);
  97. char size_bytes[12];
  98. size_t size_bytes_pos = 0;
  99. Slice prefix =
  100. prefix_extractor_->Transform(Slice(key.data(), user_key_size));
  101. if (key_count_for_prefix_ == 0 || prefix != pre_prefix_.GetUserKey() ||
  102. key_count_for_prefix_ % index_sparseness_ == 0) {
  103. key_count_for_prefix_ = 1;
  104. pre_prefix_.SetUserKey(prefix);
  105. size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes);
  106. Status s = file->Append(Slice(size_bytes, size_bytes_pos));
  107. if (!s.ok()) {
  108. return s;
  109. }
  110. *offset += size_bytes_pos;
  111. } else {
  112. key_count_for_prefix_++;
  113. if (key_count_for_prefix_ == 2) {
  114. // For second key within a prefix, need to encode prefix length
  115. size_bytes_pos +=
  116. EncodeSize(kPrefixFromPreviousKey,
  117. static_cast<uint32_t>(pre_prefix_.GetUserKey().size()),
  118. size_bytes + size_bytes_pos);
  119. }
  120. uint32_t prefix_len =
  121. static_cast<uint32_t>(pre_prefix_.GetUserKey().size());
  122. size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len,
  123. size_bytes + size_bytes_pos);
  124. Status s = file->Append(Slice(size_bytes, size_bytes_pos));
  125. if (!s.ok()) {
  126. return s;
  127. }
  128. *offset += size_bytes_pos;
  129. key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
  130. }
  131. }
  132. // Encode full key
  133. // For value size as varint32 (up to 5 bytes).
  134. // If the row is of value type with seqId 0, flush the special flag together
  135. // in this buffer to safe one file append call, which takes 1 byte.
  136. if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
  137. Status s =
  138. file->Append(Slice(key_to_write.data(), key_to_write.size() - 8));
  139. if (!s.ok()) {
  140. return s;
  141. }
  142. *offset += key_to_write.size() - 8;
  143. meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
  144. *meta_bytes_buf_size += 1;
  145. } else {
  146. file->Append(key_to_write);
  147. *offset += key_to_write.size();
  148. }
  149. return Status::OK();
  150. }
  151. Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,
  152. uint32_t len) {
  153. assert(file_offset + len <= file_info_->data_end_offset);
  154. return Slice(buffer->buf.get() + (file_offset - buffer->buf_start_offset),
  155. len);
  156. }
  157. bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
  158. Slice* out) {
  159. const uint32_t kPrefetchSize = 256u;
  160. // Try to read from buffers.
  161. for (uint32_t i = 0; i < num_buf_; i++) {
  162. Buffer* buffer = buffers_[num_buf_ - 1 - i].get();
  163. if (file_offset >= buffer->buf_start_offset &&
  164. file_offset + len <= buffer->buf_start_offset + buffer->buf_len) {
  165. *out = GetFromBuffer(buffer, file_offset, len);
  166. return true;
  167. }
  168. }
  169. Buffer* new_buffer;
  170. // Data needed is not in any of the buffer. Allocate a new buffer.
  171. if (num_buf_ < buffers_.size()) {
  172. // Add a new buffer
  173. new_buffer = new Buffer();
  174. buffers_[num_buf_++].reset(new_buffer);
  175. } else {
  176. // Now simply replace the last buffer. Can improve the placement policy
  177. // if needed.
  178. new_buffer = buffers_[num_buf_ - 1].get();
  179. }
  180. assert(file_offset + len <= file_info_->data_end_offset);
  181. uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
  182. std::max(kPrefetchSize, len));
  183. if (size_to_read > new_buffer->buf_capacity) {
  184. new_buffer->buf.reset(new char[size_to_read]);
  185. new_buffer->buf_capacity = size_to_read;
  186. new_buffer->buf_len = 0;
  187. }
  188. Slice read_result;
  189. Status s = file_info_->file->Read(file_offset, size_to_read, &read_result,
  190. new_buffer->buf.get());
  191. if (!s.ok()) {
  192. status_ = s;
  193. return false;
  194. }
  195. new_buffer->buf_start_offset = file_offset;
  196. new_buffer->buf_len = size_to_read;
  197. *out = GetFromBuffer(new_buffer, file_offset, len);
  198. return true;
  199. }
  200. inline bool PlainTableFileReader::ReadVarint32(uint32_t offset, uint32_t* out,
  201. uint32_t* bytes_read) {
  202. if (file_info_->is_mmap_mode) {
  203. const char* start = file_info_->file_data.data() + offset;
  204. const char* limit =
  205. file_info_->file_data.data() + file_info_->data_end_offset;
  206. const char* key_ptr = GetVarint32Ptr(start, limit, out);
  207. assert(key_ptr != nullptr);
  208. *bytes_read = static_cast<uint32_t>(key_ptr - start);
  209. return true;
  210. } else {
  211. return ReadVarint32NonMmap(offset, out, bytes_read);
  212. }
  213. }
  214. bool PlainTableFileReader::ReadVarint32NonMmap(uint32_t offset, uint32_t* out,
  215. uint32_t* bytes_read) {
  216. const char* start;
  217. const char* limit;
  218. const uint32_t kMaxVarInt32Size = 6u;
  219. uint32_t bytes_to_read =
  220. std::min(file_info_->data_end_offset - offset, kMaxVarInt32Size);
  221. Slice bytes;
  222. if (!Read(offset, bytes_to_read, &bytes)) {
  223. return false;
  224. }
  225. start = bytes.data();
  226. limit = bytes.data() + bytes.size();
  227. const char* key_ptr = GetVarint32Ptr(start, limit, out);
  228. *bytes_read =
  229. (key_ptr != nullptr) ? static_cast<uint32_t>(key_ptr - start) : 0;
  230. return true;
  231. }
  232. Status PlainTableKeyDecoder::ReadInternalKey(
  233. uint32_t file_offset, uint32_t user_key_size, ParsedInternalKey* parsed_key,
  234. uint32_t* bytes_read, bool* internal_key_valid, Slice* internal_key) {
  235. Slice tmp_slice;
  236. bool success = file_reader_.Read(file_offset, user_key_size + 1, &tmp_slice);
  237. if (!success) {
  238. return file_reader_.status();
  239. }
  240. if (tmp_slice[user_key_size] == PlainTableFactory::kValueTypeSeqId0) {
  241. // Special encoding for the row with seqID=0
  242. parsed_key->user_key = Slice(tmp_slice.data(), user_key_size);
  243. parsed_key->sequence = 0;
  244. parsed_key->type = kTypeValue;
  245. *bytes_read += user_key_size + 1;
  246. *internal_key_valid = false;
  247. } else {
  248. success = file_reader_.Read(file_offset, user_key_size + 8, internal_key);
  249. if (!success) {
  250. return file_reader_.status();
  251. }
  252. *internal_key_valid = true;
  253. if (!ParseInternalKey(*internal_key, parsed_key)) {
  254. return Status::Corruption(
  255. Slice("Incorrect value type found when reading the next key"));
  256. }
  257. *bytes_read += user_key_size + 8;
  258. }
  259. return Status::OK();
  260. }
  261. Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
  262. ParsedInternalKey* parsed_key,
  263. Slice* internal_key,
  264. uint32_t* bytes_read,
  265. bool* /*seekable*/) {
  266. uint32_t user_key_size = 0;
  267. Status s;
  268. if (fixed_user_key_len_ != kPlainTableVariableLength) {
  269. user_key_size = fixed_user_key_len_;
  270. } else {
  271. uint32_t tmp_size = 0;
  272. uint32_t tmp_read;
  273. bool success =
  274. file_reader_.ReadVarint32(start_offset, &tmp_size, &tmp_read);
  275. if (!success) {
  276. return file_reader_.status();
  277. }
  278. assert(tmp_read > 0);
  279. user_key_size = tmp_size;
  280. *bytes_read = tmp_read;
  281. }
  282. // dummy initial value to avoid compiler complain
  283. bool decoded_internal_key_valid = true;
  284. Slice decoded_internal_key;
  285. s = ReadInternalKey(start_offset + *bytes_read, user_key_size, parsed_key,
  286. bytes_read, &decoded_internal_key_valid,
  287. &decoded_internal_key);
  288. if (!s.ok()) {
  289. return s;
  290. }
  291. if (!file_reader_.file_info()->is_mmap_mode) {
  292. cur_key_.SetInternalKey(*parsed_key);
  293. parsed_key->user_key =
  294. Slice(cur_key_.GetInternalKey().data(), user_key_size);
  295. if (internal_key != nullptr) {
  296. *internal_key = cur_key_.GetInternalKey();
  297. }
  298. } else if (internal_key != nullptr) {
  299. if (decoded_internal_key_valid) {
  300. *internal_key = decoded_internal_key;
  301. } else {
  302. // Need to copy out the internal key
  303. cur_key_.SetInternalKey(*parsed_key);
  304. *internal_key = cur_key_.GetInternalKey();
  305. }
  306. }
  307. return Status::OK();
  308. }
  309. Status PlainTableKeyDecoder::NextPrefixEncodingKey(
  310. uint32_t start_offset, ParsedInternalKey* parsed_key, Slice* internal_key,
  311. uint32_t* bytes_read, bool* seekable) {
  312. PlainTableEntryType entry_type;
  313. bool expect_suffix = false;
  314. Status s;
  315. do {
  316. uint32_t size = 0;
  317. // dummy initial value to avoid compiler complain
  318. bool decoded_internal_key_valid = true;
  319. uint32_t my_bytes_read = 0;
  320. s = DecodeSize(start_offset + *bytes_read, &entry_type, &size,
  321. &my_bytes_read);
  322. if (!s.ok()) {
  323. return s;
  324. }
  325. if (my_bytes_read == 0) {
  326. return Status::Corruption("Unexpected EOF when reading size of the key");
  327. }
  328. *bytes_read += my_bytes_read;
  329. switch (entry_type) {
  330. case kFullKey: {
  331. expect_suffix = false;
  332. Slice decoded_internal_key;
  333. s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
  334. bytes_read, &decoded_internal_key_valid,
  335. &decoded_internal_key);
  336. if (!s.ok()) {
  337. return s;
  338. }
  339. if (!file_reader_.file_info()->is_mmap_mode ||
  340. (internal_key != nullptr && !decoded_internal_key_valid)) {
  341. // In non-mmap mode, always need to make a copy of keys returned to
  342. // users, because after reading value for the key, the key might
  343. // be invalid.
  344. cur_key_.SetInternalKey(*parsed_key);
  345. saved_user_key_ = cur_key_.GetUserKey();
  346. if (!file_reader_.file_info()->is_mmap_mode) {
  347. parsed_key->user_key =
  348. Slice(cur_key_.GetInternalKey().data(), size);
  349. }
  350. if (internal_key != nullptr) {
  351. *internal_key = cur_key_.GetInternalKey();
  352. }
  353. } else {
  354. if (internal_key != nullptr) {
  355. *internal_key = decoded_internal_key;
  356. }
  357. saved_user_key_ = parsed_key->user_key;
  358. }
  359. break;
  360. }
  361. case kPrefixFromPreviousKey: {
  362. if (seekable != nullptr) {
  363. *seekable = false;
  364. }
  365. prefix_len_ = size;
  366. assert(prefix_extractor_ == nullptr ||
  367. prefix_extractor_->Transform(saved_user_key_).size() ==
  368. prefix_len_);
  369. // Need read another size flag for suffix
  370. expect_suffix = true;
  371. break;
  372. }
  373. case kKeySuffix: {
  374. expect_suffix = false;
  375. if (seekable != nullptr) {
  376. *seekable = false;
  377. }
  378. Slice tmp_slice;
  379. s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
  380. bytes_read, &decoded_internal_key_valid,
  381. &tmp_slice);
  382. if (!s.ok()) {
  383. return s;
  384. }
  385. if (!file_reader_.file_info()->is_mmap_mode) {
  386. // In non-mmap mode, we need to make a copy of keys returned to
  387. // users, because after reading value for the key, the key might
  388. // be invalid.
  389. // saved_user_key_ points to cur_key_. We are making a copy of
  390. // the prefix part to another string, and construct the current
  391. // key from the prefix part and the suffix part back to cur_key_.
  392. std::string tmp =
  393. Slice(saved_user_key_.data(), prefix_len_).ToString();
  394. cur_key_.Reserve(prefix_len_ + size);
  395. cur_key_.SetInternalKey(tmp, *parsed_key);
  396. parsed_key->user_key =
  397. Slice(cur_key_.GetInternalKey().data(), prefix_len_ + size);
  398. saved_user_key_ = cur_key_.GetUserKey();
  399. } else {
  400. cur_key_.Reserve(prefix_len_ + size);
  401. cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_),
  402. *parsed_key);
  403. }
  404. parsed_key->user_key = cur_key_.GetUserKey();
  405. if (internal_key != nullptr) {
  406. *internal_key = cur_key_.GetInternalKey();
  407. }
  408. break;
  409. }
  410. default:
  411. return Status::Corruption("Un-identified size flag.");
  412. }
  413. } while (expect_suffix); // Another round if suffix is expected.
  414. return Status::OK();
  415. }
  416. Status PlainTableKeyDecoder::NextKey(uint32_t start_offset,
  417. ParsedInternalKey* parsed_key,
  418. Slice* internal_key, Slice* value,
  419. uint32_t* bytes_read, bool* seekable) {
  420. assert(value != nullptr);
  421. Status s = NextKeyNoValue(start_offset, parsed_key, internal_key, bytes_read,
  422. seekable);
  423. if (s.ok()) {
  424. assert(bytes_read != nullptr);
  425. uint32_t value_size;
  426. uint32_t value_size_bytes;
  427. bool success = file_reader_.ReadVarint32(start_offset + *bytes_read,
  428. &value_size, &value_size_bytes);
  429. if (!success) {
  430. return file_reader_.status();
  431. }
  432. if (value_size_bytes == 0) {
  433. return Status::Corruption(
  434. "Unexpected EOF when reading the next value's size.");
  435. }
  436. *bytes_read += value_size_bytes;
  437. success = file_reader_.Read(start_offset + *bytes_read, value_size, value);
  438. if (!success) {
  439. return file_reader_.status();
  440. }
  441. *bytes_read += value_size;
  442. }
  443. return s;
  444. }
  445. Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
  446. ParsedInternalKey* parsed_key,
  447. Slice* internal_key,
  448. uint32_t* bytes_read,
  449. bool* seekable) {
  450. *bytes_read = 0;
  451. if (seekable != nullptr) {
  452. *seekable = true;
  453. }
  454. Status s;
  455. if (encoding_type_ == kPlain) {
  456. return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
  457. bytes_read, seekable);
  458. } else {
  459. assert(encoding_type_ == kPrefix);
  460. return NextPrefixEncodingKey(start_offset, parsed_key, internal_key,
  461. bytes_read, seekable);
  462. }
  463. }
  464. } // namespace ROCKSDB_NAMESPACE
  465. #endif // ROCKSDB_LIT