| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- // How to use this example
- // Open two terminals, in one of them, run `./multi_processes_example 0` to
- // start a process running the primary instance. This will create a new DB in
- // kDBPath. The process will run for a while inserting keys to the normal
- // RocksDB database.
- // Next, go to the other terminal and run `./multi_processes_example 1` to
- // start a process running the secondary instance. This will create a secondary
- // instance following the aforementioned primary instance. This process will
- // run for a while, tailing the logs of the primary. After process with primary
- // instance exits, this process will keep running until you hit 'CTRL+C'.
#include <atomic>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#if defined(OS_LINUX)
#include <dirent.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#endif // !OS_LINUX
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
- using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor;
- using ROCKSDB_NAMESPACE::ColumnFamilyHandle;
- using ROCKSDB_NAMESPACE::ColumnFamilyOptions;
- using ROCKSDB_NAMESPACE::DB;
- using ROCKSDB_NAMESPACE::FlushOptions;
- using ROCKSDB_NAMESPACE::Iterator;
- using ROCKSDB_NAMESPACE::Options;
- using ROCKSDB_NAMESPACE::ReadOptions;
- using ROCKSDB_NAMESPACE::Slice;
- using ROCKSDB_NAMESPACE::Status;
- using ROCKSDB_NAMESPACE::WriteOptions;
// Directory of the database shared by the primary and secondary instances.
const std::string kDBPath("/tmp/rocksdb_multi_processes_example");
// NOTE(review): not referenced by any code visible in this file.
const std::string kPrimaryStatusFile(
    "/tmp/rocksdb_multi_processes_example_primary_status");
// The primary writes keys in the range [0, kMaxKey).
constexpr uint64_t kMaxKey = 600000;
// Upper bound, in bytes, on the length of a generated value.
constexpr size_t kMaxValueLength = 256;
// Number of keys written to a column family before it is flushed.
constexpr size_t kNumKeysPerFlush = 1000;
- const std::vector<std::string>& GetColumnFamilyNames() {
- static std::vector<std::string> column_family_names = {
- ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, "pikachu"};
- return column_family_names;
- }
// Reports whether the host stores the least significant byte of an integer at
// the lowest address.
inline bool IsLittleEndian() {
  uint32_t probe = 1;
  // Inspecting an object through a char pointer is always permitted.
  const char* lowest_byte = reinterpret_cast<char*>(&probe);
  return lowest_byte[0] != 0;
}
// Returns the process-wide flag that keeps the secondary instance running.
// It starts at 1; the SIGINT handler stores 0 to request shutdown.
static std::atomic<int>& ShouldSecondaryWait() {
  static std::atomic<int> keep_waiting(1);
  return keep_waiting;
}
// Encodes `k` as a fixed-width, 8-byte big-endian string (most significant
// byte first), so that byte-wise comparison of two encoded keys agrees with
// numeric comparison of the original integers.
//
// The encoding is produced with shifts only, which makes it independent of
// the host byte order.
static std::string Key(uint64_t k) {
  std::string encoded(sizeof(k), '\0');
  for (size_t i = 0; i < sizeof(k); ++i) {
    // Byte 0 holds bits 63..56, byte 7 holds bits 7..0.
    const size_t shift = 8 * (sizeof(k) - 1 - i);
    encoded[i] = static_cast<char>((k >> shift) & 0xff);
  }
  return encoded;
}
// Decodes an 8-byte big-endian string, as produced by Key(uint64_t), back
// into the integer it represents.
//
// `key` must be exactly sizeof(uint64_t) bytes long; this is checked with
// assert() in debug builds.
//
// Every byte is converted through `unsigned char` before being widened, so
// bytes >= 0x80 are never sign-extended, and the result does not depend on
// the host byte order.
static uint64_t Key(const std::string& key) {
  assert(key.size() == sizeof(uint64_t));
  uint64_t decoded = 0;
  for (const char c : key) {
    decoded = (decoded << 8) | static_cast<unsigned char>(c);
  }
  return decoded;
}
- static Slice GenerateRandomValue(const size_t max_length, char scratch[]) {
- size_t sz = 1 + (std::rand() % max_length);
- int rnd = std::rand();
- for (size_t i = 0; i != sz; ++i) {
- scratch[i] = static_cast<char>(rnd ^ i);
- }
- return Slice(scratch, sz);
- }
// Tells the primary whether to close the database after each batch of
// writes. This example always answers yes.
static bool ShouldCloseDB() {
  const bool close_after_each_batch = true;
  return close_after_each_batch;
}
- // TODO: port this example to other systems. It should be straightforward for
- // POSIX-compliant systems.
- #if defined(OS_LINUX)
- void CreateDB() {
- long my_pid = static_cast<long>(getpid());
- Options options;
- Status s = ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options);
- if (!s.ok()) {
- fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid,
- s.ToString().c_str());
- assert(false);
- }
- options.create_if_missing = true;
- DB* db = nullptr;
- s = DB::Open(options, kDBPath, &db);
- if (!s.ok()) {
- fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
- s.ToString().c_str());
- assert(false);
- }
- std::vector<ColumnFamilyHandle*> handles;
- ColumnFamilyOptions cf_opts(options);
- for (const auto& cf_name : GetColumnFamilyNames()) {
- if (ROCKSDB_NAMESPACE::kDefaultColumnFamilyName != cf_name) {
- ColumnFamilyHandle* handle = nullptr;
- s = db->CreateColumnFamily(cf_opts, cf_name, &handle);
- if (!s.ok()) {
- fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid,
- cf_name.c_str(), s.ToString().c_str());
- assert(false);
- }
- handles.push_back(handle);
- }
- }
- fprintf(stdout, "[process %ld] Column families created\n", my_pid);
- for (auto h : handles) {
- delete h;
- }
- handles.clear();
- delete db;
- }
- void RunPrimary() {
- long my_pid = static_cast<long>(getpid());
- fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid);
- CreateDB();
- std::srand(time(nullptr));
- DB* db = nullptr;
- Options options;
- options.create_if_missing = false;
- std::vector<ColumnFamilyDescriptor> column_families;
- for (const auto& cf_name : GetColumnFamilyNames()) {
- column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
- }
- std::vector<ColumnFamilyHandle*> handles;
- WriteOptions write_opts;
- char val_buf[kMaxValueLength] = {0};
- uint64_t curr_key = 0;
- while (curr_key < kMaxKey) {
- Status s;
- if (nullptr == db) {
- s = DB::Open(options, kDBPath, column_families, &handles, &db);
- if (!s.ok()) {
- fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
- s.ToString().c_str());
- assert(false);
- }
- }
- assert(nullptr != db);
- assert(handles.size() == GetColumnFamilyNames().size());
- for (auto h : handles) {
- assert(nullptr != h);
- for (size_t i = 0; i != kNumKeysPerFlush; ++i) {
- Slice key = Key(curr_key + static_cast<uint64_t>(i));
- Slice value = GenerateRandomValue(kMaxValueLength, val_buf);
- s = db->Put(write_opts, h, key, value);
- if (!s.ok()) {
- fprintf(stderr, "[process %ld] Failed to insert\n", my_pid);
- assert(false);
- }
- }
- s = db->Flush(FlushOptions(), h);
- if (!s.ok()) {
- fprintf(stderr, "[process %ld] Failed to flush\n", my_pid);
- assert(false);
- }
- }
- curr_key += static_cast<uint64_t>(kNumKeysPerFlush);
- if (ShouldCloseDB()) {
- for (auto h : handles) {
- delete h;
- }
- handles.clear();
- delete db;
- db = nullptr;
- }
- }
- if (nullptr != db) {
- for (auto h : handles) {
- delete h;
- }
- handles.clear();
- delete db;
- db = nullptr;
- }
- fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid);
- }
- void secondary_instance_sigint_handler(int signal) {
- ShouldSecondaryWait().store(0, std::memory_order_relaxed);
- fprintf(stdout, "\n");
- fflush(stdout);
- };
- void RunSecondary() {
- ::signal(SIGINT, secondary_instance_sigint_handler);
- long my_pid = static_cast<long>(getpid());
- const std::string kSecondaryPath =
- "/tmp/rocksdb_multi_processes_example_secondary";
- // Create directory if necessary
- if (nullptr == opendir(kSecondaryPath.c_str())) {
- int ret =
- mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
- if (ret < 0) {
- perror("failed to create directory for secondary instance");
- exit(0);
- }
- }
- DB* db = nullptr;
- Options options;
- options.create_if_missing = false;
- options.max_open_files = -1;
- Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db);
- if (!s.ok()) {
- fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n",
- my_pid, s.ToString().c_str());
- assert(false);
- } else {
- fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid);
- }
- ReadOptions ropts;
- ropts.verify_checksums = true;
- ropts.total_order_seek = true;
- std::vector<std::thread> test_threads;
- test_threads.emplace_back([&]() {
- while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
- std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
- iter->SeekToFirst();
- size_t count = 0;
- for (; iter->Valid(); iter->Next()) {
- ++count;
- }
- }
- fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid);
- });
- test_threads.emplace_back([&]() {
- std::srand(time(nullptr));
- while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
- Slice key = Key(std::rand() % kMaxKey);
- std::string value;
- db->Get(ropts, key, &value);
- }
- fprintf(stdout, "[process %ld] Point lookup thread finished\n");
- });
- uint64_t curr_key = 0;
- while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
- s = db->TryCatchUpWithPrimary();
- if (!s.ok()) {
- fprintf(stderr,
- "[process %ld] error while trying to catch up with "
- "primary %s\n",
- my_pid, s.ToString().c_str());
- assert(false);
- }
- {
- std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
- if (!iter) {
- fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid);
- assert(false);
- }
- iter->SeekToLast();
- if (iter->Valid()) {
- uint64_t curr_max_key = Key(iter->key().ToString());
- if (curr_max_key != curr_key) {
- fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid,
- curr_key);
- curr_key = curr_max_key;
- }
- }
- }
- std::this_thread::sleep_for(std::chrono::seconds(1));
- }
- s = db->TryCatchUpWithPrimary();
- if (!s.ok()) {
- fprintf(stderr,
- "[process %ld] error while trying to catch up with "
- "primary %s\n",
- my_pid, s.ToString().c_str());
- assert(false);
- }
- std::vector<ColumnFamilyDescriptor> column_families;
- for (const auto& cf_name : GetColumnFamilyNames()) {
- column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
- }
- std::vector<ColumnFamilyHandle*> handles;
- DB* verification_db = nullptr;
- s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles,
- &verification_db);
- assert(s.ok());
- Iterator* iter1 = verification_db->NewIterator(ropts);
- iter1->SeekToFirst();
- Iterator* iter = db->NewIterator(ropts);
- iter->SeekToFirst();
- for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) {
- if (iter->key().ToString() != iter1->key().ToString()) {
- fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n",
- Key(iter->key().ToString()), Key(iter1->key().ToString()));
- assert(false);
- } else if (iter->value().ToString() != iter1->value().ToString()) {
- fprintf(stderr, "Value mismatch\n");
- assert(false);
- }
- }
- fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid);
- for (auto& thr : test_threads) {
- thr.join();
- }
- delete iter;
- delete iter1;
- delete db;
- delete verification_db;
- }
- int main(int argc, char** argv) {
- if (argc < 2) {
- fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]);
- return 0;
- }
- if (atoi(argv[1]) == 0) {
- RunPrimary();
- } else {
- RunSecondary();
- }
- return 0;
- }
- #else // OS_LINUX
// Fallback entry point for platforms other than Linux, where this example
// has not been ported: reports that on stderr and exits.
int main() {
  fprintf(stderr, "Not implemented.\n");
  return 0;
}
- #endif // !OS_LINUX
|