wal_manager_test.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
#include "db/wal_manager.h"

#include <cstdint>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/log_writer.h"
#include "db/version_set.h"
#include "env/mock_env.h"
#include "file/writable_file_writer.h"
#include "rocksdb/cache.h"
#include "rocksdb/file_system.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
  22. namespace ROCKSDB_NAMESPACE {
  23. // TODO(icanadi) mock out VersionSet
  24. // TODO(icanadi) move other WalManager-specific tests from db_test here
  25. class WalManagerTest : public testing::Test {
  26. public:
  27. WalManagerTest()
  28. : dbname_(test::PerThreadDBPath("wal_manager_test")),
  29. db_options_(),
  30. table_cache_(NewLRUCache(50000, 16)),
  31. write_buffer_manager_(db_options_.db_write_buffer_size),
  32. current_log_number_(0) {
  33. env_.reset(MockEnv::Create(Env::Default()));
  34. EXPECT_OK(DestroyDB(dbname_, Options()));
  35. }
  36. void Init() {
  37. ASSERT_OK(env_->CreateDirIfMissing(dbname_));
  38. ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));
  39. db_options_.db_paths.emplace_back(dbname_,
  40. std::numeric_limits<uint64_t>::max());
  41. db_options_.wal_dir = dbname_;
  42. db_options_.env = env_.get();
  43. db_options_.fs = env_->GetFileSystem();
  44. db_options_.clock = env_->GetSystemClock().get();
  45. versions_.reset(new VersionSet(
  46. dbname_, &db_options_, env_options_, table_cache_.get(),
  47. &write_buffer_manager_, &write_controller_,
  48. /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
  49. /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
  50. /*error_handler=*/nullptr, /*read_only=*/false));
  51. wal_manager_.reset(
  52. new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
  53. }
  54. void Reopen() {
  55. wal_manager_.reset(
  56. new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
  57. }
  58. // NOT thread safe
  59. void Put(const std::string& key, const std::string& value) {
  60. assert(current_log_writer_.get() != nullptr);
  61. uint64_t seq = versions_->LastSequence() + 1;
  62. WriteBatch batch;
  63. ASSERT_OK(batch.Put(key, value));
  64. WriteBatchInternal::SetSequence(&batch, seq);
  65. ASSERT_OK(current_log_writer_->AddRecord(
  66. WriteOptions(), WriteBatchInternal::Contents(&batch)));
  67. versions_->SetLastAllocatedSequence(seq);
  68. versions_->SetLastPublishedSequence(seq);
  69. versions_->SetLastSequence(seq);
  70. }
  71. // NOT thread safe
  72. void RollTheLog(bool /*archived*/) {
  73. current_log_number_++;
  74. std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
  75. const auto& fs = env_->GetFileSystem();
  76. std::unique_ptr<WritableFileWriter> file_writer;
  77. ASSERT_OK(WritableFileWriter::Create(fs, fname, env_options_, &file_writer,
  78. nullptr));
  79. current_log_writer_.reset(
  80. new log::Writer(std::move(file_writer), 0, false));
  81. }
  82. void CreateArchiveLogs(int num_logs, int entries_per_log) {
  83. for (int i = 1; i <= num_logs; ++i) {
  84. RollTheLog(true);
  85. for (int k = 0; k < entries_per_log; ++k) {
  86. Put(std::to_string(k), std::string(1024, 'a'));
  87. }
  88. }
  89. }
  90. std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
  91. const SequenceNumber seq) {
  92. std::unique_ptr<TransactionLogIterator> iter;
  93. Status status = wal_manager_->GetUpdatesSince(
  94. seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get());
  95. EXPECT_OK(status);
  96. return iter;
  97. }
  98. std::unique_ptr<MockEnv> env_;
  99. std::string dbname_;
  100. ImmutableDBOptions db_options_;
  101. WriteController write_controller_;
  102. EnvOptions env_options_;
  103. std::shared_ptr<Cache> table_cache_;
  104. WriteBufferManager write_buffer_manager_;
  105. std::unique_ptr<VersionSet> versions_;
  106. std::unique_ptr<WalManager> wal_manager_;
  107. std::unique_ptr<log::Writer> current_log_writer_;
  108. uint64_t current_log_number_;
  109. };
  110. TEST_F(WalManagerTest, ReadFirstRecordCache) {
  111. Init();
  112. std::string path = dbname_ + "/000001.log";
  113. std::unique_ptr<FSWritableFile> file;
  114. ASSERT_OK(env_->GetFileSystem()->NewWritableFile(path, FileOptions(), &file,
  115. nullptr));
  116. SequenceNumber s;
  117. ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
  118. ASSERT_EQ(s, 0U);
  119. ASSERT_OK(
  120. wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
  121. ASSERT_EQ(s, 0U);
  122. std::unique_ptr<WritableFileWriter> file_writer(
  123. new WritableFileWriter(std::move(file), path, FileOptions()));
  124. log::Writer writer(std::move(file_writer), 1,
  125. db_options_.recycle_log_file_num > 0);
  126. WriteBatch batch;
  127. ASSERT_OK(batch.Put("foo", "bar"));
  128. WriteBatchInternal::SetSequence(&batch, 10);
  129. ASSERT_OK(
  130. writer.AddRecord(WriteOptions(), WriteBatchInternal::Contents(&batch)));
  131. // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here.
  132. // Waiting for lei to finish with db_test
  133. // env_->count_sequential_reads_ = true;
  134. // sequential_read_counter_ sanity test
  135. // ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);
  136. ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
  137. ASSERT_EQ(s, 10U);
  138. // did a read
  139. // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
  140. // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
  141. ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
  142. ASSERT_EQ(s, 10U);
  143. // no new reads since the value is cached
  144. // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
  145. // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
  146. }
  147. namespace {
  148. uint64_t GetLogDirSize(std::string dir_path, Env* env) {
  149. uint64_t dir_size = 0;
  150. std::vector<std::string> files;
  151. EXPECT_OK(env->GetChildren(dir_path, &files));
  152. for (auto& f : files) {
  153. uint64_t number;
  154. FileType type;
  155. if (ParseFileName(f, &number, &type) && type == kWalFile) {
  156. std::string const file_path = dir_path + "/" + f;
  157. uint64_t file_size;
  158. EXPECT_OK(env->GetFileSize(file_path, &file_size));
  159. dir_size += file_size;
  160. }
  161. }
  162. return dir_size;
  163. }
  164. std::vector<std::uint64_t> ListSpecificFiles(
  165. Env* env, const std::string& path, const FileType expected_file_type) {
  166. std::vector<std::string> files;
  167. std::vector<uint64_t> file_numbers;
  168. uint64_t number;
  169. FileType type;
  170. EXPECT_OK(env->GetChildren(path, &files));
  171. for (size_t i = 0; i < files.size(); ++i) {
  172. if (ParseFileName(files[i], &number, &type)) {
  173. if (type == expected_file_type) {
  174. file_numbers.push_back(number);
  175. }
  176. }
  177. }
  178. return file_numbers;
  179. }
  180. int CountRecords(TransactionLogIterator* iter) {
  181. int count = 0;
  182. SequenceNumber lastSequence = 0;
  183. BatchResult res;
  184. while (iter->Valid()) {
  185. res = iter->GetBatch();
  186. EXPECT_TRUE(res.sequence > lastSequence);
  187. ++count;
  188. lastSequence = res.sequence;
  189. EXPECT_OK(iter->status());
  190. iter->Next();
  191. }
  192. EXPECT_OK(iter->status());
  193. return count;
  194. }
  195. } // anonymous namespace
  196. TEST_F(WalManagerTest, WALArchivalSizeLimit) {
  197. db_options_.WAL_ttl_seconds = 0;
  198. db_options_.WAL_size_limit_MB = 1000;
  199. Init();
  200. // TEST : Create WalManager with huge size limit and no ttl.
  201. // Create some archived files and call PurgeObsoleteWALFiles().
  202. // Count the archived log files that survived.
  203. // Assert that all of them did.
  204. // Change size limit. Re-open WalManager.
  205. // Assert that archive is not greater than WAL_size_limit_MB after
  206. // PurgeObsoleteWALFiles()
  207. // Set ttl and time_to_check_ to small values. Re-open db.
  208. // Assert that there are no archived logs left.
  209. std::string archive_dir = ArchivalDirectory(dbname_);
  210. CreateArchiveLogs(20, 5000);
  211. std::vector<std::uint64_t> log_files =
  212. ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  213. ASSERT_EQ(log_files.size(), 20U);
  214. db_options_.WAL_size_limit_MB = 8;
  215. Reopen();
  216. wal_manager_->PurgeObsoleteWALFiles();
  217. uint64_t archive_size = GetLogDirSize(archive_dir, env_.get());
  218. ASSERT_TRUE(archive_size <= db_options_.WAL_size_limit_MB * 1024 * 1024);
  219. db_options_.WAL_ttl_seconds = 1;
  220. env_->SleepForMicroseconds(2 * 1000 * 1000);
  221. Reopen();
  222. wal_manager_->PurgeObsoleteWALFiles();
  223. log_files = ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  224. ASSERT_TRUE(log_files.empty());
  225. }
  226. TEST_F(WalManagerTest, WALArchivalTtl) {
  227. db_options_.WAL_ttl_seconds = 1000;
  228. Init();
  229. // TEST : Create WalManager with a ttl and no size limit.
  230. // Create some archived log files and call PurgeObsoleteWALFiles().
  231. // Assert that files are not deleted
  232. // Reopen db with small ttl.
  233. // Assert that all archived logs was removed.
  234. std::string archive_dir = ArchivalDirectory(dbname_);
  235. CreateArchiveLogs(20, 5000);
  236. std::vector<uint64_t> log_files =
  237. ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  238. ASSERT_GT(log_files.size(), 0U);
  239. db_options_.WAL_ttl_seconds = 1;
  240. env_->SleepForMicroseconds(3 * 1000 * 1000);
  241. Reopen();
  242. wal_manager_->PurgeObsoleteWALFiles();
  243. log_files = ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  244. ASSERT_TRUE(log_files.empty());
  245. }
  246. TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
  247. Init();
  248. RollTheLog(false);
  249. Put("key1", std::string(1024, 'a'));
  250. // Create a zero record WAL file.
  251. RollTheLog(false);
  252. RollTheLog(false);
  253. Put("key2", std::string(1024, 'a'));
  254. auto iter = OpenTransactionLogIter(0);
  255. ASSERT_EQ(2, CountRecords(iter.get()));
  256. }
  257. TEST_F(WalManagerTest, TransactionLogIteratorJustEmptyFile) {
  258. Init();
  259. RollTheLog(false);
  260. auto iter = OpenTransactionLogIter(0);
  261. // Check that an empty iterator is returned
  262. ASSERT_TRUE(!iter->Valid());
  263. }
  264. TEST_F(WalManagerTest, TransactionLogIteratorNewFileWhileScanning) {
  265. Init();
  266. CreateArchiveLogs(2, 100);
  267. auto iter = OpenTransactionLogIter(0);
  268. CreateArchiveLogs(1, 100);
  269. int i = 0;
  270. for (; iter->Valid(); iter->Next()) {
  271. i++;
  272. }
  273. ASSERT_EQ(i, 200);
  274. // A new log file was added after the iterator was created.
  275. // TryAgain indicates a new iterator is needed to fetch the new data
  276. ASSERT_TRUE(iter->status().IsTryAgain());
  277. iter = OpenTransactionLogIter(0);
  278. i = 0;
  279. for (; iter->Valid(); iter->Next()) {
  280. i++;
  281. }
  282. ASSERT_EQ(i, 300);
  283. ASSERT_TRUE(iter->status().ok());
  284. }
  285. } // namespace ROCKSDB_NAMESPACE
  286. int main(int argc, char** argv) {
  287. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  288. ::testing::InitGoogleTest(&argc, argv);
  289. return RUN_ALL_TESTS();
  290. }