wide_column_serialization.cc 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/wide/wide_column_serialization.h"
  6. #include <algorithm>
  7. #include <cassert>
  8. #include <limits>
  9. #include "db/wide/wide_columns_helper.h"
  10. #include "rocksdb/slice.h"
  11. #include "util/autovector.h"
  12. #include "util/coding.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. Status WideColumnSerialization::Serialize(const WideColumns& columns,
  15. std::string& output) {
  16. const size_t num_columns = columns.size();
  17. if (num_columns > static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
  18. return Status::InvalidArgument("Too many wide columns");
  19. }
  20. PutVarint32(&output, kCurrentVersion);
  21. PutVarint32(&output, static_cast<uint32_t>(num_columns));
  22. const Slice* prev_name = nullptr;
  23. for (size_t i = 0; i < columns.size(); ++i) {
  24. const WideColumn& column = columns[i];
  25. const Slice& name = column.name();
  26. if (name.size() >
  27. static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
  28. return Status::InvalidArgument("Wide column name too long");
  29. }
  30. if (prev_name && prev_name->compare(name) >= 0) {
  31. return Status::Corruption("Wide columns out of order");
  32. }
  33. const Slice& value = column.value();
  34. if (value.size() >
  35. static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
  36. return Status::InvalidArgument("Wide column value too long");
  37. }
  38. PutLengthPrefixedSlice(&output, name);
  39. PutVarint32(&output, static_cast<uint32_t>(value.size()));
  40. prev_name = &name;
  41. }
  42. for (const auto& column : columns) {
  43. const Slice& value = column.value();
  44. output.append(value.data(), value.size());
  45. }
  46. return Status::OK();
  47. }
  48. Status WideColumnSerialization::Deserialize(Slice& input,
  49. WideColumns& columns) {
  50. assert(columns.empty());
  51. uint32_t version = 0;
  52. if (!GetVarint32(&input, &version)) {
  53. return Status::Corruption("Error decoding wide column version");
  54. }
  55. if (version > kCurrentVersion) {
  56. return Status::NotSupported("Unsupported wide column version");
  57. }
  58. uint32_t num_columns = 0;
  59. if (!GetVarint32(&input, &num_columns)) {
  60. return Status::Corruption("Error decoding number of wide columns");
  61. }
  62. if (!num_columns) {
  63. return Status::OK();
  64. }
  65. columns.reserve(num_columns);
  66. autovector<uint32_t, 16> column_value_sizes;
  67. column_value_sizes.reserve(num_columns);
  68. for (uint32_t i = 0; i < num_columns; ++i) {
  69. Slice name;
  70. if (!GetLengthPrefixedSlice(&input, &name)) {
  71. return Status::Corruption("Error decoding wide column name");
  72. }
  73. if (!columns.empty() && columns.back().name().compare(name) >= 0) {
  74. return Status::Corruption("Wide columns out of order");
  75. }
  76. columns.emplace_back(name, Slice());
  77. uint32_t value_size = 0;
  78. if (!GetVarint32(&input, &value_size)) {
  79. return Status::Corruption("Error decoding wide column value size");
  80. }
  81. column_value_sizes.emplace_back(value_size);
  82. }
  83. const Slice data(input);
  84. size_t pos = 0;
  85. for (uint32_t i = 0; i < num_columns; ++i) {
  86. const uint32_t value_size = column_value_sizes[i];
  87. if (pos + value_size > data.size()) {
  88. return Status::Corruption("Error decoding wide column value payload");
  89. }
  90. columns[i].value() = Slice(data.data() + pos, value_size);
  91. pos += value_size;
  92. }
  93. return Status::OK();
  94. }
  95. Status WideColumnSerialization::GetValueOfDefaultColumn(Slice& input,
  96. Slice& value) {
  97. WideColumns columns;
  98. const Status s = Deserialize(input, columns);
  99. if (!s.ok()) {
  100. return s;
  101. }
  102. if (!WideColumnsHelper::HasDefaultColumn(columns)) {
  103. value.clear();
  104. return Status::OK();
  105. }
  106. value = WideColumnsHelper::GetDefaultColumn(columns);
  107. return Status::OK();
  108. }
  109. } // namespace ROCKSDB_NAMESPACE