//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE
#ifndef GFLAGS
#include <cstdio>
int main() {
  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
  return 1;
}
#else

#include <atomic>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <string>

#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/types.h"
#include "test_util/testutil.h"
#include "util/gflags_compat.h"

// Replication stress tool.
//
// One thread performs Put's. Another thread uses the GetUpdatesSince API to
// keep reading the updates back, verifying that the sequence numbers it
// observes are contiguous.
//
// Options:
//   --num_inserts       = number of inserts the writer thread performs.
//   --wal_ttl_seconds   = WAL ttl for the run (in seconds).
//   --wal_size_limit_MB = WAL size limit for the run (in MB).

using namespace ROCKSDB_NAMESPACE;

using GFLAGS_NAMESPACE::ParseCommandLineFlags;
using GFLAGS_NAMESPACE::SetUsageMessage;

// Arguments handed to DataPumpThreadBody.
struct DataPumpThread {
  size_t no_records;  // Number of Put's to issue.
  DB* db;             // Assumption DB is Open'ed already.
};

// Returns a random string of `len` characters drawn from `rnd`.
static std::string RandomString(Random* rnd, int len) {
  std::string r;
  test::RandomString(rnd, len, &r);
  return r;
}

// Writer thread entry point. `arg` must point to a DataPumpThread.
// Issues `no_records` Put's of random 500-byte keys and values; terminates
// the whole process with exit code 1 on the first failed Put.
static void DataPumpThreadBody(void* arg) {
  DataPumpThread* t = static_cast<DataPumpThread*>(arg);
  DB* db = t->db;
  Random rnd(301);
  for (size_t i = 0; i < t->no_records; ++i) {
    if (!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)),
                 Slice(RandomString(&rnd, 500)))
             .ok()) {
      fprintf(stderr, "Error in put\n");
      exit(1);
    }
  }
}

// State shared between main() and ReplicationThreadBody.
struct ReplicationThread {
  // Set by main() to ask the reader thread to finish.
  std::atomic<bool> stop{false};
  DB* db = nullptr;
  // Number of batches read so far. Written by the reader thread and polled
  // by main(), so it has to be atomic (volatile gives no inter-thread
  // guarantees).
  std::atomic<size_t> no_read{0};
};

// Reader thread entry point. `arg` must point to a ReplicationThread.
// Repeatedly obtains a TransactionLogIterator via GetUpdatesSince and walks
// it, checking that every batch carries exactly the next expected sequence
// number. Terminates the process with exit code 1 if a gap is detected.
// Returns once `stop` has been set and the current iterator is exhausted.
static void ReplicationThreadBody(void* arg) {
  ReplicationThread* t = static_cast<ReplicationThread*>(arg);
  DB* db = t->db;
  std::unique_ptr<TransactionLogIterator> iter;
  SequenceNumber currentSeqNum = 1;
  while (!t->stop.load(std::memory_order_acquire)) {
    iter.reset();
    // Retry until an iterator is available (presumably the call fails while
    // the requested sequence number has not been written yet), but bail out
    // as soon as we are asked to stop.
    while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
      if (t->stop.load(std::memory_order_acquire)) {
        return;
      }
    }
    fprintf(stderr, "Refreshing iterator\n");
    while (iter->Valid()) {
      BatchResult res = iter->GetBatch();
      if (res.sequence != currentSeqNum) {
        // Sequence numbers are 64-bit; print them with a 64-bit specifier so
        // they are not truncated where `long` is 32 bits wide.
        fprintf(stderr, "Missed a seq no. b/w %" PRIu64 " and %" PRIu64 "\n",
                static_cast<uint64_t>(currentSeqNum),
                static_cast<uint64_t>(res.sequence));
        exit(1);
      }
      iter->Next();
      t->no_read.fetch_add(1, std::memory_order_release);
      ++currentSeqNum;
    }
  }
}

DEFINE_uint64(num_inserts, 1000,
              "the num of inserts the first thread should"
              " perform.");
DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
DEFINE_uint64(wal_size_limit_MB, 10,
              "the wal size limit for the run"
              "(in MB)");

// Opens a fresh DB under the Env test directory, starts the writer and reader
// threads, waits until the reader has seen at least --num_inserts batches,
// then shuts both threads down and reports the result on stderr.
// Returns 0 on success and 1 on failure.
int main(int argc, const char** argv) {
  SetUsageMessage(
      std::string("\nUSAGE:\n") + std::string(argv[0]) +
      " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
      " --wal_size_limit_MB=<WAL_size_limit_MB>");
  ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);

  Env* env = Env::Default();
  std::string default_db_path;
  env->GetTestDirectory(&default_db_path);
  default_db_path += "db_repl_stress";

  Options options;
  options.create_if_missing = true;
  options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
  options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;

  DB* db = nullptr;
  // Best effort: remove leftovers of a previous run; the result is ignored
  // on purpose because the DB may simply not exist yet.
  DestroyDB(default_db_path, options);

  Status s = DB::Open(options, default_db_path, &db);
  if (!s.ok()) {
    fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
    exit(1);
  }

  DataPumpThread dataPump;
  dataPump.no_records = FLAGS_num_inserts;
  dataPump.db = db;
  env->StartThread(DataPumpThreadBody, &dataPump);

  ReplicationThread replThread;
  replThread.db = db;
  replThread.no_read.store(0, std::memory_order_release);
  replThread.stop.store(false, std::memory_order_release);
  env->StartThread(ReplicationThreadBody, &replThread);

  // Poll until the reader has caught up; sleep between polls instead of
  // spinning so that the worker threads are not starved of CPU.
  while (replThread.no_read.load(std::memory_order_acquire) <
         FLAGS_num_inserts) {
    env->SleepForMicroseconds(1000);
  }
  replThread.stop.store(true, std::memory_order_release);

  // Make sure neither worker still touches `db`, `dataPump` or `replThread`
  // while we tear things down.
  env->WaitForJoin();

  int exit_code = 0;
  const size_t records_read = replThread.no_read.load(std::memory_order_acquire);
  if (records_read < dataPump.no_records) {
    // no. read should be => than inserted.
    fprintf(stderr,
            "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
            " Written : %" ROCKSDB_PRIszt "\n",
            records_read, dataPump.no_records);
    exit_code = 1;
  } else {
    fprintf(stderr, "Successful!\n");
  }

  delete db;
  return exit_code;
}

#endif  // GFLAGS

#else  // ROCKSDB_LITE
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr, "Not supported in lite mode.\n");
  return 1;
}
#endif  // ROCKSDB_LITE