db_repl_stress.cc 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #ifndef GFLAGS
  7. #include <cstdio>
  8. int main() {
  9. fprintf(stderr, "Please install gflags to run rocksdb tools\n");
  10. return 1;
  11. }
  12. #else
  13. #include <atomic>
  14. #include <cstdio>
  15. #include "db/write_batch_internal.h"
  16. #include "rocksdb/db.h"
  17. #include "rocksdb/types.h"
  18. #include "test_util/testutil.h"
  19. #include "util/gflags_compat.h"
// Run a thread to perform Put's.
// Another thread uses the GetUpdatesSince API to keep getting the updates.
// options :
// --num_inserts = the number of inserts the first thread should perform.
// --wal_ttl_seconds = the WAL TTL for the run (in seconds).
// --wal_size_limit_MB = the WAL size limit for the run (in MB).
  25. using namespace ROCKSDB_NAMESPACE;
  26. using GFLAGS_NAMESPACE::ParseCommandLineFlags;
  27. using GFLAGS_NAMESPACE::SetUsageMessage;
  28. struct DataPumpThread {
  29. size_t no_records;
  30. DB* db; // Assumption DB is Open'ed already.
  31. };
  32. static std::string RandomString(Random* rnd, int len) {
  33. std::string r;
  34. test::RandomString(rnd, len, &r);
  35. return r;
  36. }
  37. static void DataPumpThreadBody(void* arg) {
  38. DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
  39. DB* db = t->db;
  40. Random rnd(301);
  41. size_t i = 0;
  42. while (i++ < t->no_records) {
  43. if (!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)),
  44. Slice(RandomString(&rnd, 500)))
  45. .ok()) {
  46. fprintf(stderr, "Error in put\n");
  47. exit(1);
  48. }
  49. }
  50. }
  51. struct ReplicationThread {
  52. std::atomic<bool> stop;
  53. DB* db;
  54. volatile size_t no_read;
  55. };
  56. static void ReplicationThreadBody(void* arg) {
  57. ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
  58. DB* db = t->db;
  59. std::unique_ptr<TransactionLogIterator> iter;
  60. SequenceNumber currentSeqNum = 1;
  61. while (!t->stop.load(std::memory_order_acquire)) {
  62. iter.reset();
  63. Status s;
  64. while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
  65. if (t->stop.load(std::memory_order_acquire)) {
  66. return;
  67. }
  68. }
  69. fprintf(stderr, "Refreshing iterator\n");
  70. for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
  71. BatchResult res = iter->GetBatch();
  72. if (res.sequence != currentSeqNum) {
  73. fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
  74. (long)currentSeqNum, (long)res.sequence);
  75. exit(1);
  76. }
  77. }
  78. }
  79. }
  80. DEFINE_uint64(num_inserts, 1000,
  81. "the num of inserts the first thread should"
  82. " perform.");
  83. DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
  84. DEFINE_uint64(wal_size_limit_MB, 10,
  85. "the wal size limit for the run"
  86. "(in MB)");
  87. int main(int argc, const char** argv) {
  88. SetUsageMessage(
  89. std::string("\nUSAGE:\n") + std::string(argv[0]) +
  90. " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
  91. " --wal_size_limit_MB=<WAL_size_limit_MB>");
  92. ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
  93. Env* env = Env::Default();
  94. std::string default_db_path;
  95. env->GetTestDirectory(&default_db_path);
  96. default_db_path += "db_repl_stress";
  97. Options options;
  98. options.create_if_missing = true;
  99. options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
  100. options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
  101. DB* db;
  102. DestroyDB(default_db_path, options);
  103. Status s = DB::Open(options, default_db_path, &db);
  104. if (!s.ok()) {
  105. fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
  106. exit(1);
  107. }
  108. DataPumpThread dataPump;
  109. dataPump.no_records = FLAGS_num_inserts;
  110. dataPump.db = db;
  111. env->StartThread(DataPumpThreadBody, &dataPump);
  112. ReplicationThread replThread;
  113. replThread.db = db;
  114. replThread.no_read = 0;
  115. replThread.stop.store(false, std::memory_order_release);
  116. env->StartThread(ReplicationThreadBody, &replThread);
  117. while (replThread.no_read < FLAGS_num_inserts)
  118. ;
  119. replThread.stop.store(true, std::memory_order_release);
  120. if (replThread.no_read < dataPump.no_records) {
  121. // no. read should be => than inserted.
  122. fprintf(stderr,
  123. "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
  124. " Written : %" ROCKSDB_PRIszt "\n",
  125. replThread.no_read, dataPump.no_records);
  126. exit(1);
  127. }
  128. fprintf(stderr, "Successful!\n");
  129. exit(0);
  130. }
  131. #endif // GFLAGS
  132. #else // ROCKSDB_LITE
  133. #include <stdio.h>
  134. int main(int /*argc*/, char** /*argv*/) {
  135. fprintf(stderr, "Not supported in lite mode.\n");
  136. return 1;
  137. }
  138. #endif // ROCKSDB_LITE