| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "format.h"
- #include <algorithm>
- #include <map>
- #include <memory>
- #include "utilities/cassandra/serialize.h"
- namespace ROCKSDB_NAMESPACE {
- namespace cassandra {
- namespace {
- const int32_t kDefaultLocalDeletionTime =
- std::numeric_limits<int32_t>::max();
- const int64_t kDefaultMarkedForDeleteAt =
- std::numeric_limits<int64_t>::min();
- }
- ColumnBase::ColumnBase(int8_t mask, int8_t index)
- : mask_(mask), index_(index) {}
- std::size_t ColumnBase::Size() const {
- return sizeof(mask_) + sizeof(index_);
- }
- int8_t ColumnBase::Mask() const {
- return mask_;
- }
- int8_t ColumnBase::Index() const {
- return index_;
- }
- void ColumnBase::Serialize(std::string* dest) const {
- ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(mask_, dest);
- ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(index_, dest);
- }
- std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
- std::size_t offset) {
- int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
- if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
- return Tombstone::Deserialize(src, offset);
- } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
- return ExpiringColumn::Deserialize(src, offset);
- } else {
- return Column::Deserialize(src, offset);
- }
- }
- Column::Column(
- int8_t mask,
- int8_t index,
- int64_t timestamp,
- int32_t value_size,
- const char* value
- ) : ColumnBase(mask, index), timestamp_(timestamp),
- value_size_(value_size), value_(value) {}
- int64_t Column::Timestamp() const {
- return timestamp_;
- }
- std::size_t Column::Size() const {
- return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_)
- + value_size_;
- }
- void Column::Serialize(std::string* dest) const {
- ColumnBase::Serialize(dest);
- ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(timestamp_, dest);
- ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(value_size_, dest);
- dest->append(value_, value_size_);
- }
- std::shared_ptr<Column> Column::Deserialize(const char *src,
- std::size_t offset) {
- int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
- offset += sizeof(mask);
- int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
- offset += sizeof(index);
- int64_t timestamp =
- ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
- offset += sizeof(timestamp);
- int32_t value_size =
- ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
- offset += sizeof(value_size);
- return std::make_shared<Column>(
- mask, index, timestamp, value_size, src + offset);
- }
- ExpiringColumn::ExpiringColumn(
- int8_t mask,
- int8_t index,
- int64_t timestamp,
- int32_t value_size,
- const char* value,
- int32_t ttl
- ) : Column(mask, index, timestamp, value_size, value),
- ttl_(ttl) {}
- std::size_t ExpiringColumn::Size() const {
- return Column::Size() + sizeof(ttl_);
- }
- void ExpiringColumn::Serialize(std::string* dest) const {
- Column::Serialize(dest);
- ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(ttl_, dest);
- }
- std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const {
- return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp()));
- }
- std::chrono::seconds ExpiringColumn::Ttl() const {
- return std::chrono::seconds(ttl_);
- }
- bool ExpiringColumn::Expired() const {
- return TimePoint() + Ttl() < std::chrono::system_clock::now();
- }
- std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
- auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
- int32_t local_deletion_time = static_cast<int32_t>(
- std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
- int64_t marked_for_delete_at =
- std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
- return std::make_shared<Tombstone>(
- static_cast<int8_t>(ColumnTypeMask::DELETION_MASK),
- Index(),
- local_deletion_time,
- marked_for_delete_at);
- }
- std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
- const char *src,
- std::size_t offset) {
- int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
- offset += sizeof(mask);
- int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
- offset += sizeof(index);
- int64_t timestamp =
- ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
- offset += sizeof(timestamp);
- int32_t value_size =
- ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
- offset += sizeof(value_size);
- const char* value = src + offset;
- offset += value_size;
- int32_t ttl = ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
- return std::make_shared<ExpiringColumn>(
- mask, index, timestamp, value_size, value, ttl);
- }
- Tombstone::Tombstone(
- int8_t mask,
- int8_t index,
- int32_t local_deletion_time,
- int64_t marked_for_delete_at
- ) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time),
- marked_for_delete_at_(marked_for_delete_at) {}
- int64_t Tombstone::Timestamp() const {
- return marked_for_delete_at_;
- }
- std::size_t Tombstone::Size() const {
- return ColumnBase::Size() + sizeof(local_deletion_time_)
- + sizeof(marked_for_delete_at_);
- }
- void Tombstone::Serialize(std::string* dest) const {
- ColumnBase::Serialize(dest);
- ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
- ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
- }
- bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
- auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
- std::chrono::seconds(local_deletion_time_));
- auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
- return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
- }
- std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
- std::size_t offset) {
- int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
- offset += sizeof(mask);
- int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
- offset += sizeof(index);
- int32_t local_deletion_time =
- ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
- offset += sizeof(int32_t);
- int64_t marked_for_delete_at =
- ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
- return std::make_shared<Tombstone>(
- mask, index, local_deletion_time, marked_for_delete_at);
- }
- RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
- : local_deletion_time_(local_deletion_time),
- marked_for_delete_at_(marked_for_delete_at), columns_(),
- last_modified_time_(0) {}
- RowValue::RowValue(Columns columns,
- int64_t last_modified_time)
- : local_deletion_time_(kDefaultLocalDeletionTime),
- marked_for_delete_at_(kDefaultMarkedForDeleteAt),
- columns_(std::move(columns)), last_modified_time_(last_modified_time) {}
- std::size_t RowValue::Size() const {
- std::size_t size = sizeof(local_deletion_time_)
- + sizeof(marked_for_delete_at_);
- for (const auto& column : columns_) {
- size += column -> Size();
- }
- return size;
- }
- int64_t RowValue::LastModifiedTime() const {
- if (IsTombstone()) {
- return marked_for_delete_at_;
- } else {
- return last_modified_time_;
- }
- }
- bool RowValue::IsTombstone() const {
- return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
- }
- void RowValue::Serialize(std::string* dest) const {
- ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
- ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
- for (const auto& column : columns_) {
- column -> Serialize(dest);
- }
- }
- RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
- *changed = false;
- Columns new_columns;
- for (auto& column : columns_) {
- if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
- std::shared_ptr<ExpiringColumn> expiring_column =
- std::static_pointer_cast<ExpiringColumn>(column);
- if(expiring_column->Expired()){
- *changed = true;
- continue;
- }
- }
- new_columns.push_back(column);
- }
- return RowValue(std::move(new_columns), last_modified_time_);
- }
- RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
- *changed = false;
- Columns new_columns;
- for (auto& column : columns_) {
- if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
- std::shared_ptr<ExpiringColumn> expiring_column =
- std::static_pointer_cast<ExpiringColumn>(column);
- if(expiring_column->Expired()) {
- std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
- new_columns.push_back(tombstone);
- *changed = true;
- continue;
- }
- }
- new_columns.push_back(column);
- }
- return RowValue(std::move(new_columns), last_modified_time_);
- }
- RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
- Columns new_columns;
- for (auto& column : columns_) {
- if (column->Mask() == ColumnTypeMask::DELETION_MASK) {
- std::shared_ptr<Tombstone> tombstone =
- std::static_pointer_cast<Tombstone>(column);
- if (tombstone->Collectable(gc_grace_period)) {
- continue;
- }
- }
- new_columns.push_back(column);
- }
- return RowValue(std::move(new_columns), last_modified_time_);
- }
- bool RowValue::Empty() const {
- return columns_.empty();
- }
- RowValue RowValue::Deserialize(const char *src, std::size_t size) {
- std::size_t offset = 0;
- assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
- int32_t local_deletion_time =
- ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
- offset += sizeof(int32_t);
- int64_t marked_for_delete_at =
- ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
- offset += sizeof(int64_t);
- if (offset == size) {
- return RowValue(local_deletion_time, marked_for_delete_at);
- }
- assert(local_deletion_time == kDefaultLocalDeletionTime);
- assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
- Columns columns;
- int64_t last_modified_time = 0;
- while (offset < size) {
- auto c = ColumnBase::Deserialize(src, offset);
- offset += c -> Size();
- assert(offset <= size);
- last_modified_time = std::max(last_modified_time, c -> Timestamp());
- columns.push_back(std::move(c));
- }
- return RowValue(std::move(columns), last_modified_time);
- }
- // Merge multiple row values into one.
- // For each column in rows with same index, we pick the one with latest
- // timestamp. And we also take row tombstone into consideration, by iterating
- // each row from reverse timestamp order, and stop once we hit the first
- // row tombstone.
- RowValue RowValue::Merge(std::vector<RowValue>&& values) {
- assert(values.size() > 0);
- if (values.size() == 1) {
- return std::move(values[0]);
- }
- // Merge columns by their last modified time, and skip once we hit
- // a row tombstone.
- std::sort(values.begin(), values.end(),
- [](const RowValue& r1, const RowValue& r2) {
- return r1.LastModifiedTime() > r2.LastModifiedTime();
- });
- std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
- int64_t tombstone_timestamp = 0;
- for (auto& value : values) {
- if (value.IsTombstone()) {
- if (merged_columns.size() == 0) {
- return std::move(value);
- }
- tombstone_timestamp = value.LastModifiedTime();
- break;
- }
- for (auto& column : value.columns_) {
- int8_t index = column->Index();
- if (merged_columns.find(index) == merged_columns.end()) {
- merged_columns[index] = column;
- } else {
- if (column->Timestamp() > merged_columns[index]->Timestamp()) {
- merged_columns[index] = column;
- }
- }
- }
- }
- int64_t last_modified_time = 0;
- Columns columns;
- for (auto& pair: merged_columns) {
- // For some row, its last_modified_time > row tombstone_timestamp, but
- // it might have rows whose timestamp is ealier than tombstone, so we
- // ned to filter these rows.
- if (pair.second->Timestamp() <= tombstone_timestamp) {
- continue;
- }
- last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
- columns.push_back(std::move(pair.second));
- }
- return RowValue(std::move(columns), last_modified_time);
- }
- } // namepsace cassandrda
- } // namespace ROCKSDB_NAMESPACE
|