| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include <iostream>
- #include "db/db_impl/db_impl.h"
- #include "rocksdb/db.h"
- #include "rocksdb/merge_operator.h"
- #include "rocksdb/utilities/db_ttl.h"
- #include "test_util/testharness.h"
- #include "util/random.h"
- #include "utilities/cassandra/cassandra_compaction_filter.h"
- #include "utilities/cassandra/merge_operator.h"
- #include "utilities/cassandra/test_utils.h"
- #include "utilities/merge_operators.h"
- using namespace ROCKSDB_NAMESPACE;
- namespace ROCKSDB_NAMESPACE {
- namespace cassandra {
- // Path to the database on file system
- const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
- class CassandraStore {
- public:
- explicit CassandraStore(std::shared_ptr<DB> db)
- : db_(db), write_option_(), get_option_() {
- assert(db);
- }
- bool Append(const std::string& key, const RowValue& val){
- std::string result;
- val.Serialize(&result);
- Slice valSlice(result.data(), result.size());
- auto s = db_->Merge(write_option_, key, valSlice);
- if (s.ok()) {
- return true;
- } else {
- std::cerr << "ERROR " << s.ToString() << std::endl;
- return false;
- }
- }
- bool Put(const std::string& key, const RowValue& val) {
- std::string result;
- val.Serialize(&result);
- Slice valSlice(result.data(), result.size());
- auto s = db_->Put(write_option_, key, valSlice);
- if (s.ok()) {
- return true;
- } else {
- std::cerr << "ERROR " << s.ToString() << std::endl;
- return false;
- }
- }
- void Flush() {
- dbfull()->TEST_FlushMemTable();
- dbfull()->TEST_WaitForCompact();
- }
- void Compact() {
- dbfull()->TEST_CompactRange(
- 0, nullptr, nullptr, db_->DefaultColumnFamily());
- }
- std::tuple<bool, RowValue> Get(const std::string& key){
- std::string result;
- auto s = db_->Get(get_option_, key, &result);
- if (s.ok()) {
- return std::make_tuple(true,
- RowValue::Deserialize(result.data(),
- result.size()));
- }
- if (!s.IsNotFound()) {
- std::cerr << "ERROR " << s.ToString() << std::endl;
- }
- return std::make_tuple(false, RowValue(0, 0));
- }
- private:
- std::shared_ptr<DB> db_;
- WriteOptions write_option_;
- ReadOptions get_option_;
- DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); }
- };
- class TestCompactionFilterFactory : public CompactionFilterFactory {
- public:
- explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
- int32_t gc_grace_period_in_seconds)
- : purge_ttl_on_expiration_(purge_ttl_on_expiration),
- gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
- std::unique_ptr<CompactionFilter> CreateCompactionFilter(
- const CompactionFilter::Context& /*context*/) override {
- return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
- purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
- }
- const char* Name() const override { return "TestCompactionFilterFactory"; }
- private:
- bool purge_ttl_on_expiration_;
- int32_t gc_grace_period_in_seconds_;
- };
- // The class for unit-testing
- class CassandraFunctionalTest : public testing::Test {
- public:
- CassandraFunctionalTest() {
- DestroyDB(kDbName, Options()); // Start each test with a fresh DB
- }
- std::shared_ptr<DB> OpenDb() {
- DB* db;
- Options options;
- options.create_if_missing = true;
- options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
- auto* cf_factory = new TestCompactionFilterFactory(
- purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
- options.compaction_filter_factory.reset(cf_factory);
- EXPECT_OK(DB::Open(options, kDbName, &db));
- return std::shared_ptr<DB>(db);
- }
- bool purge_ttl_on_expiration_ = false;
- int32_t gc_grace_period_in_seconds_ = 100;
- };
- // THE TEST CASES BEGIN HERE
- TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
- CassandraStore store(OpenDb());
- int64_t now = time(nullptr);
- store.Append("k1", CreateTestRowValue({
- CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
- CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
- CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
- }));
- store.Append("k1",CreateTestRowValue({
- CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
- CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
- CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
- CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
- }));
- store.Append("k1", CreateTestRowValue({
- CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
- CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
- CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
- CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
- }));
- auto ret = store.Get("k1");
- ASSERT_TRUE(std::get<0>(ret));
- RowValue& merged = std::get<1>(ret);
- EXPECT_EQ(merged.columns_.size(), 5);
- VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6));
- VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8));
- VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7));
- VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17));
- VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11));
- }
- TEST_F(CassandraFunctionalTest,
- CompactionShouldConvertExpiredColumnsToTombstone) {
- CassandraStore store(OpenDb());
- int64_t now= time(nullptr);
- store.Append("k1", CreateTestRowValue({
- CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
- CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired
- CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
- }));
- store.Flush();
- store.Append("k1",CreateTestRowValue({
- CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
- CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
- }));
- store.Flush();
- store.Compact();
- auto ret = store.Get("k1");
- ASSERT_TRUE(std::get<0>(ret));
- RowValue& merged = std::get<1>(ret);
- EXPECT_EQ(merged.columns_.size(), 4);
- VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10));
- VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10));
- VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now));
- VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
- }
- TEST_F(CassandraFunctionalTest,
- CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
- purge_ttl_on_expiration_ = true;
- CassandraStore store(OpenDb());
- int64_t now = time(nullptr);
- store.Append("k1", CreateTestRowValue({
- CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
- CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
- CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
- }));
- store.Flush();
- store.Append("k1",CreateTestRowValue({
- CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
- CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
- }));
- store.Flush();
- store.Compact();
- auto ret = store.Get("k1");
- ASSERT_TRUE(std::get<0>(ret));
- RowValue& merged = std::get<1>(ret);
- EXPECT_EQ(merged.columns_.size(), 3);
- VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now));
- VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now));
- VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
- }
- TEST_F(CassandraFunctionalTest,
- CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
- purge_ttl_on_expiration_ = true;
- CassandraStore store(OpenDb());
- int64_t now = time(nullptr);
- store.Append("k1", CreateTestRowValue({
- CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),
- CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),
- }));
- store.Flush();
- store.Append("k1",CreateTestRowValue({
- CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),
- }));
- store.Flush();
- store.Compact();
- ASSERT_FALSE(std::get<0>(store.Get("k1")));
- }
- TEST_F(CassandraFunctionalTest,
- CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
- purge_ttl_on_expiration_ = true;
- CassandraStore store(OpenDb());
- int64_t now = time(nullptr);
- store.Append("k1", CreateTestRowValue({
- CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
- CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))
- }));
- store.Append("k2", CreateTestRowValue({
- CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now))
- }));
- store.Flush();
- store.Append("k1",CreateTestRowValue({
- CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
- }));
- store.Flush();
- store.Compact();
- auto ret = store.Get("k1");
- ASSERT_TRUE(std::get<0>(ret));
- RowValue& gced = std::get<1>(ret);
- EXPECT_EQ(gced.columns_.size(), 1);
- VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now));
- }
- TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
- purge_ttl_on_expiration_ = true;
- CassandraStore store(OpenDb());
- int64_t now = time(nullptr);
- store.Put("k1", CreateTestRowValue({
- CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
- }));
- store.Flush();
- store.Compact();
- ASSERT_FALSE(std::get<0>(store.Get("k1")));
- }
- } // namespace cassandra
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
|