cassandra_functional_test.cc 15 KB


  1. // Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <iostream>
  6. #include "db/db_impl/db_impl.h"
  7. #include "rocksdb/convenience.h"
  8. #include "rocksdb/db.h"
  9. #include "rocksdb/merge_operator.h"
  10. #include "rocksdb/utilities/object_registry.h"
  11. #include "test_util/testharness.h"
  12. #include "util/cast_util.h"
  13. #include "util/random.h"
  14. #include "utilities/cassandra/cassandra_compaction_filter.h"
  15. #include "utilities/cassandra/merge_operator.h"
  16. #include "utilities/cassandra/test_utils.h"
  17. #include "utilities/merge_operators.h"
  18. namespace ROCKSDB_NAMESPACE::cassandra {
  19. // Path to the database on file system
  20. const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
  21. class CassandraStore {
  22. public:
  23. explicit CassandraStore(std::shared_ptr<DB> db)
  24. : db_(db), write_option_(), get_option_() {
  25. assert(db);
  26. }
  27. bool Append(const std::string& key, const RowValue& val) {
  28. std::string result;
  29. val.Serialize(&result);
  30. Slice valSlice(result.data(), result.size());
  31. auto s = db_->Merge(write_option_, key, valSlice);
  32. if (s.ok()) {
  33. return true;
  34. } else {
  35. std::cerr << "ERROR " << s.ToString() << std::endl;
  36. return false;
  37. }
  38. }
  39. bool Put(const std::string& key, const RowValue& val) {
  40. std::string result;
  41. val.Serialize(&result);
  42. Slice valSlice(result.data(), result.size());
  43. auto s = db_->Put(write_option_, key, valSlice);
  44. if (s.ok()) {
  45. return true;
  46. } else {
  47. std::cerr << "ERROR " << s.ToString() << std::endl;
  48. return false;
  49. }
  50. }
  51. Status Flush() {
  52. Status s = dbfull()->TEST_FlushMemTable();
  53. if (s.ok()) {
  54. s = dbfull()->TEST_WaitForCompact();
  55. }
  56. return s;
  57. }
  58. Status Compact() {
  59. return dbfull()->TEST_CompactRange(0, nullptr, nullptr,
  60. db_->DefaultColumnFamily());
  61. }
  62. std::tuple<bool, RowValue> Get(const std::string& key) {
  63. std::string result;
  64. auto s = db_->Get(get_option_, key, &result);
  65. if (s.ok()) {
  66. return std::make_tuple(
  67. true, RowValue::Deserialize(result.data(), result.size()));
  68. }
  69. if (!s.IsNotFound()) {
  70. std::cerr << "ERROR " << s.ToString() << std::endl;
  71. }
  72. return std::make_tuple(false, RowValue(0, 0));
  73. }
  74. private:
  75. std::shared_ptr<DB> db_;
  76. WriteOptions write_option_;
  77. ReadOptions get_option_;
  78. DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_.get()); }
  79. };
  80. class TestCompactionFilterFactory : public CompactionFilterFactory {
  81. public:
  82. explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
  83. int32_t gc_grace_period_in_seconds)
  84. : purge_ttl_on_expiration_(purge_ttl_on_expiration),
  85. gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
  86. std::unique_ptr<CompactionFilter> CreateCompactionFilter(
  87. const CompactionFilter::Context& /*context*/) override {
  88. return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
  89. purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
  90. }
  91. const char* Name() const override { return "TestCompactionFilterFactory"; }
  92. private:
  93. bool purge_ttl_on_expiration_;
  94. int32_t gc_grace_period_in_seconds_;
  95. };
  96. // The class for unit-testing
  97. class CassandraFunctionalTest : public testing::Test {
  98. public:
  99. CassandraFunctionalTest() {
  100. EXPECT_OK(
  101. DestroyDB(kDbName, Options())); // Start each test with a fresh DB
  102. }
  103. std::shared_ptr<DB> OpenDb() {
  104. DB* db;
  105. Options options;
  106. options.create_if_missing = true;
  107. options.merge_operator.reset(
  108. new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
  109. auto* cf_factory = new TestCompactionFilterFactory(
  110. purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
  111. options.compaction_filter_factory.reset(cf_factory);
  112. EXPECT_OK(DB::Open(options, kDbName, &db));
  113. return std::shared_ptr<DB>(db);
  114. }
  115. bool purge_ttl_on_expiration_ = false;
  116. int32_t gc_grace_period_in_seconds_ = 100;
  117. };
  118. // THE TEST CASES BEGIN HERE
  119. TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
  120. CassandraStore store(OpenDb());
  121. int64_t now = time(nullptr);
  122. store.Append(
  123. "k1",
  124. CreateTestRowValue({
  125. CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
  126. CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
  127. CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
  128. }));
  129. store.Append(
  130. "k1",
  131. CreateTestRowValue({
  132. CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
  133. CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
  134. CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
  135. CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
  136. }));
  137. store.Append(
  138. "k1",
  139. CreateTestRowValue({
  140. CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
  141. CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
  142. CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
  143. CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
  144. }));
  145. auto ret = store.Get("k1");
  146. ASSERT_TRUE(std::get<0>(ret));
  147. RowValue& merged = std::get<1>(ret);
  148. EXPECT_EQ(merged.get_columns().size(), 5);
  149. VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 0,
  150. ToMicroSeconds(now + 6));
  151. VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 1,
  152. ToMicroSeconds(now + 8));
  153. VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 2,
  154. ToMicroSeconds(now + 7));
  155. VerifyRowValueColumns(merged.get_columns(), 3, kExpiringColumn, 7,
  156. ToMicroSeconds(now + 17));
  157. VerifyRowValueColumns(merged.get_columns(), 4, kTombstone, 11,
  158. ToMicroSeconds(now + 11));
  159. }
  160. constexpr int64_t kTestTimeoutSecs = 600;
  161. TEST_F(CassandraFunctionalTest,
  162. CompactionShouldConvertExpiredColumnsToTombstone) {
  163. CassandraStore store(OpenDb());
  164. int64_t now = time(nullptr);
  165. store.Append(
  166. "k1",
  167. CreateTestRowValue(
  168. {CreateTestColumnSpec(kExpiringColumn, 0,
  169. ToMicroSeconds(now - kTtl - 20)), // expired
  170. CreateTestColumnSpec(
  171. kExpiringColumn, 1,
  172. ToMicroSeconds(now - kTtl + kTestTimeoutSecs)), // not expired
  173. CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))}));
  174. ASSERT_OK(store.Flush());
  175. store.Append(
  176. "k1",
  177. CreateTestRowValue(
  178. {CreateTestColumnSpec(kExpiringColumn, 0,
  179. ToMicroSeconds(now - kTtl - 10)), // expired
  180. CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))}));
  181. ASSERT_OK(store.Flush());
  182. ASSERT_OK(store.Compact());
  183. auto ret = store.Get("k1");
  184. ASSERT_TRUE(std::get<0>(ret));
  185. RowValue& merged = std::get<1>(ret);
  186. EXPECT_EQ(merged.get_columns().size(), 4);
  187. VerifyRowValueColumns(merged.get_columns(), 0, kTombstone, 0,
  188. ToMicroSeconds(now - 10));
  189. VerifyRowValueColumns(merged.get_columns(), 1, kExpiringColumn, 1,
  190. ToMicroSeconds(now - kTtl + kTestTimeoutSecs));
  191. VerifyRowValueColumns(merged.get_columns(), 2, kColumn, 2,
  192. ToMicroSeconds(now));
  193. VerifyRowValueColumns(merged.get_columns(), 3, kTombstone, 3,
  194. ToMicroSeconds(now));
  195. }
  196. TEST_F(CassandraFunctionalTest,
  197. CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
  198. purge_ttl_on_expiration_ = true;
  199. CassandraStore store(OpenDb());
  200. int64_t now = time(nullptr);
  201. store.Append(
  202. "k1",
  203. CreateTestRowValue(
  204. {CreateTestColumnSpec(kExpiringColumn, 0,
  205. ToMicroSeconds(now - kTtl - 20)), // expired
  206. CreateTestColumnSpec(kExpiringColumn, 1,
  207. ToMicroSeconds(now)), // not expired
  208. CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))}));
  209. ASSERT_OK(store.Flush());
  210. store.Append(
  211. "k1",
  212. CreateTestRowValue(
  213. {CreateTestColumnSpec(kExpiringColumn, 0,
  214. ToMicroSeconds(now - kTtl - 10)), // expired
  215. CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))}));
  216. ASSERT_OK(store.Flush());
  217. ASSERT_OK(store.Compact());
  218. auto ret = store.Get("k1");
  219. ASSERT_TRUE(std::get<0>(ret));
  220. RowValue& merged = std::get<1>(ret);
  221. EXPECT_EQ(merged.get_columns().size(), 3);
  222. VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 1,
  223. ToMicroSeconds(now));
  224. VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 2,
  225. ToMicroSeconds(now));
  226. VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 3,
  227. ToMicroSeconds(now));
  228. }
  229. TEST_F(CassandraFunctionalTest,
  230. CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
  231. purge_ttl_on_expiration_ = true;
  232. CassandraStore store(OpenDb());
  233. int64_t now = time(nullptr);
  234. store.Append("k1", CreateTestRowValue({
  235. CreateTestColumnSpec(kExpiringColumn, 0,
  236. ToMicroSeconds(now - kTtl - 20)),
  237. CreateTestColumnSpec(kExpiringColumn, 1,
  238. ToMicroSeconds(now - kTtl - 20)),
  239. }));
  240. ASSERT_OK(store.Flush());
  241. store.Append("k1", CreateTestRowValue({
  242. CreateTestColumnSpec(kExpiringColumn, 0,
  243. ToMicroSeconds(now - kTtl - 10)),
  244. }));
  245. ASSERT_OK(store.Flush());
  246. ASSERT_OK(store.Compact());
  247. ASSERT_FALSE(std::get<0>(store.Get("k1")));
  248. }
  249. TEST_F(CassandraFunctionalTest,
  250. CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
  251. purge_ttl_on_expiration_ = true;
  252. CassandraStore store(OpenDb());
  253. int64_t now = time(nullptr);
  254. store.Append("k1",
  255. CreateTestRowValue(
  256. {CreateTestColumnSpec(
  257. kTombstone, 0,
  258. ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
  259. CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))}));
  260. store.Append("k2", CreateTestRowValue({CreateTestColumnSpec(
  261. kColumn, 0, ToMicroSeconds(now))}));
  262. ASSERT_OK(store.Flush());
  263. store.Append("k1", CreateTestRowValue({
  264. CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
  265. }));
  266. ASSERT_OK(store.Flush());
  267. ASSERT_OK(store.Compact());
  268. auto ret = store.Get("k1");
  269. ASSERT_TRUE(std::get<0>(ret));
  270. RowValue& gced = std::get<1>(ret);
  271. EXPECT_EQ(gced.get_columns().size(), 1);
  272. VerifyRowValueColumns(gced.get_columns(), 0, kColumn, 1, ToMicroSeconds(now));
  273. }
  274. TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
  275. purge_ttl_on_expiration_ = true;
  276. CassandraStore store(OpenDb());
  277. int64_t now = time(nullptr);
  278. store.Put("k1",
  279. CreateTestRowValue({
  280. CreateTestColumnSpec(
  281. kTombstone, 0,
  282. ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
  283. }));
  284. ASSERT_OK(store.Flush());
  285. ASSERT_OK(store.Compact());
  286. ASSERT_FALSE(std::get<0>(store.Get("k1")));
  287. }
  288. TEST_F(CassandraFunctionalTest, LoadMergeOperator) {
  289. ConfigOptions config_options;
  290. std::shared_ptr<MergeOperator> mo;
  291. config_options.ignore_unsupported_options = false;
  292. ASSERT_NOK(MergeOperator::CreateFromString(
  293. config_options, CassandraValueMergeOperator::kClassName(), &mo));
  294. config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
  295. "cassandra");
  296. ASSERT_OK(MergeOperator::CreateFromString(
  297. config_options, CassandraValueMergeOperator::kClassName(), &mo));
  298. ASSERT_NE(mo, nullptr);
  299. ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
  300. mo.reset();
  301. ASSERT_OK(MergeOperator::CreateFromString(
  302. config_options,
  303. std::string("operands_limit=20;gc_grace_period_in_seconds=42;id=") +
  304. CassandraValueMergeOperator::kClassName(),
  305. &mo));
  306. ASSERT_NE(mo, nullptr);
  307. ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
  308. const auto* opts = mo->GetOptions<CassandraOptions>();
  309. ASSERT_NE(opts, nullptr);
  310. ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
  311. ASSERT_EQ(opts->operands_limit, 20);
  312. }
  313. TEST_F(CassandraFunctionalTest, LoadCompactionFilter) {
  314. ConfigOptions config_options;
  315. const CompactionFilter* filter = nullptr;
  316. config_options.ignore_unsupported_options = false;
  317. ASSERT_NOK(CompactionFilter::CreateFromString(
  318. config_options, CassandraCompactionFilter::kClassName(), &filter));
  319. config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
  320. "cassandra");
  321. ASSERT_OK(CompactionFilter::CreateFromString(
  322. config_options, CassandraCompactionFilter::kClassName(), &filter));
  323. ASSERT_NE(filter, nullptr);
  324. ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
  325. delete filter;
  326. filter = nullptr;
  327. ASSERT_OK(CompactionFilter::CreateFromString(
  328. config_options,
  329. std::string(
  330. "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
  331. CassandraCompactionFilter::kClassName(),
  332. &filter));
  333. ASSERT_NE(filter, nullptr);
  334. ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
  335. const auto* opts = filter->GetOptions<CassandraOptions>();
  336. ASSERT_NE(opts, nullptr);
  337. ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
  338. ASSERT_TRUE(opts->purge_ttl_on_expiration);
  339. delete filter;
  340. }
  341. TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) {
  342. ConfigOptions config_options;
  343. std::shared_ptr<CompactionFilterFactory> factory;
  344. config_options.ignore_unsupported_options = false;
  345. ASSERT_NOK(CompactionFilterFactory::CreateFromString(
  346. config_options, CassandraCompactionFilterFactory::kClassName(),
  347. &factory));
  348. config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
  349. "cassandra");
  350. ASSERT_OK(CompactionFilterFactory::CreateFromString(
  351. config_options, CassandraCompactionFilterFactory::kClassName(),
  352. &factory));
  353. ASSERT_NE(factory, nullptr);
  354. ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
  355. factory.reset();
  356. ASSERT_OK(CompactionFilterFactory::CreateFromString(
  357. config_options,
  358. std::string(
  359. "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
  360. CassandraCompactionFilterFactory::kClassName(),
  361. &factory));
  362. ASSERT_NE(factory, nullptr);
  363. ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
  364. const auto* opts = factory->GetOptions<CassandraOptions>();
  365. ASSERT_NE(opts, nullptr);
  366. ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
  367. ASSERT_TRUE(opts->purge_ttl_on_expiration);
  368. }
  369. } // namespace ROCKSDB_NAMESPACE::cassandra
  370. int main(int argc, char** argv) {
  371. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  372. ::testing::InitGoogleTest(&argc, argv);
  373. return RUN_ALL_TESTS();
  374. }