db_repl_stress.cc 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef GFLAGS
  6. #include <cstdio>
  // Fallback entry point compiled when GFLAGS is not defined: tell the user
  // which dependency is missing and report failure to the caller.
  int main() {
    std::fputs("Please install gflags to run rocksdb tools\n", stderr);
    return 1;
  }
  11. #else
  12. #include <atomic>
  13. #include <cstdio>
  14. #include "db/write_batch_internal.h"
  15. #include "rocksdb/db.h"
  16. #include "rocksdb/types.h"
  17. #include "test_util/testutil.h"
  18. #include "util/gflags_compat.h"
  // Run a thread to perform Put's.
  // Another thread uses GetUpdatesSince API to keep getting the updates.
  // options :
  // --num_inserts = the num of inserts the first thread should perform.
  // --wal_ttl_seconds = the wal ttl for the run (in seconds).
  // --wal_size_limit_MB = the wal size limit for the run (in MB).
  24. DEFINE_uint64(num_inserts, 1000,
  25. "the num of inserts the first thread should"
  26. " perform.");
  27. DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
  28. DEFINE_uint64(wal_size_limit_MB, 10,
  29. "the wal size limit for the run"
  30. "(in MB)");
  31. using ROCKSDB_NAMESPACE::BatchResult;
  32. using ROCKSDB_NAMESPACE::DB;
  33. using ROCKSDB_NAMESPACE::DestroyDB;
  34. using ROCKSDB_NAMESPACE::Env;
  35. using ROCKSDB_NAMESPACE::Options;
  36. using ROCKSDB_NAMESPACE::Random;
  37. using ROCKSDB_NAMESPACE::SequenceNumber;
  38. using ROCKSDB_NAMESPACE::Slice;
  39. using ROCKSDB_NAMESPACE::Status;
  40. using ROCKSDB_NAMESPACE::TransactionLogIterator;
  41. using ROCKSDB_NAMESPACE::WriteOptions;
  42. using GFLAGS_NAMESPACE::ParseCommandLineFlags;
  43. using GFLAGS_NAMESPACE::SetUsageMessage;
  44. struct DataPumpThread {
  45. DB* db; // Assumption DB is Open'ed already.
  46. };
  47. static void DataPumpThreadBody(void* arg) {
  48. DataPumpThread* t = static_cast<DataPumpThread*>(arg);
  49. DB* db = t->db;
  50. Random rnd(301);
  51. uint64_t i = 0;
  52. while (i++ < FLAGS_num_inserts) {
  53. if (!db->Put(WriteOptions(), Slice(rnd.RandomString(500)),
  54. Slice(rnd.RandomString(500)))
  55. .ok()) {
  56. fprintf(stderr, "Error in put\n");
  57. exit(1);
  58. }
  59. }
  60. }
  61. int main(int argc, const char** argv) {
  62. SetUsageMessage(
  63. std::string("\nUSAGE:\n") + std::string(argv[0]) +
  64. " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
  65. " --wal_size_limit_MB=<WAL_size_limit_MB>");
  66. ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
  67. Env* env = Env::Default();
  68. std::string default_db_path;
  69. env->GetTestDirectory(&default_db_path);
  70. default_db_path += "db_repl_stress";
  71. Options options;
  72. options.create_if_missing = true;
  73. options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
  74. options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
  75. DB* db;
  76. DestroyDB(default_db_path, options);
  77. Status s = DB::Open(options, default_db_path, &db);
  78. if (!s.ok()) {
  79. fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
  80. exit(1);
  81. }
  82. DataPumpThread dataPump;
  83. dataPump.db = db;
  84. env->StartThread(DataPumpThreadBody, &dataPump);
  85. std::unique_ptr<TransactionLogIterator> iter;
  86. SequenceNumber currentSeqNum = 1;
  87. uint64_t num_read = 0;
  88. for (;;) {
  89. iter.reset();
  90. // Continue to probe a bit more after all received
  91. size_t probes = 0;
  92. while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
  93. probes++;
  94. if (probes > 100 && num_read >= FLAGS_num_inserts) {
  95. if (num_read > FLAGS_num_inserts) {
  96. fprintf(stderr, "Too many updates read: %ld expected: %ld\n",
  97. (long)num_read, (long)FLAGS_num_inserts);
  98. exit(1);
  99. }
  100. fprintf(stderr, "Successful!\n");
  101. return 0;
  102. }
  103. }
  104. fprintf(stderr, "Refreshing iterator\n");
  105. for (; iter->Valid(); iter->Next(), num_read++, currentSeqNum++) {
  106. BatchResult res = iter->GetBatch();
  107. if (res.sequence != currentSeqNum) {
  108. fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
  109. (long)currentSeqNum, (long)res.sequence);
  110. exit(1);
  111. }
  112. }
  113. }
  114. }
  115. #endif // GFLAGS