format.cc 13 KB


  1. // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "format.h"
  6. #include <algorithm>
  7. #include <map>
  8. #include <memory>
  9. #include "utilities/cassandra/serialize.h"
  10. namespace ROCKSDB_NAMESPACE::cassandra {
  11. namespace {
  12. const int32_t kDefaultLocalDeletionTime = std::numeric_limits<int32_t>::max();
  13. const int64_t kDefaultMarkedForDeleteAt = std::numeric_limits<int64_t>::min();
  14. } // namespace
  15. ColumnBase::ColumnBase(int8_t mask, int8_t index)
  16. : mask_(mask), index_(index) {}
  17. std::size_t ColumnBase::Size() const { return sizeof(mask_) + sizeof(index_); }
  18. int8_t ColumnBase::Mask() const { return mask_; }
  19. int8_t ColumnBase::Index() const { return index_; }
  20. void ColumnBase::Serialize(std::string* dest) const {
  21. ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(mask_, dest);
  22. ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(index_, dest);
  23. }
  24. std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
  25. std::size_t offset) {
  26. int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  27. if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
  28. return Tombstone::Deserialize(src, offset);
  29. } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
  30. return ExpiringColumn::Deserialize(src, offset);
  31. } else {
  32. return Column::Deserialize(src, offset);
  33. }
  34. }
  35. Column::Column(int8_t mask, int8_t index, int64_t timestamp, int32_t value_size,
  36. const char* value)
  37. : ColumnBase(mask, index),
  38. timestamp_(timestamp),
  39. value_size_(value_size),
  40. value_(value) {}
  41. int64_t Column::Timestamp() const { return timestamp_; }
  42. std::size_t Column::Size() const {
  43. return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_) +
  44. value_size_;
  45. }
  46. void Column::Serialize(std::string* dest) const {
  47. ColumnBase::Serialize(dest);
  48. ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(timestamp_, dest);
  49. ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(value_size_, dest);
  50. dest->append(value_, value_size_);
  51. }
  52. std::shared_ptr<Column> Column::Deserialize(const char* src,
  53. std::size_t offset) {
  54. int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  55. offset += sizeof(mask);
  56. int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  57. offset += sizeof(index);
  58. int64_t timestamp =
  59. ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
  60. offset += sizeof(timestamp);
  61. int32_t value_size =
  62. ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  63. offset += sizeof(value_size);
  64. return std::make_shared<Column>(mask, index, timestamp, value_size,
  65. src + offset);
  66. }
  67. ExpiringColumn::ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
  68. int32_t value_size, const char* value,
  69. int32_t ttl)
  70. : Column(mask, index, timestamp, value_size, value), ttl_(ttl) {}
  71. std::size_t ExpiringColumn::Size() const {
  72. return Column::Size() + sizeof(ttl_);
  73. }
  74. void ExpiringColumn::Serialize(std::string* dest) const {
  75. Column::Serialize(dest);
  76. ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(ttl_, dest);
  77. }
  78. std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint()
  79. const {
  80. return std::chrono::time_point<std::chrono::system_clock>(
  81. std::chrono::microseconds(Timestamp()));
  82. }
  83. std::chrono::seconds ExpiringColumn::Ttl() const {
  84. return std::chrono::seconds(ttl_);
  85. }
  86. bool ExpiringColumn::Expired() const {
  87. return TimePoint() + Ttl() < std::chrono::system_clock::now();
  88. }
  89. std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
  90. auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
  91. int32_t local_deletion_time = static_cast<int32_t>(
  92. std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
  93. int64_t marked_for_delete_at =
  94. std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
  95. return std::make_shared<Tombstone>(
  96. static_cast<int8_t>(ColumnTypeMask::DELETION_MASK), Index(),
  97. local_deletion_time, marked_for_delete_at);
  98. }
  99. std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
  100. const char* src, std::size_t offset) {
  101. int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  102. offset += sizeof(mask);
  103. int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  104. offset += sizeof(index);
  105. int64_t timestamp =
  106. ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
  107. offset += sizeof(timestamp);
  108. int32_t value_size =
  109. ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  110. offset += sizeof(value_size);
  111. const char* value = src + offset;
  112. offset += value_size;
  113. int32_t ttl = ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  114. return std::make_shared<ExpiringColumn>(mask, index, timestamp, value_size,
  115. value, ttl);
  116. }
  117. Tombstone::Tombstone(int8_t mask, int8_t index, int32_t local_deletion_time,
  118. int64_t marked_for_delete_at)
  119. : ColumnBase(mask, index),
  120. local_deletion_time_(local_deletion_time),
  121. marked_for_delete_at_(marked_for_delete_at) {}
  122. int64_t Tombstone::Timestamp() const { return marked_for_delete_at_; }
  123. std::size_t Tombstone::Size() const {
  124. return ColumnBase::Size() + sizeof(local_deletion_time_) +
  125. sizeof(marked_for_delete_at_);
  126. }
  127. void Tombstone::Serialize(std::string* dest) const {
  128. ColumnBase::Serialize(dest);
  129. ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
  130. ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
  131. }
  132. bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
  133. auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
  134. std::chrono::seconds(local_deletion_time_));
  135. auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
  136. return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
  137. }
  138. std::shared_ptr<Tombstone> Tombstone::Deserialize(const char* src,
  139. std::size_t offset) {
  140. int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  141. offset += sizeof(mask);
  142. int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  143. offset += sizeof(index);
  144. int32_t local_deletion_time =
  145. ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  146. offset += sizeof(int32_t);
  147. int64_t marked_for_delete_at =
  148. ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
  149. return std::make_shared<Tombstone>(mask, index, local_deletion_time,
  150. marked_for_delete_at);
  151. }
  152. RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
  153. : local_deletion_time_(local_deletion_time),
  154. marked_for_delete_at_(marked_for_delete_at),
  155. columns_(),
  156. last_modified_time_(0) {}
  157. RowValue::RowValue(Columns columns, int64_t last_modified_time)
  158. : local_deletion_time_(kDefaultLocalDeletionTime),
  159. marked_for_delete_at_(kDefaultMarkedForDeleteAt),
  160. columns_(std::move(columns)),
  161. last_modified_time_(last_modified_time) {}
  162. std::size_t RowValue::Size() const {
  163. std::size_t size =
  164. sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_);
  165. for (const auto& column : columns_) {
  166. size += column->Size();
  167. }
  168. return size;
  169. }
  170. int64_t RowValue::LastModifiedTime() const {
  171. if (IsTombstone()) {
  172. return marked_for_delete_at_;
  173. } else {
  174. return last_modified_time_;
  175. }
  176. }
  177. bool RowValue::IsTombstone() const {
  178. return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
  179. }
  180. void RowValue::Serialize(std::string* dest) const {
  181. ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
  182. ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
  183. for (const auto& column : columns_) {
  184. column->Serialize(dest);
  185. }
  186. }
  187. RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
  188. *changed = false;
  189. Columns new_columns;
  190. for (auto& column : columns_) {
  191. if (column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
  192. std::shared_ptr<ExpiringColumn> expiring_column =
  193. std::static_pointer_cast<ExpiringColumn>(column);
  194. if (expiring_column->Expired()) {
  195. *changed = true;
  196. continue;
  197. }
  198. }
  199. new_columns.push_back(column);
  200. }
  201. return RowValue(std::move(new_columns), last_modified_time_);
  202. }
  203. RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
  204. *changed = false;
  205. Columns new_columns;
  206. for (auto& column : columns_) {
  207. if (column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
  208. std::shared_ptr<ExpiringColumn> expiring_column =
  209. std::static_pointer_cast<ExpiringColumn>(column);
  210. if (expiring_column->Expired()) {
  211. std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
  212. new_columns.push_back(tombstone);
  213. *changed = true;
  214. continue;
  215. }
  216. }
  217. new_columns.push_back(column);
  218. }
  219. return RowValue(std::move(new_columns), last_modified_time_);
  220. }
  221. RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
  222. Columns new_columns;
  223. for (auto& column : columns_) {
  224. if (column->Mask() == ColumnTypeMask::DELETION_MASK) {
  225. std::shared_ptr<Tombstone> tombstone =
  226. std::static_pointer_cast<Tombstone>(column);
  227. if (tombstone->Collectable(gc_grace_period)) {
  228. continue;
  229. }
  230. }
  231. new_columns.push_back(column);
  232. }
  233. return RowValue(std::move(new_columns), last_modified_time_);
  234. }
  235. bool RowValue::Empty() const { return columns_.empty(); }
  236. RowValue RowValue::Deserialize(const char* src, std::size_t size) {
  237. std::size_t offset = 0;
  238. assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
  239. int32_t local_deletion_time =
  240. ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  241. offset += sizeof(int32_t);
  242. int64_t marked_for_delete_at =
  243. ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
  244. offset += sizeof(int64_t);
  245. if (offset == size) {
  246. return RowValue(local_deletion_time, marked_for_delete_at);
  247. }
  248. assert(local_deletion_time == kDefaultLocalDeletionTime);
  249. assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
  250. Columns columns;
  251. int64_t last_modified_time = 0;
  252. while (offset < size) {
  253. auto c = ColumnBase::Deserialize(src, offset);
  254. offset += c->Size();
  255. assert(offset <= size);
  256. last_modified_time = std::max(last_modified_time, c->Timestamp());
  257. columns.push_back(std::move(c));
  258. }
  259. return RowValue(std::move(columns), last_modified_time);
  260. }
  261. // Merge multiple row values into one.
  262. // For each column in rows with same index, we pick the one with latest
  263. // timestamp. And we also take row tombstone into consideration, by iterating
  264. // each row from reverse timestamp order, and stop once we hit the first
  265. // row tombstone.
  266. RowValue RowValue::Merge(std::vector<RowValue>&& values) {
  267. assert(values.size() > 0);
  268. if (values.size() == 1) {
  269. return std::move(values[0]);
  270. }
  271. // Merge columns by their last modified time, and skip once we hit
  272. // a row tombstone.
  273. std::sort(values.begin(), values.end(),
  274. [](const RowValue& r1, const RowValue& r2) {
  275. return r1.LastModifiedTime() > r2.LastModifiedTime();
  276. });
  277. std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
  278. int64_t tombstone_timestamp = 0;
  279. for (auto& value : values) {
  280. if (value.IsTombstone()) {
  281. if (merged_columns.size() == 0) {
  282. return std::move(value);
  283. }
  284. tombstone_timestamp = value.LastModifiedTime();
  285. break;
  286. }
  287. for (auto& column : value.columns_) {
  288. int8_t index = column->Index();
  289. if (merged_columns.find(index) == merged_columns.end()) {
  290. merged_columns[index] = column;
  291. } else {
  292. if (column->Timestamp() > merged_columns[index]->Timestamp()) {
  293. merged_columns[index] = column;
  294. }
  295. }
  296. }
  297. }
  298. int64_t last_modified_time = 0;
  299. Columns columns;
  300. for (auto& pair : merged_columns) {
  301. // For some row, its last_modified_time > row tombstone_timestamp, but
  302. // it might have rows whose timestamp is ealier than tombstone, so we
  303. // ned to filter these rows.
  304. if (pair.second->Timestamp() <= tombstone_timestamp) {
  305. continue;
  306. }
  307. last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
  308. columns.push_back(std::move(pair.second));
  309. }
  310. return RowValue(std::move(columns), last_modified_time);
  311. }
  312. } // namespace ROCKSDB_NAMESPACE::cassandra