||
- // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
- // Copyright (c) 2016, Red Hat, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #ifndef ROCKSDB_LITE
- #include "rocksdb/utilities/env_librados.h"
- #include <rados/librados.hpp>
- #include "env/mock_env.h"
- #include "test_util/testharness.h"
- #include "rocksdb/db.h"
- #include "rocksdb/slice.h"
- #include "rocksdb/options.h"
- #include "util/random.h"
- #include <chrono>
- #include <ostream>
- #include "rocksdb/utilities/transaction_db.h"
// Minimal stopwatch used by the benchmark-style tests below to report how
// long a phase took. Elapsed time is expressed in whole milliseconds.
class Timer {
  // Intervals are measured on a monotonic clock so that wall-clock
  // adjustments can never yield negative or wildly wrong durations;
  // std::chrono::high_resolution_clock carries no such guarantee.
  using Clock = std::chrono::steady_clock;
  using milliseconds = std::chrono::milliseconds;

 public:
  // Creates a timer. When `run` is true the measurement starts immediately;
  // otherwise the start point stays at the clock's epoch until Reset().
  explicit Timer(bool run = false) {
    if (run) {
      Reset();
    }
  }

  // (Re)starts the measurement from the current instant.
  void Reset() { _start = Clock::now(); }

  // Returns the time elapsed since the start point, truncated to
  // milliseconds.
  milliseconds Elapsed() const {
    return std::chrono::duration_cast<milliseconds>(Clock::now() - _start);
  }

  // Streams the elapsed millisecond count (the number only, no unit suffix).
  template <typename T, typename Traits>
  friend std::basic_ostream<T, Traits>& operator<<(
      std::basic_ostream<T, Traits>& out, const Timer& timer) {
    return out << timer.Elapsed().count();
  }

 private:
  Clock::time_point _start;
};
- namespace ROCKSDB_NAMESPACE {
- class EnvLibradosTest : public testing::Test {
- public:
- // we will use all of these below
- const std::string db_name = "env_librados_test_db";
- const std::string db_pool = db_name + "_pool";
- const char *keyring = "admin";
- const char *config = "../ceph/src/ceph.conf";
- EnvLibrados* env_;
- const EnvOptions soptions_;
- EnvLibradosTest()
- : env_(new EnvLibrados(db_name, config, db_pool)) {
- }
- ~EnvLibradosTest() {
- delete env_;
- librados::Rados rados;
- int ret = 0;
- do {
- ret = rados.init("admin"); // just use the client.admin keyring
- if (ret < 0) { // let's handle any error that might have come back
- std::cerr << "couldn't initialize rados! error " << ret << std::endl;
- ret = EXIT_FAILURE;
- break;
- }
- ret = rados.conf_read_file(config);
- if (ret < 0) {
- // This could fail if the config file is malformed, but it'd be hard.
- std::cerr << "failed to parse config file " << config
- << "! error" << ret << std::endl;
- ret = EXIT_FAILURE;
- break;
- }
- /*
- * next, we actually connect to the cluster
- */
- ret = rados.connect();
- if (ret < 0) {
- std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
- ret = EXIT_FAILURE;
- break;
- }
- /*
- * And now we're done, so let's remove our pool and then
- * shut down the connection gracefully.
- */
- int delete_ret = rados.pool_delete(db_pool.c_str());
- if (delete_ret < 0) {
- // be careful not to
- std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
- ret = EXIT_FAILURE;
- }
- } while (0);
- }
- };
- TEST_F(EnvLibradosTest, Basics) {
- uint64_t file_size;
- std::unique_ptr<WritableFile> writable_file;
- std::vector<std::string> children;
- ASSERT_OK(env_->CreateDir("/dir"));
- // Check that the directory is empty.
- ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/non_existent"));
- ASSERT_TRUE(!env_->GetFileSize("/dir/non_existent", &file_size).ok());
- ASSERT_OK(env_->GetChildren("/dir", &children));
- ASSERT_EQ(0U, children.size());
- // Create a file.
- ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
- writable_file.reset();
- // Check that the file exists.
- ASSERT_OK(env_->FileExists("/dir/f"));
- ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
- ASSERT_EQ(0U, file_size);
- ASSERT_OK(env_->GetChildren("/dir", &children));
- ASSERT_EQ(1U, children.size());
- ASSERT_EQ("f", children[0]);
- // Write to the file.
- ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
- ASSERT_OK(writable_file->Append("abc"));
- writable_file.reset();
- // Check for expected size.
- ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
- ASSERT_EQ(3U, file_size);
- // Check that renaming works.
- ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok());
- ASSERT_OK(env_->RenameFile("/dir/f", "/dir/g"));
- ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/f"));
- ASSERT_OK(env_->FileExists("/dir/g"));
- ASSERT_OK(env_->GetFileSize("/dir/g", &file_size));
- ASSERT_EQ(3U, file_size);
- // Check that opening non-existent file fails.
- std::unique_ptr<SequentialFile> seq_file;
- std::unique_ptr<RandomAccessFile> rand_file;
- ASSERT_TRUE(
- !env_->NewSequentialFile("/dir/non_existent", &seq_file, soptions_).ok());
- ASSERT_TRUE(!seq_file);
- ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file,
- soptions_).ok());
- ASSERT_TRUE(!rand_file);
- // Check that deleting works.
- ASSERT_TRUE(!env_->DeleteFile("/dir/non_existent").ok());
- ASSERT_OK(env_->DeleteFile("/dir/g"));
- ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/g"));
- ASSERT_OK(env_->GetChildren("/dir", &children));
- ASSERT_EQ(0U, children.size());
- ASSERT_OK(env_->DeleteDir("/dir"));
- }
- TEST_F(EnvLibradosTest, ReadWrite) {
- std::unique_ptr<WritableFile> writable_file;
- std::unique_ptr<SequentialFile> seq_file;
- std::unique_ptr<RandomAccessFile> rand_file;
- Slice result;
- char scratch[100];
- ASSERT_OK(env_->CreateDir("/dir"));
- ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
- ASSERT_OK(writable_file->Append("hello "));
- ASSERT_OK(writable_file->Append("world"));
- writable_file.reset();
- // Read sequentially.
- ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_));
- ASSERT_OK(seq_file->Read(5, &result, scratch)); // Read "hello".
- ASSERT_EQ(0, result.compare("hello"));
- ASSERT_OK(seq_file->Skip(1));
- ASSERT_OK(seq_file->Read(1000, &result, scratch)); // Read "world".
- ASSERT_EQ(0, result.compare("world"));
- ASSERT_OK(seq_file->Read(1000, &result, scratch)); // Try reading past EOF.
- ASSERT_EQ(0U, result.size());
- ASSERT_OK(seq_file->Skip(100)); // Try to skip past end of file.
- ASSERT_OK(seq_file->Read(1000, &result, scratch));
- ASSERT_EQ(0U, result.size());
- // Random reads.
- ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file, soptions_));
- ASSERT_OK(rand_file->Read(6, 5, &result, scratch)); // Read "world".
- ASSERT_EQ(0, result.compare("world"));
- ASSERT_OK(rand_file->Read(0, 5, &result, scratch)); // Read "hello".
- ASSERT_EQ(0, result.compare("hello"));
- ASSERT_OK(rand_file->Read(10, 100, &result, scratch)); // Read "d".
- ASSERT_EQ(0, result.compare("d"));
- // Too high offset.
- ASSERT_OK(rand_file->Read(1000, 5, &result, scratch));
- }
- TEST_F(EnvLibradosTest, Locks) {
- FileLock* lock = nullptr;
- std::unique_ptr<WritableFile> writable_file;
- ASSERT_OK(env_->CreateDir("/dir"));
- ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_));
- // These are no-ops, but we test they return success.
- ASSERT_OK(env_->LockFile("some file", &lock));
- ASSERT_OK(env_->UnlockFile(lock));
- ASSERT_OK(env_->LockFile("/dir/f", &lock));
- ASSERT_OK(env_->UnlockFile(lock));
- }
- TEST_F(EnvLibradosTest, Misc) {
- std::string test_dir;
- ASSERT_OK(env_->GetTestDirectory(&test_dir));
- ASSERT_TRUE(!test_dir.empty());
- std::unique_ptr<WritableFile> writable_file;
- ASSERT_TRUE(!env_->NewWritableFile("/a/b", &writable_file, soptions_).ok());
- ASSERT_OK(env_->NewWritableFile("/a", &writable_file, soptions_));
- // These are no-ops, but we test they return success.
- ASSERT_OK(writable_file->Sync());
- ASSERT_OK(writable_file->Flush());
- ASSERT_OK(writable_file->Close());
- writable_file.reset();
- }
- TEST_F(EnvLibradosTest, LargeWrite) {
- const size_t kWriteSize = 300 * 1024;
- char* scratch = new char[kWriteSize * 2];
- std::string write_data;
- for (size_t i = 0; i < kWriteSize; ++i) {
- write_data.append(1, 'h');
- }
- std::unique_ptr<WritableFile> writable_file;
- ASSERT_OK(env_->CreateDir("/dir"));
- ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
- ASSERT_OK(writable_file->Append("foo"));
- ASSERT_OK(writable_file->Append(write_data));
- writable_file.reset();
- std::unique_ptr<SequentialFile> seq_file;
- Slice result;
- ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
- ASSERT_OK(seq_file->Read(3, &result, scratch)); // Read "foo".
- ASSERT_EQ(0, result.compare("foo"));
- size_t read = 0;
- std::string read_data;
- while (read < kWriteSize) {
- ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch));
- read_data.append(result.data(), result.size());
- read += result.size();
- }
- ASSERT_TRUE(write_data == read_data);
- delete[] scratch;
- }
- TEST_F(EnvLibradosTest, FrequentlySmallWrite) {
- const size_t kWriteSize = 1 << 10;
- char* scratch = new char[kWriteSize * 2];
- std::string write_data;
- for (size_t i = 0; i < kWriteSize; ++i) {
- write_data.append(1, 'h');
- }
- std::unique_ptr<WritableFile> writable_file;
- ASSERT_OK(env_->CreateDir("/dir"));
- ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
- ASSERT_OK(writable_file->Append("foo"));
- for (size_t i = 0; i < kWriteSize; ++i) {
- ASSERT_OK(writable_file->Append("h"));
- }
- writable_file.reset();
- std::unique_ptr<SequentialFile> seq_file;
- Slice result;
- ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_));
- ASSERT_OK(seq_file->Read(3, &result, scratch)); // Read "foo".
- ASSERT_EQ(0, result.compare("foo"));
- size_t read = 0;
- std::string read_data;
- while (read < kWriteSize) {
- ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch));
- read_data.append(result.data(), result.size());
- read += result.size();
- }
- ASSERT_TRUE(write_data == read_data);
- delete[] scratch;
- }
- TEST_F(EnvLibradosTest, Truncate) {
- const size_t kWriteSize = 300 * 1024;
- const size_t truncSize = 1024;
- std::string write_data;
- for (size_t i = 0; i < kWriteSize; ++i) {
- write_data.append(1, 'h');
- }
- std::unique_ptr<WritableFile> writable_file;
- ASSERT_OK(env_->CreateDir("/dir"));
- ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_));
- ASSERT_OK(writable_file->Append(write_data));
- ASSERT_EQ(writable_file->GetFileSize(), kWriteSize);
- ASSERT_OK(writable_file->Truncate(truncSize));
- ASSERT_EQ(writable_file->GetFileSize(), truncSize);
- writable_file.reset();
- }
- TEST_F(EnvLibradosTest, DBBasics) {
- std::string kDBPath = "/tmp/DBBasics";
- DB* db;
- Options options;
- // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
- options.IncreaseParallelism();
- options.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options.create_if_missing = true;
- options.env = env_;
- // open DB
- Status s = DB::Open(options, kDBPath, &db);
- assert(s.ok());
- // Put key-value
- s = db->Put(WriteOptions(), "key1", "value");
- assert(s.ok());
- std::string value;
- // get value
- s = db->Get(ReadOptions(), "key1", &value);
- assert(s.ok());
- assert(value == "value");
- // atomically apply a set of updates
- {
- WriteBatch batch;
- batch.Delete("key1");
- batch.Put("key2", value);
- s = db->Write(WriteOptions(), &batch);
- }
- s = db->Get(ReadOptions(), "key1", &value);
- assert(s.IsNotFound());
- db->Get(ReadOptions(), "key2", &value);
- assert(value == "value");
- delete db;
- }
- TEST_F(EnvLibradosTest, DBLoadKeysInRandomOrder) {
- char key[20] = {0}, value[20] = {0};
- int max_loop = 1 << 10;
- Timer timer(false);
- std::cout << "Test size : loop(" << max_loop << ")" << std::endl;
- /**********************************
- use default env
- ***********************************/
- std::string kDBPath1 = "/tmp/DBLoadKeysInRandomOrder1";
- DB* db1;
- Options options1;
- // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
- options1.IncreaseParallelism();
- options1.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options1.create_if_missing = true;
- // open DB
- Status s1 = DB::Open(options1, kDBPath1, &db1);
- assert(s1.ok());
- ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
- timer.Reset();
- for (int i = 0; i < max_loop; ++i) {
- snprintf(key,
- 20,
- "%16lx",
- (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
- // Put key-value
- s1 = db1->Put(WriteOptions(), key, value);
- assert(s1.ok());
- }
- std::cout << "Time by default : " << timer << "ms" << std::endl;
- delete db1;
- /**********************************
- use librados env
- ***********************************/
- std::string kDBPath2 = "/tmp/DBLoadKeysInRandomOrder2";
- DB* db2;
- Options options2;
- // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
- options2.IncreaseParallelism();
- options2.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options2.create_if_missing = true;
- options2.env = env_;
- // open DB
- Status s2 = DB::Open(options2, kDBPath2, &db2);
- assert(s2.ok());
- ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
- timer.Reset();
- for (int i = 0; i < max_loop; ++i) {
- snprintf(key,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- // Put key-value
- s2 = db2->Put(WriteOptions(), key, value);
- assert(s2.ok());
- }
- std::cout << "Time by librados : " << timer << "ms" << std::endl;
- delete db2;
- }
- TEST_F(EnvLibradosTest, DBBulkLoadKeysInRandomOrder) {
- char key[20] = {0}, value[20] = {0};
- int max_loop = 1 << 6;
- int bulk_size = 1 << 15;
- Timer timer(false);
- std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
- /**********************************
- use default env
- ***********************************/
- std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
- DB* db1;
- Options options1;
- // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
- options1.IncreaseParallelism();
- options1.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options1.create_if_missing = true;
- // open DB
- Status s1 = DB::Open(options1, kDBPath1, &db1);
- assert(s1.ok());
- ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
- timer.Reset();
- for (int i = 0; i < max_loop; ++i) {
- WriteBatch batch;
- for (int j = 0; j < bulk_size; ++j) {
- snprintf(key,
- 20,
- "%16lx",
- (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
- batch.Put(key, value);
- }
- s1 = db1->Write(WriteOptions(), &batch);
- assert(s1.ok());
- }
- std::cout << "Time by default : " << timer << "ms" << std::endl;
- delete db1;
- /**********************************
- use librados env
- ***********************************/
- std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
- DB* db2;
- Options options2;
- // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
- options2.IncreaseParallelism();
- options2.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options2.create_if_missing = true;
- options2.env = env_;
- // open DB
- Status s2 = DB::Open(options2, kDBPath2, &db2);
- assert(s2.ok());
- ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
- timer.Reset();
- for (int i = 0; i < max_loop; ++i) {
- WriteBatch batch;
- for (int j = 0; j < bulk_size; ++j) {
- snprintf(key,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- batch.Put(key, value);
- }
- s2 = db2->Write(WriteOptions(), &batch);
- assert(s2.ok());
- }
- std::cout << "Time by librados : " << timer << "ms" << std::endl;
- delete db2;
- }
- TEST_F(EnvLibradosTest, DBBulkLoadKeysInSequentialOrder) {
- char key[20] = {0}, value[20] = {0};
- int max_loop = 1 << 6;
- int bulk_size = 1 << 15;
- Timer timer(false);
- std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
- /**********************************
- use default env
- ***********************************/
- std::string kDBPath1 = "/tmp/DBBulkLoadKeysInSequentialOrder1";
- DB* db1;
- Options options1;
- // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
- options1.IncreaseParallelism();
- options1.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options1.create_if_missing = true;
- // open DB
- Status s1 = DB::Open(options1, kDBPath1, &db1);
- assert(s1.ok());
- ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
- timer.Reset();
- for (int i = 0; i < max_loop; ++i) {
- WriteBatch batch;
- for (int j = 0; j < bulk_size; ++j) {
- snprintf(key,
- 20,
- "%019lld",
- (long long)(i * bulk_size + j));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
- batch.Put(key, value);
- }
- s1 = db1->Write(WriteOptions(), &batch);
- assert(s1.ok());
- }
- std::cout << "Time by default : " << timer << "ms" << std::endl;
- delete db1;
- /**********************************
- use librados env
- ***********************************/
- std::string kDBPath2 = "/tmp/DBBulkLoadKeysInSequentialOrder2";
- DB* db2;
- Options options2;
- // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
- options2.IncreaseParallelism();
- options2.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options2.create_if_missing = true;
- options2.env = env_;
- // open DB
- Status s2 = DB::Open(options2, kDBPath2, &db2);
- assert(s2.ok());
- ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
- timer.Reset();
- for (int i = 0; i < max_loop; ++i) {
- WriteBatch batch;
- for (int j = 0; j < bulk_size; ++j) {
- snprintf(key,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- batch.Put(key, value);
- }
- s2 = db2->Write(WriteOptions(), &batch);
- assert(s2.ok());
- }
- std::cout << "Time by librados : " << timer << "ms" << std::endl;
- delete db2;
- }
- TEST_F(EnvLibradosTest, DBRandomRead) {
- char key[20] = {0}, value[20] = {0};
- int max_loop = 1 << 6;
- int bulk_size = 1 << 10;
- int read_loop = 1 << 20;
- Timer timer(false);
- std::cout << "Test size : keys_num(" << max_loop << ", " << bulk_size << "); read_loop(" << read_loop << ")" << std::endl;
- /**********************************
- use default env
- ***********************************/
- std::string kDBPath1 = "/tmp/DBRandomRead1";
- DB* db1;
- Options options1;
- // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
- options1.IncreaseParallelism();
- options1.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options1.create_if_missing = true;
- // open DB
- Status s1 = DB::Open(options1, kDBPath1, &db1);
- assert(s1.ok());
- ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
- for (int i = 0; i < max_loop; ++i) {
- WriteBatch batch;
- for (int j = 0; j < bulk_size; ++j) {
- snprintf(key,
- 20,
- "%019lld",
- (long long)(i * bulk_size + j));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
- batch.Put(key, value);
- }
- s1 = db1->Write(WriteOptions(), &batch);
- assert(s1.ok());
- }
- timer.Reset();
- int base1 = 0, offset1 = 0;
- for (int i = 0; i < read_loop; ++i) {
- base1 = r1.Uniform(max_loop);
- offset1 = r1.Uniform(bulk_size);
- std::string value1;
- snprintf(key,
- 20,
- "%019lld",
- (long long)(base1 * bulk_size + offset1));
- s1 = db1->Get(ReadOptions(), key, &value1);
- assert(s1.ok());
- }
- std::cout << "Time by default : " << timer << "ms" << std::endl;
- delete db1;
- /**********************************
- use librados env
- ***********************************/
- std::string kDBPath2 = "/tmp/DBRandomRead2";
- DB* db2;
- Options options2;
- // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
- options2.IncreaseParallelism();
- options2.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options2.create_if_missing = true;
- options2.env = env_;
- // open DB
- Status s2 = DB::Open(options2, kDBPath2, &db2);
- assert(s2.ok());
- ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
- for (int i = 0; i < max_loop; ++i) {
- WriteBatch batch;
- for (int j = 0; j < bulk_size; ++j) {
- snprintf(key,
- 20,
- "%019lld",
- (long long)(i * bulk_size + j));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- batch.Put(key, value);
- }
- s2 = db2->Write(WriteOptions(), &batch);
- assert(s2.ok());
- }
- timer.Reset();
- int base2 = 0, offset2 = 0;
- for (int i = 0; i < read_loop; ++i) {
- base2 = r2.Uniform(max_loop);
- offset2 = r2.Uniform(bulk_size);
- std::string value2;
- snprintf(key,
- 20,
- "%019lld",
- (long long)(base2 * bulk_size + offset2));
- s2 = db2->Get(ReadOptions(), key, &value2);
- if (!s2.ok()) {
- std::cout << s2.ToString() << std::endl;
- }
- assert(s2.ok());
- }
- std::cout << "Time by librados : " << timer << "ms" << std::endl;
- delete db2;
- }
- class EnvLibradosMutipoolTest : public testing::Test {
- public:
- // we will use all of these below
- const std::string client_name = "client.admin";
- const std::string cluster_name = "ceph";
- const uint64_t flags = 0;
- const std::string db_name = "env_librados_test_db";
- const std::string db_pool = db_name + "_pool";
- const std::string wal_dir = "/wal";
- const std::string wal_pool = db_name + "_wal_pool";
- const size_t write_buffer_size = 1 << 20;
- const char *keyring = "admin";
- const char *config = "../ceph/src/ceph.conf";
- EnvLibrados* env_;
- const EnvOptions soptions_;
- EnvLibradosMutipoolTest() {
- env_ = new EnvLibrados(client_name, cluster_name, flags, db_name, config, db_pool, wal_dir, wal_pool, write_buffer_size);
- }
- ~EnvLibradosMutipoolTest() {
- delete env_;
- librados::Rados rados;
- int ret = 0;
- do {
- ret = rados.init("admin"); // just use the client.admin keyring
- if (ret < 0) { // let's handle any error that might have come back
- std::cerr << "couldn't initialize rados! error " << ret << std::endl;
- ret = EXIT_FAILURE;
- break;
- }
- ret = rados.conf_read_file(config);
- if (ret < 0) {
- // This could fail if the config file is malformed, but it'd be hard.
- std::cerr << "failed to parse config file " << config
- << "! error" << ret << std::endl;
- ret = EXIT_FAILURE;
- break;
- }
- /*
- * next, we actually connect to the cluster
- */
- ret = rados.connect();
- if (ret < 0) {
- std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
- ret = EXIT_FAILURE;
- break;
- }
- /*
- * And now we're done, so let's remove our pool and then
- * shut down the connection gracefully.
- */
- int delete_ret = rados.pool_delete(db_pool.c_str());
- if (delete_ret < 0) {
- // be careful not to
- std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl;
- ret = EXIT_FAILURE;
- }
- delete_ret = rados.pool_delete(wal_pool.c_str());
- if (delete_ret < 0) {
- // be careful not to
- std::cerr << "We failed to delete our test pool!" << wal_pool << delete_ret << std::endl;
- ret = EXIT_FAILURE;
- }
- } while (0);
- }
- };
- TEST_F(EnvLibradosMutipoolTest, Basics) {
- uint64_t file_size;
- std::unique_ptr<WritableFile> writable_file;
- std::vector<std::string> children;
- std::vector<std::string> v = {"/tmp/dir1", "/tmp/dir2", "/tmp/dir3", "/tmp/dir4", "dir"};
- for (size_t i = 0; i < v.size(); ++i) {
- std::string dir = v[i];
- std::string dir_non_existent = dir + "/non_existent";
- std::string dir_f = dir + "/f";
- std::string dir_g = dir + "/g";
- ASSERT_OK(env_->CreateDir(dir.c_str()));
- // Check that the directory is empty.
- ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_non_existent.c_str()));
- ASSERT_TRUE(!env_->GetFileSize(dir_non_existent.c_str(), &file_size).ok());
- ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
- ASSERT_EQ(0U, children.size());
- // Create a file.
- ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_));
- writable_file.reset();
- // Check that the file exists.
- ASSERT_OK(env_->FileExists(dir_f.c_str()));
- ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size));
- ASSERT_EQ(0U, file_size);
- ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
- ASSERT_EQ(1U, children.size());
- ASSERT_EQ("f", children[0]);
- // Write to the file.
- ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_));
- ASSERT_OK(writable_file->Append("abc"));
- writable_file.reset();
- // Check for expected size.
- ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size));
- ASSERT_EQ(3U, file_size);
- // Check that renaming works.
- ASSERT_TRUE(!env_->RenameFile(dir_non_existent.c_str(), dir_g.c_str()).ok());
- ASSERT_OK(env_->RenameFile(dir_f.c_str(), dir_g.c_str()));
- ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_f.c_str()));
- ASSERT_OK(env_->FileExists(dir_g.c_str()));
- ASSERT_OK(env_->GetFileSize(dir_g.c_str(), &file_size));
- ASSERT_EQ(3U, file_size);
- // Check that opening non-existent file fails.
- std::unique_ptr<SequentialFile> seq_file;
- std::unique_ptr<RandomAccessFile> rand_file;
- ASSERT_TRUE(
- !env_->NewSequentialFile(dir_non_existent.c_str(), &seq_file, soptions_).ok());
- ASSERT_TRUE(!seq_file);
- ASSERT_TRUE(!env_->NewRandomAccessFile(dir_non_existent.c_str(), &rand_file,
- soptions_).ok());
- ASSERT_TRUE(!rand_file);
- // Check that deleting works.
- ASSERT_TRUE(!env_->DeleteFile(dir_non_existent.c_str()).ok());
- ASSERT_OK(env_->DeleteFile(dir_g.c_str()));
- ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_g.c_str()));
- ASSERT_OK(env_->GetChildren(dir.c_str(), &children));
- ASSERT_EQ(0U, children.size());
- ASSERT_OK(env_->DeleteDir(dir.c_str()));
- }
- }
- TEST_F(EnvLibradosMutipoolTest, DBBasics) {
- std::string kDBPath = "/tmp/DBBasics";
- std::string walPath = "/tmp/wal";
- DB* db;
- Options options;
- // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
- options.IncreaseParallelism();
- options.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options.create_if_missing = true;
- options.env = env_;
- options.wal_dir = walPath;
- // open DB
- Status s = DB::Open(options, kDBPath, &db);
- assert(s.ok());
- // Put key-value
- s = db->Put(WriteOptions(), "key1", "value");
- assert(s.ok());
- std::string value;
- // get value
- s = db->Get(ReadOptions(), "key1", &value);
- assert(s.ok());
- assert(value == "value");
- // atomically apply a set of updates
- {
- WriteBatch batch;
- batch.Delete("key1");
- batch.Put("key2", value);
- s = db->Write(WriteOptions(), &batch);
- }
- s = db->Get(ReadOptions(), "key1", &value);
- assert(s.IsNotFound());
- db->Get(ReadOptions(), "key2", &value);
- assert(value == "value");
- delete db;
- }
- TEST_F(EnvLibradosMutipoolTest, DBBulkLoadKeysInRandomOrder) {
- char key[20] = {0}, value[20] = {0};
- int max_loop = 1 << 6;
- int bulk_size = 1 << 15;
- Timer timer(false);
- std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl;
- /**********************************
- use default env
- ***********************************/
- std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1";
- std::string walPath = "/tmp/wal";
- DB* db1;
- Options options1;
- // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well
- options1.IncreaseParallelism();
- options1.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options1.create_if_missing = true;
- // open DB
- Status s1 = DB::Open(options1, kDBPath1, &db1);
- assert(s1.ok());
- ROCKSDB_NAMESPACE::Random64 r1(time(nullptr));
- timer.Reset();
- for (int i = 0; i < max_loop; ++i) {
- WriteBatch batch;
- for (int j = 0; j < bulk_size; ++j) {
- snprintf(key,
- 20,
- "%16lx",
- (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r1.Uniform(std::numeric_limits<uint64_t>::max()));
- batch.Put(key, value);
- }
- s1 = db1->Write(WriteOptions(), &batch);
- assert(s1.ok());
- }
- std::cout << "Time by default : " << timer << "ms" << std::endl;
- delete db1;
- /**********************************
- use librados env
- ***********************************/
- std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2";
- DB* db2;
- Options options2;
- // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
- options2.IncreaseParallelism();
- options2.OptimizeLevelStyleCompaction();
- // create the DB if it's not already present
- options2.create_if_missing = true;
- options2.env = env_;
- options2.wal_dir = walPath;
- // open DB
- Status s2 = DB::Open(options2, kDBPath2, &db2);
- if (!s2.ok()) {
- std::cerr << s2.ToString() << std::endl;
- }
- assert(s2.ok());
- ROCKSDB_NAMESPACE::Random64 r2(time(nullptr));
- timer.Reset();
- for (int i = 0; i < max_loop; ++i) {
- WriteBatch batch;
- for (int j = 0; j < bulk_size; ++j) {
- snprintf(key,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- snprintf(value,
- 20,
- "%16lx",
- (unsigned long)r2.Uniform(std::numeric_limits<uint64_t>::max()));
- batch.Put(key, value);
- }
- s2 = db2->Write(WriteOptions(), &batch);
- assert(s2.ok());
- }
- std::cout << "Time by librados : " << timer << "ms" << std::endl;
- delete db2;
- }
- TEST_F(EnvLibradosMutipoolTest, DBTransactionDB) {
- std::string kDBPath = "/tmp/DBTransactionDB";
- // open DB
- Options options;
- TransactionDBOptions txn_db_options;
- options.create_if_missing = true;
- options.env = env_;
- TransactionDB* txn_db;
- Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db);
- assert(s.ok());
- WriteOptions write_options;
- ReadOptions read_options;
- TransactionOptions txn_options;
- std::string value;
- ////////////////////////////////////////////////////////
- //
- // Simple OptimisticTransaction Example ("Read Committed")
- //
- ////////////////////////////////////////////////////////
- // Start a transaction
- Transaction* txn = txn_db->BeginTransaction(write_options);
- assert(txn);
- // Read a key in this transaction
- s = txn->Get(read_options, "abc", &value);
- assert(s.IsNotFound());
- // Write a key in this transaction
- s = txn->Put("abc", "def");
- assert(s.ok());
- // Read a key OUTSIDE this transaction. Does not affect txn.
- s = txn_db->Get(read_options, "abc", &value);
- // Write a key OUTSIDE of this transaction.
- // Does not affect txn since this is an unrelated key. If we wrote key 'abc'
- // here, the transaction would fail to commit.
- s = txn_db->Put(write_options, "xyz", "zzz");
- // Commit transaction
- s = txn->Commit();
- assert(s.ok());
- delete txn;
- ////////////////////////////////////////////////////////
- //
- // "Repeatable Read" (Snapshot Isolation) Example
- // -- Using a single Snapshot
- //
- ////////////////////////////////////////////////////////
- // Set a snapshot at start of transaction by setting set_snapshot=true
- txn_options.set_snapshot = true;
- txn = txn_db->BeginTransaction(write_options, txn_options);
- const Snapshot* snapshot = txn->GetSnapshot();
- // Write a key OUTSIDE of transaction
- s = txn_db->Put(write_options, "abc", "xyz");
- assert(s.ok());
- // Attempt to read a key using the snapshot. This will fail since
- // the previous write outside this txn conflicts with this read.
- read_options.snapshot = snapshot;
- s = txn->GetForUpdate(read_options, "abc", &value);
- assert(s.IsBusy());
- txn->Rollback();
- delete txn;
- // Clear snapshot from read options since it is no longer valid
- read_options.snapshot = nullptr;
- snapshot = nullptr;
- ////////////////////////////////////////////////////////
- //
- // "Read Committed" (Monotonic Atomic Views) Example
- // --Using multiple Snapshots
- //
- ////////////////////////////////////////////////////////
- // In this example, we set the snapshot multiple times. This is probably
- // only necessary if you have very strict isolation requirements to
- // implement.
- // Set a snapshot at start of transaction
- txn_options.set_snapshot = true;
- txn = txn_db->BeginTransaction(write_options, txn_options);
- // Do some reads and writes to key "x"
- read_options.snapshot = txn_db->GetSnapshot();
- s = txn->Get(read_options, "x", &value);
- txn->Put("x", "x");
- // Do a write outside of the transaction to key "y"
- s = txn_db->Put(write_options, "y", "y");
- // Set a new snapshot in the transaction
- txn->SetSnapshot();
- txn->SetSavePoint();
- read_options.snapshot = txn_db->GetSnapshot();
- // Do some reads and writes to key "y"
- // Since the snapshot was advanced, the write done outside of the
- // transaction does not conflict.
- s = txn->GetForUpdate(read_options, "y", &value);
- txn->Put("y", "y");
- // Decide we want to revert the last write from this transaction.
- txn->RollbackToSavePoint();
- // Commit.
- s = txn->Commit();
- assert(s.ok());
- delete txn;
- // Clear snapshot from read options since it is no longer valid
- read_options.snapshot = nullptr;
- // Cleanup
- delete txn_db;
- DestroyDB(kDBPath, options);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
- #else
- #include <stdio.h>
// Stub entry point used when ROCKSDB_LITE is defined and the EnvLibrados tests
// above are compiled out: reports the skip on stderr and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr, "SKIPPED as EnvLibrados is not supported in ROCKSDB_LITE\n");
  return 0;
}
- #endif // !ROCKSDB_LITE
|