cassandra_functional_test.cc 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <iostream>
  6. #include "db/db_impl/db_impl.h"
  7. #include "rocksdb/db.h"
  8. #include "rocksdb/merge_operator.h"
  9. #include "rocksdb/utilities/db_ttl.h"
  10. #include "test_util/testharness.h"
  11. #include "util/random.h"
  12. #include "utilities/cassandra/cassandra_compaction_filter.h"
  13. #include "utilities/cassandra/merge_operator.h"
  14. #include "utilities/cassandra/test_utils.h"
  15. #include "utilities/merge_operators.h"
  16. using namespace ROCKSDB_NAMESPACE;
  17. namespace ROCKSDB_NAMESPACE {
  18. namespace cassandra {
  19. // Path to the database on file system
  20. const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
  21. class CassandraStore {
  22. public:
  23. explicit CassandraStore(std::shared_ptr<DB> db)
  24. : db_(db), write_option_(), get_option_() {
  25. assert(db);
  26. }
  27. bool Append(const std::string& key, const RowValue& val){
  28. std::string result;
  29. val.Serialize(&result);
  30. Slice valSlice(result.data(), result.size());
  31. auto s = db_->Merge(write_option_, key, valSlice);
  32. if (s.ok()) {
  33. return true;
  34. } else {
  35. std::cerr << "ERROR " << s.ToString() << std::endl;
  36. return false;
  37. }
  38. }
  39. bool Put(const std::string& key, const RowValue& val) {
  40. std::string result;
  41. val.Serialize(&result);
  42. Slice valSlice(result.data(), result.size());
  43. auto s = db_->Put(write_option_, key, valSlice);
  44. if (s.ok()) {
  45. return true;
  46. } else {
  47. std::cerr << "ERROR " << s.ToString() << std::endl;
  48. return false;
  49. }
  50. }
  51. void Flush() {
  52. dbfull()->TEST_FlushMemTable();
  53. dbfull()->TEST_WaitForCompact();
  54. }
  55. void Compact() {
  56. dbfull()->TEST_CompactRange(
  57. 0, nullptr, nullptr, db_->DefaultColumnFamily());
  58. }
  59. std::tuple<bool, RowValue> Get(const std::string& key){
  60. std::string result;
  61. auto s = db_->Get(get_option_, key, &result);
  62. if (s.ok()) {
  63. return std::make_tuple(true,
  64. RowValue::Deserialize(result.data(),
  65. result.size()));
  66. }
  67. if (!s.IsNotFound()) {
  68. std::cerr << "ERROR " << s.ToString() << std::endl;
  69. }
  70. return std::make_tuple(false, RowValue(0, 0));
  71. }
  72. private:
  73. std::shared_ptr<DB> db_;
  74. WriteOptions write_option_;
  75. ReadOptions get_option_;
  76. DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); }
  77. };
  78. class TestCompactionFilterFactory : public CompactionFilterFactory {
  79. public:
  80. explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
  81. int32_t gc_grace_period_in_seconds)
  82. : purge_ttl_on_expiration_(purge_ttl_on_expiration),
  83. gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
  84. std::unique_ptr<CompactionFilter> CreateCompactionFilter(
  85. const CompactionFilter::Context& /*context*/) override {
  86. return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
  87. purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
  88. }
  89. const char* Name() const override { return "TestCompactionFilterFactory"; }
  90. private:
  91. bool purge_ttl_on_expiration_;
  92. int32_t gc_grace_period_in_seconds_;
  93. };
  94. // The class for unit-testing
  95. class CassandraFunctionalTest : public testing::Test {
  96. public:
  97. CassandraFunctionalTest() {
  98. DestroyDB(kDbName, Options()); // Start each test with a fresh DB
  99. }
  100. std::shared_ptr<DB> OpenDb() {
  101. DB* db;
  102. Options options;
  103. options.create_if_missing = true;
  104. options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
  105. auto* cf_factory = new TestCompactionFilterFactory(
  106. purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
  107. options.compaction_filter_factory.reset(cf_factory);
  108. EXPECT_OK(DB::Open(options, kDbName, &db));
  109. return std::shared_ptr<DB>(db);
  110. }
  111. bool purge_ttl_on_expiration_ = false;
  112. int32_t gc_grace_period_in_seconds_ = 100;
  113. };
  114. // THE TEST CASES BEGIN HERE
  115. TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
  116. CassandraStore store(OpenDb());
  117. int64_t now = time(nullptr);
  118. store.Append("k1", CreateTestRowValue({
  119. CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
  120. CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
  121. CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
  122. }));
  123. store.Append("k1",CreateTestRowValue({
  124. CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
  125. CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
  126. CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
  127. CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
  128. }));
  129. store.Append("k1", CreateTestRowValue({
  130. CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
  131. CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
  132. CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
  133. CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
  134. }));
  135. auto ret = store.Get("k1");
  136. ASSERT_TRUE(std::get<0>(ret));
  137. RowValue& merged = std::get<1>(ret);
  138. EXPECT_EQ(merged.columns_.size(), 5);
  139. VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6));
  140. VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8));
  141. VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7));
  142. VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17));
  143. VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11));
  144. }
  145. TEST_F(CassandraFunctionalTest,
  146. CompactionShouldConvertExpiredColumnsToTombstone) {
  147. CassandraStore store(OpenDb());
  148. int64_t now= time(nullptr);
  149. store.Append("k1", CreateTestRowValue({
  150. CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
  151. CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired
  152. CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
  153. }));
  154. store.Flush();
  155. store.Append("k1",CreateTestRowValue({
  156. CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
  157. CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
  158. }));
  159. store.Flush();
  160. store.Compact();
  161. auto ret = store.Get("k1");
  162. ASSERT_TRUE(std::get<0>(ret));
  163. RowValue& merged = std::get<1>(ret);
  164. EXPECT_EQ(merged.columns_.size(), 4);
  165. VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10));
  166. VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10));
  167. VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now));
  168. VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
  169. }
  170. TEST_F(CassandraFunctionalTest,
  171. CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
  172. purge_ttl_on_expiration_ = true;
  173. CassandraStore store(OpenDb());
  174. int64_t now = time(nullptr);
  175. store.Append("k1", CreateTestRowValue({
  176. CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
  177. CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
  178. CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
  179. }));
  180. store.Flush();
  181. store.Append("k1",CreateTestRowValue({
  182. CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
  183. CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
  184. }));
  185. store.Flush();
  186. store.Compact();
  187. auto ret = store.Get("k1");
  188. ASSERT_TRUE(std::get<0>(ret));
  189. RowValue& merged = std::get<1>(ret);
  190. EXPECT_EQ(merged.columns_.size(), 3);
  191. VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now));
  192. VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now));
  193. VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
  194. }
  195. TEST_F(CassandraFunctionalTest,
  196. CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
  197. purge_ttl_on_expiration_ = true;
  198. CassandraStore store(OpenDb());
  199. int64_t now = time(nullptr);
  200. store.Append("k1", CreateTestRowValue({
  201. CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),
  202. CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),
  203. }));
  204. store.Flush();
  205. store.Append("k1",CreateTestRowValue({
  206. CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),
  207. }));
  208. store.Flush();
  209. store.Compact();
  210. ASSERT_FALSE(std::get<0>(store.Get("k1")));
  211. }
  212. TEST_F(CassandraFunctionalTest,
  213. CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
  214. purge_ttl_on_expiration_ = true;
  215. CassandraStore store(OpenDb());
  216. int64_t now = time(nullptr);
  217. store.Append("k1", CreateTestRowValue({
  218. CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
  219. CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))
  220. }));
  221. store.Append("k2", CreateTestRowValue({
  222. CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now))
  223. }));
  224. store.Flush();
  225. store.Append("k1",CreateTestRowValue({
  226. CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
  227. }));
  228. store.Flush();
  229. store.Compact();
  230. auto ret = store.Get("k1");
  231. ASSERT_TRUE(std::get<0>(ret));
  232. RowValue& gced = std::get<1>(ret);
  233. EXPECT_EQ(gced.columns_.size(), 1);
  234. VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now));
  235. }
  236. TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
  237. purge_ttl_on_expiration_ = true;
  238. CassandraStore store(OpenDb());
  239. int64_t now = time(nullptr);
  240. store.Put("k1", CreateTestRowValue({
  241. CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
  242. }));
  243. store.Flush();
  244. store.Compact();
  245. ASSERT_FALSE(std::get<0>(store.Get("k1")));
  246. }
  247. } // namespace cassandra
  248. } // namespace ROCKSDB_NAMESPACE
  249. int main(int argc, char** argv) {
  250. ::testing::InitGoogleTest(&argc, argv);
  251. return RUN_ALL_TESTS();
  252. }