optimistic_transaction_db_impl.h 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <algorithm>
  7. #include <cstdint>
  8. #include <memory>
  9. #include <vector>
  10. #include "rocksdb/db.h"
  11. #include "rocksdb/options.h"
  12. #include "rocksdb/utilities/optimistic_transaction_db.h"
  13. #include "util/cast_util.h"
  14. #include "util/mutexlock.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. class OccLockBucketsImplBase : public OccLockBuckets {
  17. public:
  18. virtual port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) = 0;
  19. };
  20. template <bool cache_aligned>
  21. class OccLockBucketsImpl : public OccLockBucketsImplBase {
  22. public:
  23. explicit OccLockBucketsImpl(size_t bucket_count) : locks_(bucket_count) {}
  24. port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) override {
  25. return locks_.Get(key, seed);
  26. }
  27. size_t ApproximateMemoryUsage() const override {
  28. return locks_.ApproximateMemoryUsage();
  29. }
  30. private:
  31. // TODO: investigate optionally using folly::MicroLock to majorly save space
  32. using M = std::conditional_t<cache_aligned, CacheAlignedWrapper<port::Mutex>,
  33. port::Mutex>;
  34. Striped<M> locks_;
  35. };
  36. class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
  37. public:
  38. explicit OptimisticTransactionDBImpl(
  39. DB* db, const OptimisticTransactionDBOptions& occ_options,
  40. bool take_ownership = true)
  41. : OptimisticTransactionDB(db),
  42. db_owner_(take_ownership),
  43. validate_policy_(occ_options.validate_policy) {
  44. if (validate_policy_ == OccValidationPolicy::kValidateParallel) {
  45. auto bucketed_locks = occ_options.shared_lock_buckets;
  46. if (!bucketed_locks) {
  47. uint32_t bucket_count = std::max(16u, occ_options.occ_lock_buckets);
  48. bucketed_locks = MakeSharedOccLockBuckets(bucket_count);
  49. }
  50. bucketed_locks_ = static_cast_with_check<OccLockBucketsImplBase>(
  51. std::move(bucketed_locks));
  52. }
  53. }
  54. ~OptimisticTransactionDBImpl() {
  55. // Prevent this stackable from destroying
  56. // base db
  57. if (!db_owner_) {
  58. db_ = nullptr;
  59. }
  60. }
  61. Transaction* BeginTransaction(const WriteOptions& write_options,
  62. const OptimisticTransactionOptions& txn_options,
  63. Transaction* old_txn) override;
  64. // Transactional `DeleteRange()` is not yet supported.
  65. using StackableDB::DeleteRange;
  66. Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*, const Slice&,
  67. const Slice&) override {
  68. return Status::NotSupported();
  69. }
  70. // Range deletions also must not be snuck into `WriteBatch`es as they are
  71. // incompatible with `OptimisticTransactionDB`.
  72. Status Write(const WriteOptions& write_opts, WriteBatch* batch) override {
  73. if (batch->HasDeleteRange()) {
  74. return Status::NotSupported();
  75. }
  76. return OptimisticTransactionDB::Write(write_opts, batch);
  77. }
  78. OccValidationPolicy GetValidatePolicy() const { return validate_policy_; }
  79. port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) {
  80. return bucketed_locks_->GetLockBucket(key, seed);
  81. }
  82. private:
  83. std::shared_ptr<OccLockBucketsImplBase> bucketed_locks_;
  84. bool db_owner_;
  85. const OccValidationPolicy validate_policy_;
  86. void ReinitializeTransaction(Transaction* txn,
  87. const WriteOptions& write_options,
  88. const OptimisticTransactionOptions& txn_options =
  89. OptimisticTransactionOptions());
  90. };
  91. } // namespace ROCKSDB_NAMESPACE