| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #ifndef ROCKSDB_LITE
- #ifndef GFLAGS
- #include <cstdio>
- int main() {
- fprintf(stderr, "Please install gflags to run rocksdb tools\n");
- return 1;
- }
- #else
- #include <atomic>
- #include <cstdio>
- #include "db/write_batch_internal.h"
- #include "rocksdb/db.h"
- #include "rocksdb/types.h"
- #include "test_util/testutil.h"
- #include "util/gflags_compat.h"
- // Run a thread to perform Put's.
- // Another thread uses GetUpdatesSince API to keep getting the updates.
- // options :
- // --num_inserts = the num of inserts the first thread should perform.
- // --wal_ttl = the wal ttl for the run.
- using namespace ROCKSDB_NAMESPACE;
- using GFLAGS_NAMESPACE::ParseCommandLineFlags;
- using GFLAGS_NAMESPACE::SetUsageMessage;
- struct DataPumpThread {
- size_t no_records;
- DB* db; // Assumption DB is Open'ed already.
- };
- static std::string RandomString(Random* rnd, int len) {
- std::string r;
- test::RandomString(rnd, len, &r);
- return r;
- }
- static void DataPumpThreadBody(void* arg) {
- DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
- DB* db = t->db;
- Random rnd(301);
- size_t i = 0;
- while (i++ < t->no_records) {
- if (!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)),
- Slice(RandomString(&rnd, 500)))
- .ok()) {
- fprintf(stderr, "Error in put\n");
- exit(1);
- }
- }
- }
- struct ReplicationThread {
- std::atomic<bool> stop;
- DB* db;
- volatile size_t no_read;
- };
- static void ReplicationThreadBody(void* arg) {
- ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
- DB* db = t->db;
- std::unique_ptr<TransactionLogIterator> iter;
- SequenceNumber currentSeqNum = 1;
- while (!t->stop.load(std::memory_order_acquire)) {
- iter.reset();
- Status s;
- while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
- if (t->stop.load(std::memory_order_acquire)) {
- return;
- }
- }
- fprintf(stderr, "Refreshing iterator\n");
- for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
- BatchResult res = iter->GetBatch();
- if (res.sequence != currentSeqNum) {
- fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
- (long)currentSeqNum, (long)res.sequence);
- exit(1);
- }
- }
- }
- }
- DEFINE_uint64(num_inserts, 1000,
- "the num of inserts the first thread should"
- " perform.");
- DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
- DEFINE_uint64(wal_size_limit_MB, 10,
- "the wal size limit for the run"
- "(in MB)");
- int main(int argc, const char** argv) {
- SetUsageMessage(
- std::string("\nUSAGE:\n") + std::string(argv[0]) +
- " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
- " --wal_size_limit_MB=<WAL_size_limit_MB>");
- ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
- Env* env = Env::Default();
- std::string default_db_path;
- env->GetTestDirectory(&default_db_path);
- default_db_path += "db_repl_stress";
- Options options;
- options.create_if_missing = true;
- options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
- options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
- DB* db;
- DestroyDB(default_db_path, options);
- Status s = DB::Open(options, default_db_path, &db);
- if (!s.ok()) {
- fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
- exit(1);
- }
- DataPumpThread dataPump;
- dataPump.no_records = FLAGS_num_inserts;
- dataPump.db = db;
- env->StartThread(DataPumpThreadBody, &dataPump);
- ReplicationThread replThread;
- replThread.db = db;
- replThread.no_read = 0;
- replThread.stop.store(false, std::memory_order_release);
- env->StartThread(ReplicationThreadBody, &replThread);
- while (replThread.no_read < FLAGS_num_inserts)
- ;
- replThread.stop.store(true, std::memory_order_release);
- if (replThread.no_read < dataPump.no_records) {
- // no. read should be => than inserted.
- fprintf(stderr,
- "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
- " Written : %" ROCKSDB_PRIszt "\n",
- replThread.no_read, dataPump.no_records);
- exit(1);
- }
- fprintf(stderr, "Successful!\n");
- exit(0);
- }
- #endif // GFLAGS
- #else // ROCKSDB_LITE
- #include <stdio.h>
- int main(int /*argc*/, char** /*argv*/) {
- fprintf(stderr, "Not supported in lite mode.\n");
- return 1;
- }
- #endif // ROCKSDB_LITE
|