format.cc 12 KB


  1. // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "format.h"
  6. #include <algorithm>
  7. #include <map>
  8. #include <memory>
  9. #include "utilities/cassandra/serialize.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. namespace cassandra {
  namespace {
  // Sentinel deletion fields. RowValue(Columns, int64_t) stores these, and
  // RowValue::IsTombstone() treats any marked_for_delete_at_ above
  // kDefaultMarkedForDeleteAt as a row tombstone.
  constexpr int32_t kDefaultLocalDeletionTime =
      std::numeric_limits<int32_t>::max();
  constexpr int64_t kDefaultMarkedForDeleteAt =
      std::numeric_limits<int64_t>::min();
  }  // namespace
  18. ColumnBase::ColumnBase(int8_t mask, int8_t index)
  19. : mask_(mask), index_(index) {}
  20. std::size_t ColumnBase::Size() const {
  21. return sizeof(mask_) + sizeof(index_);
  22. }
  23. int8_t ColumnBase::Mask() const {
  24. return mask_;
  25. }
  26. int8_t ColumnBase::Index() const {
  27. return index_;
  28. }
  29. void ColumnBase::Serialize(std::string* dest) const {
  30. ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(mask_, dest);
  31. ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(index_, dest);
  32. }
  33. std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
  34. std::size_t offset) {
  35. int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  36. if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
  37. return Tombstone::Deserialize(src, offset);
  38. } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
  39. return ExpiringColumn::Deserialize(src, offset);
  40. } else {
  41. return Column::Deserialize(src, offset);
  42. }
  43. }
  44. Column::Column(
  45. int8_t mask,
  46. int8_t index,
  47. int64_t timestamp,
  48. int32_t value_size,
  49. const char* value
  50. ) : ColumnBase(mask, index), timestamp_(timestamp),
  51. value_size_(value_size), value_(value) {}
  52. int64_t Column::Timestamp() const {
  53. return timestamp_;
  54. }
  55. std::size_t Column::Size() const {
  56. return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_)
  57. + value_size_;
  58. }
  59. void Column::Serialize(std::string* dest) const {
  60. ColumnBase::Serialize(dest);
  61. ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(timestamp_, dest);
  62. ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(value_size_, dest);
  63. dest->append(value_, value_size_);
  64. }
  65. std::shared_ptr<Column> Column::Deserialize(const char *src,
  66. std::size_t offset) {
  67. int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  68. offset += sizeof(mask);
  69. int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  70. offset += sizeof(index);
  71. int64_t timestamp =
  72. ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
  73. offset += sizeof(timestamp);
  74. int32_t value_size =
  75. ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  76. offset += sizeof(value_size);
  77. return std::make_shared<Column>(
  78. mask, index, timestamp, value_size, src + offset);
  79. }
  80. ExpiringColumn::ExpiringColumn(
  81. int8_t mask,
  82. int8_t index,
  83. int64_t timestamp,
  84. int32_t value_size,
  85. const char* value,
  86. int32_t ttl
  87. ) : Column(mask, index, timestamp, value_size, value),
  88. ttl_(ttl) {}
  89. std::size_t ExpiringColumn::Size() const {
  90. return Column::Size() + sizeof(ttl_);
  91. }
  92. void ExpiringColumn::Serialize(std::string* dest) const {
  93. Column::Serialize(dest);
  94. ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(ttl_, dest);
  95. }
  96. std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const {
  97. return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp()));
  98. }
  99. std::chrono::seconds ExpiringColumn::Ttl() const {
  100. return std::chrono::seconds(ttl_);
  101. }
  102. bool ExpiringColumn::Expired() const {
  103. return TimePoint() + Ttl() < std::chrono::system_clock::now();
  104. }
  105. std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
  106. auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
  107. int32_t local_deletion_time = static_cast<int32_t>(
  108. std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
  109. int64_t marked_for_delete_at =
  110. std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
  111. return std::make_shared<Tombstone>(
  112. static_cast<int8_t>(ColumnTypeMask::DELETION_MASK),
  113. Index(),
  114. local_deletion_time,
  115. marked_for_delete_at);
  116. }
  117. std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
  118. const char *src,
  119. std::size_t offset) {
  120. int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  121. offset += sizeof(mask);
  122. int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  123. offset += sizeof(index);
  124. int64_t timestamp =
  125. ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
  126. offset += sizeof(timestamp);
  127. int32_t value_size =
  128. ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  129. offset += sizeof(value_size);
  130. const char* value = src + offset;
  131. offset += value_size;
  132. int32_t ttl = ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  133. return std::make_shared<ExpiringColumn>(
  134. mask, index, timestamp, value_size, value, ttl);
  135. }
  136. Tombstone::Tombstone(
  137. int8_t mask,
  138. int8_t index,
  139. int32_t local_deletion_time,
  140. int64_t marked_for_delete_at
  141. ) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time),
  142. marked_for_delete_at_(marked_for_delete_at) {}
  143. int64_t Tombstone::Timestamp() const {
  144. return marked_for_delete_at_;
  145. }
  146. std::size_t Tombstone::Size() const {
  147. return ColumnBase::Size() + sizeof(local_deletion_time_)
  148. + sizeof(marked_for_delete_at_);
  149. }
  150. void Tombstone::Serialize(std::string* dest) const {
  151. ColumnBase::Serialize(dest);
  152. ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
  153. ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
  154. }
  155. bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
  156. auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
  157. std::chrono::seconds(local_deletion_time_));
  158. auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
  159. return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
  160. }
  161. std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
  162. std::size_t offset) {
  163. int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  164. offset += sizeof(mask);
  165. int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
  166. offset += sizeof(index);
  167. int32_t local_deletion_time =
  168. ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  169. offset += sizeof(int32_t);
  170. int64_t marked_for_delete_at =
  171. ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
  172. return std::make_shared<Tombstone>(
  173. mask, index, local_deletion_time, marked_for_delete_at);
  174. }
  175. RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
  176. : local_deletion_time_(local_deletion_time),
  177. marked_for_delete_at_(marked_for_delete_at), columns_(),
  178. last_modified_time_(0) {}
  179. RowValue::RowValue(Columns columns,
  180. int64_t last_modified_time)
  181. : local_deletion_time_(kDefaultLocalDeletionTime),
  182. marked_for_delete_at_(kDefaultMarkedForDeleteAt),
  183. columns_(std::move(columns)), last_modified_time_(last_modified_time) {}
  184. std::size_t RowValue::Size() const {
  185. std::size_t size = sizeof(local_deletion_time_)
  186. + sizeof(marked_for_delete_at_);
  187. for (const auto& column : columns_) {
  188. size += column -> Size();
  189. }
  190. return size;
  191. }
  192. int64_t RowValue::LastModifiedTime() const {
  193. if (IsTombstone()) {
  194. return marked_for_delete_at_;
  195. } else {
  196. return last_modified_time_;
  197. }
  198. }
  199. bool RowValue::IsTombstone() const {
  200. return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
  201. }
  202. void RowValue::Serialize(std::string* dest) const {
  203. ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
  204. ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
  205. for (const auto& column : columns_) {
  206. column -> Serialize(dest);
  207. }
  208. }
  209. RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
  210. *changed = false;
  211. Columns new_columns;
  212. for (auto& column : columns_) {
  213. if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
  214. std::shared_ptr<ExpiringColumn> expiring_column =
  215. std::static_pointer_cast<ExpiringColumn>(column);
  216. if(expiring_column->Expired()){
  217. *changed = true;
  218. continue;
  219. }
  220. }
  221. new_columns.push_back(column);
  222. }
  223. return RowValue(std::move(new_columns), last_modified_time_);
  224. }
  225. RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
  226. *changed = false;
  227. Columns new_columns;
  228. for (auto& column : columns_) {
  229. if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
  230. std::shared_ptr<ExpiringColumn> expiring_column =
  231. std::static_pointer_cast<ExpiringColumn>(column);
  232. if(expiring_column->Expired()) {
  233. std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
  234. new_columns.push_back(tombstone);
  235. *changed = true;
  236. continue;
  237. }
  238. }
  239. new_columns.push_back(column);
  240. }
  241. return RowValue(std::move(new_columns), last_modified_time_);
  242. }
  243. RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
  244. Columns new_columns;
  245. for (auto& column : columns_) {
  246. if (column->Mask() == ColumnTypeMask::DELETION_MASK) {
  247. std::shared_ptr<Tombstone> tombstone =
  248. std::static_pointer_cast<Tombstone>(column);
  249. if (tombstone->Collectable(gc_grace_period)) {
  250. continue;
  251. }
  252. }
  253. new_columns.push_back(column);
  254. }
  255. return RowValue(std::move(new_columns), last_modified_time_);
  256. }
  257. bool RowValue::Empty() const {
  258. return columns_.empty();
  259. }
  260. RowValue RowValue::Deserialize(const char *src, std::size_t size) {
  261. std::size_t offset = 0;
  262. assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
  263. int32_t local_deletion_time =
  264. ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
  265. offset += sizeof(int32_t);
  266. int64_t marked_for_delete_at =
  267. ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
  268. offset += sizeof(int64_t);
  269. if (offset == size) {
  270. return RowValue(local_deletion_time, marked_for_delete_at);
  271. }
  272. assert(local_deletion_time == kDefaultLocalDeletionTime);
  273. assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
  274. Columns columns;
  275. int64_t last_modified_time = 0;
  276. while (offset < size) {
  277. auto c = ColumnBase::Deserialize(src, offset);
  278. offset += c -> Size();
  279. assert(offset <= size);
  280. last_modified_time = std::max(last_modified_time, c -> Timestamp());
  281. columns.push_back(std::move(c));
  282. }
  283. return RowValue(std::move(columns), last_modified_time);
  284. }
  285. // Merge multiple row values into one.
  286. // For each column in rows with same index, we pick the one with latest
  287. // timestamp. And we also take row tombstone into consideration, by iterating
  288. // each row from reverse timestamp order, and stop once we hit the first
  289. // row tombstone.
  290. RowValue RowValue::Merge(std::vector<RowValue>&& values) {
  291. assert(values.size() > 0);
  292. if (values.size() == 1) {
  293. return std::move(values[0]);
  294. }
  295. // Merge columns by their last modified time, and skip once we hit
  296. // a row tombstone.
  297. std::sort(values.begin(), values.end(),
  298. [](const RowValue& r1, const RowValue& r2) {
  299. return r1.LastModifiedTime() > r2.LastModifiedTime();
  300. });
  301. std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
  302. int64_t tombstone_timestamp = 0;
  303. for (auto& value : values) {
  304. if (value.IsTombstone()) {
  305. if (merged_columns.size() == 0) {
  306. return std::move(value);
  307. }
  308. tombstone_timestamp = value.LastModifiedTime();
  309. break;
  310. }
  311. for (auto& column : value.columns_) {
  312. int8_t index = column->Index();
  313. if (merged_columns.find(index) == merged_columns.end()) {
  314. merged_columns[index] = column;
  315. } else {
  316. if (column->Timestamp() > merged_columns[index]->Timestamp()) {
  317. merged_columns[index] = column;
  318. }
  319. }
  320. }
  321. }
  322. int64_t last_modified_time = 0;
  323. Columns columns;
  324. for (auto& pair: merged_columns) {
  325. // For some row, its last_modified_time > row tombstone_timestamp, but
  326. // it might have rows whose timestamp is ealier than tombstone, so we
  327. // ned to filter these rows.
  328. if (pair.second->Timestamp() <= tombstone_timestamp) {
  329. continue;
  330. }
  331. last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
  332. columns.push_back(std::move(pair.second));
  333. }
  334. return RowValue(std::move(columns), last_modified_time);
  335. }
  336. } // namespace cassandra
  337. } // namespace ROCKSDB_NAMESPACE