| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.//// Logger implementation that can be shared by all environments// where enough posix functionality is available.#pragma once#include <algorithm>#include <stdio.h>#include "port/sys_time.h"#include <time.h>#include <fcntl.h>#ifdef OS_LINUX#ifndef FALLOC_FL_KEEP_SIZE#include <linux/falloc.h>#endif#endif#include <atomic>#include "env/io_posix.h"#include "monitoring/iostats_context_imp.h"#include "rocksdb/env.h"#include "test_util/sync_point.h"namespace ROCKSDB_NAMESPACE {class PosixLogger : public Logger { private:  Status PosixCloseHelper() {    int ret;    ret = fclose(file_);    if (ret) {      return IOError("Unable to close log file", "", ret);    }    return Status::OK();  }  FILE* file_;  uint64_t (*gettid_)();  // Return the thread id for the current thread  std::atomic_size_t log_size_;  int fd_;  const static uint64_t flush_every_seconds_ = 5;  std::atomic_uint_fast64_t last_flush_micros_;  Env* env_;  std::atomic<bool> flush_pending_; protected:  virtual Status CloseImpl() override { return PosixCloseHelper(); } public:  PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env,              const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)      : Logger(log_level),        file_(f),        gettid_(gettid),        log_size_(0),        fd_(fileno(f)),        last_flush_micros_(0),        env_(env),        flush_pending_(false) {}  virtual ~PosixLogger() {    if (!closed_) {      closed_ = true;      PosixCloseHelper();    }  }  virtual void Flush() override {    TEST_SYNC_POINT("PosixLogger::Flush:Begin1");    TEST_SYNC_POINT("PosixLogger::Flush:Begin2");    if (flush_pending_) {      flush_pending_ = false;      fflush(file_);    }    last_flush_micros_ = env_->NowMicros();  }  using Logger::Logv;  virtual void Logv(const char* format, va_list ap) override {    IOSTATS_TIMER_GUARD(logger_nanos);    const uint64_t thread_id = (*gettid_)();    // We try twice: the first time with a fixed-size stack allocated buffer,    // and the second time with a much larger dynamically allocated buffer.    char buffer[500];    for (int iter = 0; iter < 2; iter++) {      char* base;      int bufsize;      if (iter == 0) {        bufsize = sizeof(buffer);        base = buffer;      } else {        bufsize = 65536;        base = new char[bufsize];      }      char* p = base;      char* limit = base + bufsize;      struct timeval now_tv;      gettimeofday(&now_tv, nullptr);      const time_t seconds = now_tv.tv_sec;      struct tm t;      localtime_r(&seconds, &t);      p += snprintf(p, limit - p,                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",                    t.tm_year + 1900,                    t.tm_mon + 1,                    t.tm_mday,                    t.tm_hour,                    t.tm_min,                    t.tm_sec,                    static_cast<int>(now_tv.tv_usec),                    static_cast<long long unsigned int>(thread_id));      // Print the message      if (p < limit) {        va_list backup_ap;        va_copy(backup_ap, ap);        p += vsnprintf(p, limit - p, format, backup_ap);        va_end(backup_ap);      }      // Truncate to available space if necessary      if (p >= limit) {        if (iter == 0) {          continue;       // Try again with larger buffer        } else {          p = limit - 1;        }      }      // Add newline if necessary      if (p == base || p[-1] != '\n') {        *p++ = '\n';      }      assert(p <= limit);      const size_t write_size = p - base;#ifdef ROCKSDB_FALLOCATE_PRESENT      const int kDebugLogChunkSize = 128 * 1024;      // If this write would cross a boundary of kDebugLogChunkSize      // space, pre-allocate more space to avoid overly large      // allocations from filesystem allocsize options.      const size_t log_size = log_size_;      const size_t last_allocation_chunk =        ((kDebugLogChunkSize - 1 + log_size) / kDebugLogChunkSize);      const size_t desired_allocation_chunk =        ((kDebugLogChunkSize - 1 + log_size + write_size) /           kDebugLogChunkSize);      if (last_allocation_chunk != desired_allocation_chunk) {        fallocate(            fd_, FALLOC_FL_KEEP_SIZE, 0,            static_cast<off_t>(desired_allocation_chunk * kDebugLogChunkSize));      }#endif      size_t sz = fwrite(base, 1, write_size, file_);      flush_pending_ = true;      if (sz > 0) {        log_size_ += write_size;      }      uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 +        now_tv.tv_usec;      if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {        Flush();      }      if (base != buffer) {        delete[] base;      }      break;    }  }  size_t GetLogFileSize() const override { return log_size_; }};}  // namespace ROCKSDB_NAMESPACE
 |