wal_edit.cc 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/wal_edit.h"
  6. #include "rocksdb/slice.h"
  7. #include "rocksdb/status.h"
  8. #include "util/coding.h"
  9. namespace ROCKSDB_NAMESPACE {
  10. void WalAddition::EncodeTo(std::string* dst) const {
  11. PutVarint64(dst, number_);
  12. if (metadata_.HasSyncedSize()) {
  13. PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
  14. PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
  15. }
  16. PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
  17. }
  18. Status WalAddition::DecodeFrom(Slice* src) {
  19. constexpr char class_name[] = "WalAddition";
  20. if (!GetVarint64(src, &number_)) {
  21. return Status::Corruption(class_name, "Error decoding WAL log number");
  22. }
  23. while (true) {
  24. uint32_t tag_value = 0;
  25. if (!GetVarint32(src, &tag_value)) {
  26. return Status::Corruption(class_name, "Error decoding tag");
  27. }
  28. WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value);
  29. switch (tag) {
  30. case WalAdditionTag::kSyncedSize: {
  31. uint64_t size = 0;
  32. if (!GetVarint64(src, &size)) {
  33. return Status::Corruption(class_name, "Error decoding WAL file size");
  34. }
  35. metadata_.SetSyncedSizeInBytes(size);
  36. break;
  37. }
  38. // TODO: process future tags such as checksum.
  39. case WalAdditionTag::kTerminate:
  40. return Status::OK();
  41. default: {
  42. std::stringstream ss;
  43. ss << "Unknown tag " << tag_value;
  44. return Status::Corruption(class_name, ss.str());
  45. }
  46. }
  47. }
  48. }
  49. JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
  50. jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes"
  51. << wal.GetMetadata().GetSyncedSizeInBytes();
  52. return jw;
  53. }
  54. std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
  55. os << "log_number: " << wal.GetLogNumber()
  56. << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes();
  57. return os;
  58. }
  59. std::string WalAddition::DebugString() const {
  60. std::ostringstream oss;
  61. oss << *this;
  62. return oss.str();
  63. }
  64. void WalDeletion::EncodeTo(std::string* dst) const {
  65. PutVarint64(dst, number_);
  66. }
  67. Status WalDeletion::DecodeFrom(Slice* src) {
  68. constexpr char class_name[] = "WalDeletion";
  69. if (!GetVarint64(src, &number_)) {
  70. return Status::Corruption(class_name, "Error decoding WAL log number");
  71. }
  72. return Status::OK();
  73. }
  74. JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) {
  75. jw << "LogNumber" << wal.GetLogNumber();
  76. return jw;
  77. }
  78. std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) {
  79. os << "log_number: " << wal.GetLogNumber();
  80. return os;
  81. }
  82. std::string WalDeletion::DebugString() const {
  83. std::ostringstream oss;
  84. oss << *this;
  85. return oss.str();
  86. }
  87. Status WalSet::AddWal(const WalAddition& wal) {
  88. if (wal.GetLogNumber() < min_wal_number_to_keep_) {
  89. // The WAL has been obsolete, ignore it.
  90. return Status::OK();
  91. }
  92. auto it = wals_.lower_bound(wal.GetLogNumber());
  93. bool existing = it != wals_.end() && it->first == wal.GetLogNumber();
  94. if (!existing) {
  95. wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
  96. return Status::OK();
  97. }
  98. assert(existing);
  99. if (!wal.GetMetadata().HasSyncedSize()) {
  100. std::stringstream ss;
  101. ss << "WAL " << wal.GetLogNumber() << " is created more than once";
  102. return Status::Corruption("WalSet::AddWal", ss.str());
  103. }
  104. assert(wal.GetMetadata().HasSyncedSize());
  105. if (it->second.HasSyncedSize() && wal.GetMetadata().GetSyncedSizeInBytes() <=
  106. it->second.GetSyncedSizeInBytes()) {
  107. // This is possible because version edits with different synced WAL sizes
  108. // for the same WAL can be committed out-of-order. For example, thread
  109. // 1 synces the first 10 bytes of 1.log, while thread 2 synces the first 20
  110. // bytes of 1.log. It's possible that thread 1 calls LogAndApply() after
  111. // thread 2.
  112. // In this case, just return ok.
  113. return Status::OK();
  114. }
  115. // Update synced size for the given WAL.
  116. it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
  117. return Status::OK();
  118. }
  119. Status WalSet::AddWals(const WalAdditions& wals) {
  120. Status s;
  121. for (const WalAddition& wal : wals) {
  122. s = AddWal(wal);
  123. if (!s.ok()) {
  124. break;
  125. }
  126. }
  127. return s;
  128. }
  129. Status WalSet::DeleteWalsBefore(WalNumber wal) {
  130. if (wal > min_wal_number_to_keep_) {
  131. min_wal_number_to_keep_ = wal;
  132. wals_.erase(wals_.begin(), wals_.lower_bound(wal));
  133. }
  134. return Status::OK();
  135. }
  136. void WalSet::Reset() {
  137. wals_.clear();
  138. min_wal_number_to_keep_ = 0;
  139. }
  140. Status WalSet::CheckWals(
  141. Env* env,
  142. const std::unordered_map<WalNumber, std::string>& logs_on_disk) const {
  143. assert(env != nullptr);
  144. Status s;
  145. for (const auto& wal : wals_) {
  146. const uint64_t log_number = wal.first;
  147. const WalMetadata& wal_meta = wal.second;
  148. if (!wal_meta.HasSyncedSize()) {
  149. // The WAL and WAL directory is not even synced,
  150. // so the WAL's inode may not be persisted,
  151. // then the WAL might not show up when listing WAL directory.
  152. continue;
  153. }
  154. if (logs_on_disk.find(log_number) == logs_on_disk.end()) {
  155. std::stringstream ss;
  156. ss << "Missing WAL with log number: " << log_number << ".";
  157. s = Status::Corruption(ss.str());
  158. break;
  159. }
  160. uint64_t log_file_size = 0;
  161. s = env->GetFileSize(logs_on_disk.at(log_number), &log_file_size);
  162. if (!s.ok()) {
  163. break;
  164. }
  165. if (log_file_size < wal_meta.GetSyncedSizeInBytes()) {
  166. std::stringstream ss;
  167. ss << "Size mismatch: WAL (log number: " << log_number
  168. << ") in MANIFEST is " << wal_meta.GetSyncedSizeInBytes()
  169. << " bytes , but actually is " << log_file_size << " bytes on disk.";
  170. s = Status::Corruption(ss.str());
  171. break;
  172. }
  173. }
  174. return s;
  175. }
  176. } // namespace ROCKSDB_NAMESPACE