trace_record_handler.cc 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "trace_replay/trace_record_handler.h"
  6. #include "rocksdb/iterator.h"
  7. #include "rocksdb/trace_record_result.h"
  8. #include "rocksdb/write_batch.h"
  9. namespace ROCKSDB_NAMESPACE {
  10. // TraceExecutionHandler
  11. TraceExecutionHandler::TraceExecutionHandler(
  12. DB* db, const std::vector<ColumnFamilyHandle*>& handles)
  13. : TraceRecord::Handler(),
  14. db_(db),
  15. write_opts_(WriteOptions()),
  16. read_opts_(ReadOptions()) {
  17. assert(db != nullptr);
  18. assert(!handles.empty());
  19. cf_map_.reserve(handles.size());
  20. for (ColumnFamilyHandle* handle : handles) {
  21. assert(handle != nullptr);
  22. cf_map_.insert({handle->GetID(), handle});
  23. }
  24. clock_ = db_->GetEnv()->GetSystemClock().get();
  25. }
  26. TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); }
  27. Status TraceExecutionHandler::Handle(
  28. const WriteQueryTraceRecord& record,
  29. std::unique_ptr<TraceRecordResult>* result) {
  30. if (result != nullptr) {
  31. result->reset(nullptr);
  32. }
  33. uint64_t start = clock_->NowMicros();
  34. WriteBatch batch(record.GetWriteBatchRep().ToString());
  35. Status s = db_->Write(write_opts_, &batch);
  36. uint64_t end = clock_->NowMicros();
  37. if (s.ok() && result != nullptr) {
  38. result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
  39. record.GetTraceType()));
  40. }
  41. return s;
  42. }
  43. Status TraceExecutionHandler::Handle(
  44. const GetQueryTraceRecord& record,
  45. std::unique_ptr<TraceRecordResult>* result) {
  46. if (result != nullptr) {
  47. result->reset(nullptr);
  48. }
  49. auto it = cf_map_.find(record.GetColumnFamilyID());
  50. if (it == cf_map_.end()) {
  51. return Status::Corruption("Invalid Column Family ID.");
  52. }
  53. uint64_t start = clock_->NowMicros();
  54. std::string value;
  55. Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
  56. uint64_t end = clock_->NowMicros();
  57. // Treat not found as ok, return other errors.
  58. if (!s.ok() && !s.IsNotFound()) {
  59. return s;
  60. }
  61. if (result != nullptr) {
  62. // Report the actual opetation status in TraceExecutionResult
  63. result->reset(new SingleValueTraceExecutionResult(
  64. std::move(s), std::move(value), start, end, record.GetTraceType()));
  65. }
  66. return Status::OK();
  67. }
  68. Status TraceExecutionHandler::Handle(
  69. const IteratorSeekQueryTraceRecord& record,
  70. std::unique_ptr<TraceRecordResult>* result) {
  71. if (result != nullptr) {
  72. result->reset(nullptr);
  73. }
  74. auto it = cf_map_.find(record.GetColumnFamilyID());
  75. if (it == cf_map_.end()) {
  76. return Status::Corruption("Invalid Column Family ID.");
  77. }
  78. ReadOptions r_opts = read_opts_;
  79. Slice lower = record.GetLowerBound();
  80. if (!lower.empty()) {
  81. r_opts.iterate_lower_bound = &lower;
  82. }
  83. Slice upper = record.GetUpperBound();
  84. if (!upper.empty()) {
  85. r_opts.iterate_upper_bound = &upper;
  86. }
  87. Iterator* single_iter = db_->NewIterator(r_opts, it->second);
  88. uint64_t start = clock_->NowMicros();
  89. switch (record.GetSeekType()) {
  90. case IteratorSeekQueryTraceRecord::kSeekForPrev: {
  91. single_iter->SeekForPrev(record.GetKey());
  92. break;
  93. }
  94. default: {
  95. single_iter->Seek(record.GetKey());
  96. break;
  97. }
  98. }
  99. uint64_t end = clock_->NowMicros();
  100. Status s = single_iter->status();
  101. if (s.ok() && result != nullptr) {
  102. if (single_iter->Valid()) {
  103. PinnableSlice ps_key;
  104. ps_key.PinSelf(single_iter->key());
  105. PinnableSlice ps_value;
  106. ps_value.PinSelf(single_iter->value());
  107. result->reset(new IteratorTraceExecutionResult(
  108. true, s, std::move(ps_key), std::move(ps_value), start, end,
  109. record.GetTraceType()));
  110. } else {
  111. result->reset(new IteratorTraceExecutionResult(
  112. false, s, "", "", start, end, record.GetTraceType()));
  113. }
  114. }
  115. delete single_iter;
  116. return s;
  117. }
  118. Status TraceExecutionHandler::Handle(
  119. const MultiGetQueryTraceRecord& record,
  120. std::unique_ptr<TraceRecordResult>* result) {
  121. if (result != nullptr) {
  122. result->reset(nullptr);
  123. }
  124. std::vector<ColumnFamilyHandle*> handles;
  125. handles.reserve(record.GetColumnFamilyIDs().size());
  126. for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
  127. auto it = cf_map_.find(cf_id);
  128. if (it == cf_map_.end()) {
  129. return Status::Corruption("Invalid Column Family ID.");
  130. }
  131. handles.push_back(it->second);
  132. }
  133. std::vector<Slice> keys = record.GetKeys();
  134. if (handles.empty() || keys.empty()) {
  135. return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
  136. }
  137. if (handles.size() != keys.size()) {
  138. return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
  139. }
  140. uint64_t start = clock_->NowMicros();
  141. std::vector<std::string> values;
  142. std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
  143. uint64_t end = clock_->NowMicros();
  144. // Treat not found as ok, return other errors.
  145. for (const Status& s : ss) {
  146. if (!s.ok() && !s.IsNotFound()) {
  147. return s;
  148. }
  149. }
  150. if (result != nullptr) {
  151. // Report the actual opetation status in TraceExecutionResult
  152. result->reset(new MultiValuesTraceExecutionResult(
  153. std::move(ss), std::move(values), start, end, record.GetTraceType()));
  154. }
  155. return Status::OK();
  156. }
  157. } // namespace ROCKSDB_NAMESPACE