RocksDB + SQLite 混合架构实现示例
1. 头文件修改(soft_bus_core.h)
#ifndef SOFT_BUS_CORE_H
#define SOFT_BUS_CORE_H
#include <QObject>
#include <QMap>
#include <QString>
#include <QJsonObject>
#include <QList>
#include <QSqlDatabase>
#include <QSqlQuery>
#include <rocksdb/db.h>
// 设备信息结构(保持不变)
struct DeviceInfo {
int id;
QString portname;
QString name;
QString type;
int address;
QString protocol;
QString protocol_detail;
QJsonObject properties;
DeviceInfo() : id(0) {}
// ... 构造函数
};
// 软总线消息结构(保持不变)
struct BusMessage {
QString id;
QString source;
QString destination;
QJsonObject payload;
qint64 timestamp;
BusMessage() : timestamp(0) {}
// ... 构造函数
};
class SoftBusCore : public QObject {
Q_OBJECT
public:
explicit SoftBusCore(QObject *parent = nullptr);
~SoftBusCore();
// 初始化数据库(新增 SQLite 参数)
bool initDB(const QString &rawDbPath, const QString &busDbPath,
const QString &indexDbPath = "");
// 设备管理(保持不变)
void registerDevice(const DeviceInfo &device);
void unregisterDevice(int deviceId);
void updateDevice(const DeviceInfo &device);
DeviceInfo getDeviceInfo(int deviceId) const;
DeviceInfo getDeviceInfoByPortname(const QString &portname) const;
QList<DeviceInfo> getAllDevices() const;
// 消息路由(保持不变)
void routeMessage(const BusMessage &message);
// 数据存储(新增扩展字段参数)
void storeRawData(int deviceId, const QByteArray &data,
const QString &creator = "",
int dataLevel = 0,
const QString &protocolType = "");
void storeBusMessage(const BusMessage &message,
const QString &creator = "",
int dataLevel = 0);
// 数据查询(新增复杂查询接口)
QList<QByteArray> queryRawData(int deviceId, qint64 startTime, qint64 endTime);
// 新增:支持多字段查询
QList<QByteArray> queryRawDataAdvanced(int deviceId,
qint64 startTime = 0,
qint64 endTime = 0,
const QString &creator = "",
int minLevel = -1,
int maxLevel = -1,
const QString &protocolType = "");
QList<BusMessage> queryBusMessages(const QString &source,
const QString &destination,
qint64 startTime, qint64 endTime);
// 新增:支持多字段查询
QList<BusMessage> queryBusMessagesAdvanced(const QString &source = "",
const QString &destination = "",
qint64 startTime = 0,
qint64 endTime = 0,
const QString &creator = "",
int minLevel = -1,
const QString &protocolType = "");
// 数据库操作
void closeDB();
signals:
void deviceRegistered(const DeviceInfo &device);
void deviceUpdate(const DeviceInfo &device);
void deviceUnregistered(int deviceId);
void messageReceived(const BusMessage &message);
void rawDataStored(int deviceId, const QByteArray &data);
void busMessageStored(const BusMessage &message);
void databaseError(const QString &error);
private:
// RocksDB 数据库(保持原有)
rocksdb::DB* m_rawDb;
rocksdb::DB* m_busDb;
// SQLite 索引数据库(新增)
QSqlDatabase m_indexDb;
bool m_indexDbInitialized;
QMap<int, DeviceInfo> m_devices;
QMap<QString, int> m_portnameToId;
bool m_dbInitialized;
// KV键生成(保持原有)
std::string generateRawKey(int deviceId, qint64 timestamp);
std::string generateBusKey(const BusMessage &message);
// SQLite 初始化(新增)
bool initIndexDatabase(const QString &dbPath);
bool createIndexTables();
// 索引写入(新增)
bool storeRawDataIndex(int deviceId, const QString &rocksdbKey,
qint64 timestamp, int dataSize,
const QString &creator, int dataLevel,
const QString &protocolType);
bool storeBusMessageIndex(const BusMessage &message, const QString &rocksdbKey,
int payloadSize, const QString &creator, int dataLevel);
// 工具函数
bool openDatabase(rocksdb::DB** db, const std::string& path);
bool putData(rocksdb::DB* db, const std::string& key, const std::string& value);
std::string getData(rocksdb::DB* db, const std::string& key);
};
#endif // SOFT_BUS_CORE_H
2. 实现文件修改(soft_bus_core.cpp)
2.1 初始化数据库
bool SoftBusCore::initDB(const QString &rawDbPath, const QString &busDbPath,
const QString &indexDbPath) {
// 关闭现有数据库
closeDB();
// 1. 初始化 RocksDB(保持原有逻辑)
rocksdb::Options options;
options.create_if_missing = true;
options.write_buffer_size = 64 * 1024 * 1024; // 64MB 写缓冲区
options.max_write_buffer_number = 3;
options.compression = rocksdb::kSnappyCompression; // 压缩存储
rocksdb::Status status = rocksdb::DB::Open(options, rawDbPath.toStdString(), &m_rawDb);
if (!status.ok()) {
qCritical() << "Failed to open raw database:" << status.ToString().c_str();
return false;
}
status = rocksdb::DB::Open(options, busDbPath.toStdString(), &m_busDb);
if (!status.ok()) {
qCritical() << "Failed to open bus database:" << status.ToString().c_str();
delete m_rawDb;
m_rawDb = nullptr;
return false;
}
m_dbInitialized = true;
// 2. 初始化 SQLite 索引数据库(新增)
if (!indexDbPath.isEmpty()) {
if (!initIndexDatabase(indexDbPath)) {
qWarning() << "Failed to initialize index database, continuing without index";
m_indexDbInitialized = false;
} else {
m_indexDbInitialized = true;
}
} else {
m_indexDbInitialized = false;
}
return true;
}
bool SoftBusCore::initIndexDatabase(const QString &dbPath) {
// 关闭现有连接
if (m_indexDb.isOpen()) {
m_indexDb.close();
}
// 创建 SQLite 数据库连接
m_indexDb = QSqlDatabase::addDatabase("QSQLITE", "soft_bus_index");
m_indexDb.setDatabaseName(dbPath);
if (!m_indexDb.open()) {
qCritical() << "Failed to open index database:" << m_indexDb.lastError().text();
return false;
}
// 优化 SQLite 性能
QSqlQuery query(m_indexDb);
query.exec("PRAGMA journal_mode=WAL");
query.exec("PRAGMA synchronous=NORMAL");
query.exec("PRAGMA cache_size=10000");
query.exec("PRAGMA temp_store=MEMORY");
// 创建表结构
return createIndexTables();
}
bool SoftBusCore::createIndexTables() {
QSqlQuery query(m_indexDb);
// 创建原始数据索引表
query.exec(R"(
CREATE TABLE IF NOT EXISTS raw_data_index (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id INTEGER NOT NULL,
rocksdb_key TEXT NOT NULL,
data_size INTEGER,
create_time INTEGER NOT NULL,
update_time INTEGER,
creator TEXT,
data_level INTEGER DEFAULT 0,
protocol_type TEXT,
is_valid BOOLEAN DEFAULT 1,
metadata TEXT
)
)");
if (query.lastError().isValid()) {
qCritical() << "Failed to create raw_data_index table:" << query.lastError().text();
return false;
}
// 创建索引
query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_device_time ON raw_data_index(device_id, create_time)");
query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_creator ON raw_data_index(creator)");
query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_level ON raw_data_index(data_level)");
query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_protocol ON raw_data_index(protocol_type)");
query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_time ON raw_data_index(create_time)");
// 创建总线消息索引表
query.exec(R"(
CREATE TABLE IF NOT EXISTS bus_messages_index (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id TEXT UNIQUE NOT NULL,
source TEXT NOT NULL,
destination TEXT NOT NULL,
rocksdb_key TEXT NOT NULL,
payload_size INTEGER,
create_time INTEGER NOT NULL,
update_time INTEGER,
creator TEXT,
data_level INTEGER DEFAULT 0,
protocol_type TEXT,
is_valid BOOLEAN DEFAULT 1,
metadata TEXT
)
)");
if (query.lastError().isValid()) {
qCritical() << "Failed to create bus_messages_index table:" << query.lastError().text();
return false;
}
// 创建索引
query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_source_dest ON bus_messages_index(source, destination)");
query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_time ON bus_messages_index(create_time)");
query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_creator ON bus_messages_index(creator)");
query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_level ON bus_messages_index(data_level)");
query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_protocol ON bus_messages_index(protocol_type)");
return true;
}
2.2 存储原始数据(双写机制)
void SoftBusCore::storeRawData(int deviceId, const QByteArray &data,
const QString &creator,
int dataLevel,
const QString &protocolType) {
if (!m_dbInitialized || !m_rawDb) {
qWarning() << "Database not initialized for storing raw data";
return;
}
// 1. 生成 RocksDB Key
qint64 timestamp = QDateTime::currentMSecsSinceEpoch();
std::string rocksdbKey = generateRawKey(deviceId, timestamp);
std::string value(data.constData(), data.size());
// 2. 写入 RocksDB(实际数据)
rocksdb::Status status = m_rawDb->Put(rocksdb::WriteOptions(), rocksdbKey, value);
if (!status.ok()) {
qCritical() << "Failed to store raw data to RocksDB:" << status.ToString().c_str();
emit databaseError(QString::fromStdString(status.ToString()));
return;
}
// 3. 写入 SQLite 索引(如果已初始化)
if (m_indexDbInitialized && m_indexDb.isOpen()) {
bool indexSuccess = storeRawDataIndex(deviceId,
QString::fromStdString(rocksdbKey),
timestamp,
data.size(),
creator.isEmpty() ? "system" : creator,
dataLevel,
protocolType);
if (!indexSuccess) {
qWarning() << "Failed to store raw data index, but data is stored in RocksDB";
// 数据已写入 RocksDB,即使索引失败也不影响主流程
}
}
emit rawDataStored(deviceId, data);
}
bool SoftBusCore::storeRawDataIndex(int deviceId, const QString &rocksdbKey,
qint64 timestamp, int dataSize,
const QString &creator, int dataLevel,
const QString &protocolType) {
QSqlQuery query(m_indexDb);
query.prepare(R"(
INSERT INTO raw_data_index
(device_id, rocksdb_key, data_size, create_time, update_time, creator, data_level, protocol_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
)");
query.addBindValue(deviceId);
query.addBindValue(rocksdbKey);
query.addBindValue(dataSize);
query.addBindValue(timestamp);
query.addBindValue(timestamp);
query.addBindValue(creator);
query.addBindValue(dataLevel);
query.addBindValue(protocolType);
if (!query.exec()) {
qWarning() << "Failed to store raw data index:" << query.lastError().text();
return false;
}
return true;
}
2.3 存储总线消息(双写机制)
void SoftBusCore::storeBusMessage(const BusMessage &message,
const QString &creator,
int dataLevel) {
if (!m_dbInitialized || !m_busDb) {
qWarning() << "Database not initialized for storing bus message";
return;
}
// 1. 序列化消息
QJsonObject json;
json["id"] = message.id;
json["source"] = message.source;
json["destination"] = message.destination;
json["payload"] = message.payload;
json["timestamp"] = message.timestamp;
QJsonDocument doc(json);
QByteArray data = doc.toJson(QJsonDocument::Compact);
// 2. 生成 RocksDB Key
std::string rocksdbKey = generateBusKey(message);
std::string value(data.constData(), data.size());
// 3. 写入 RocksDB(实际数据)
rocksdb::Status status = m_busDb->Put(rocksdb::WriteOptions(), rocksdbKey, value);
if (!status.ok()) {
qCritical() << "Failed to store bus message to RocksDB:" << status.ToString().c_str();
emit databaseError(QString::fromStdString(status.ToString()));
return;
}
// 4. 写入 SQLite 索引(如果已初始化)
if (m_indexDbInitialized && m_indexDb.isOpen()) {
QString protocolType = message.payload["protocol"].toString();
bool indexSuccess = storeBusMessageIndex(message,
QString::fromStdString(rocksdbKey),
data.size(),
creator.isEmpty() ? "system" : creator,
dataLevel);
if (!indexSuccess) {
qWarning() << "Failed to store bus message index, but data is stored in RocksDB";
}
}
emit busMessageStored(message);
}
bool SoftBusCore::storeBusMessageIndex(const BusMessage &message,
const QString &rocksdbKey,
int payloadSize,
const QString &creator,
int dataLevel) {
QSqlQuery query(m_indexDb);
query.prepare(R"(
INSERT OR REPLACE INTO bus_messages_index
(message_id, source, destination, rocksdb_key, payload_size, create_time, update_time, creator, data_level, protocol_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
)");
qint64 now = QDateTime::currentMSecsSinceEpoch();
query.addBindValue(message.id);
query.addBindValue(message.source);
query.addBindValue(message.destination);
query.addBindValue(rocksdbKey);
query.addBindValue(payloadSize);
query.addBindValue(message.timestamp ? message.timestamp : now);
query.addBindValue(now);
query.addBindValue(creator);
query.addBindValue(dataLevel);
query.addBindValue(message.payload["protocol"].toString());
if (!query.exec()) {
qWarning() << "Failed to store bus message index:" << query.lastError().text();
return false;
}
return true;
}
2.4 复杂查询实现(使用 SQLite 索引)
QList<QByteArray> SoftBusCore::queryRawDataAdvanced(int deviceId,
qint64 startTime,
qint64 endTime,
const QString &creator,
int minLevel,
int maxLevel,
const QString &protocolType) {
QList<QByteArray> results;
// 如果没有索引数据库,降级到简单查询
if (!m_indexDbInitialized || !m_indexDb.isOpen()) {
if (startTime > 0 && endTime > 0) {
return queryRawData(deviceId, startTime, endTime);
}
return results;
}
// 1. 查询 SQLite 索引,获取匹配的 rocksdb_key 列表
QSqlQuery query(m_indexDb);
QString sql = "SELECT rocksdb_key FROM raw_data_index WHERE device_id = ? AND is_valid = 1";
QVariantList bindValues;
bindValues << deviceId;
if (startTime > 0) {
sql += " AND create_time >= ?";
bindValues << startTime;
}
if (endTime > 0) {
sql += " AND create_time <= ?";
bindValues << endTime;
}
if (!creator.isEmpty()) {
sql += " AND creator = ?";
bindValues << creator;
}
if (minLevel >= 0) {
sql += " AND data_level >= ?";
bindValues << minLevel;
}
if (maxLevel >= 0) {
sql += " AND data_level <= ?";
bindValues << maxLevel;
}
if (!protocolType.isEmpty()) {
sql += " AND protocol_type = ?";
bindValues << protocolType;
}
sql += " ORDER BY create_time ASC";
query.prepare(sql);
for (const QVariant &value : bindValues) {
query.addBindValue(value);
}
if (!query.exec()) {
qWarning() << "Index query failed:" << query.lastError().text();
return results;
}
// 2. 批量从 RocksDB 读取实际数据
QStringList rocksdbKeys;
while (query.next()) {
rocksdbKeys.append(query.value("rocksdb_key").toString());
}
// 3. 从 RocksDB 批量读取数据
for (const QString &key : rocksdbKeys) {
std::string value = getData(m_rawDb, key.toStdString());
if (!value.empty()) {
results.append(QByteArray(value.data(), value.size()));
}
}
return results;
}
QList<BusMessage> SoftBusCore::queryBusMessagesAdvanced(const QString &source,
const QString &destination,
qint64 startTime,
qint64 endTime,
const QString &creator,
int minLevel,
const QString &protocolType) {
QList<BusMessage> results;
// 如果没有索引数据库,降级到简单查询
if (!m_indexDbInitialized || !m_indexDb.isOpen()) {
if (!source.isEmpty() && !destination.isEmpty() && startTime > 0 && endTime > 0) {
return queryBusMessages(source, destination, startTime, endTime);
}
return results;
}
// 1. 查询 SQLite 索引
QSqlQuery query(m_indexDb);
QString sql = "SELECT rocksdb_key, message_id, source, destination, create_time FROM bus_messages_index WHERE is_valid = 1";
QVariantList bindValues;
if (!source.isEmpty()) {
sql += " AND source = ?";
bindValues << source;
}
if (!destination.isEmpty()) {
sql += " AND destination = ?";
bindValues << destination;
}
if (startTime > 0) {
sql += " AND create_time >= ?";
bindValues << startTime;
}
if (endTime > 0) {
sql += " AND create_time <= ?";
bindValues << endTime;
}
if (!creator.isEmpty()) {
sql += " AND creator = ?";
bindValues << creator;
}
if (minLevel >= 0) {
sql += " AND data_level >= ?";
bindValues << minLevel;
}
if (!protocolType.isEmpty()) {
sql += " AND protocol_type = ?";
bindValues << protocolType;
}
sql += " ORDER BY create_time DESC";
query.prepare(sql);
for (const QVariant &value : bindValues) {
query.addBindValue(value);
}
if (!query.exec()) {
qWarning() << "Index query failed:" << query.lastError().text();
return results;
}
// 2. 从 RocksDB 读取实际数据
while (query.next()) {
QString rocksdbKey = query.value("rocksdb_key").toString();
std::string value = getData(m_busDb, rocksdbKey.toStdString());
if (!value.empty()) {
QByteArray data(value.data(), value.size());
QJsonDocument doc = QJsonDocument::fromJson(data);
if (doc.isObject()) {
QJsonObject json = doc.object();
BusMessage message;
message.id = json["id"].toString();
message.source = json["source"].toString();
message.destination = json["destination"].toString();
message.payload = json["payload"].toObject();
message.timestamp = static_cast<qint64>(json["timestamp"].toDouble());
results.append(message);
}
}
}
return results;
}
2.5 关闭数据库
void SoftBusCore::closeDB() {
// 关闭 RocksDB
if (m_rawDb) {
delete m_rawDb;
m_rawDb = nullptr;
}
if (m_busDb) {
delete m_busDb;
m_busDb = nullptr;
}
// 关闭 SQLite
if (m_indexDb.isOpen()) {
m_indexDb.close();
}
m_dbInitialized = false;
m_indexDbInitialized = false;
}
3. 使用示例
// 初始化时指定索引数据库路径
QString rawDbPath = "data/raw_data.db";
QString busDbPath = "data/bus_messages.db";
QString indexDbPath = "data/index.db"; // SQLite 索引数据库
m_busCore->initDB(rawDbPath, busDbPath, indexDbPath);
// 存储数据时指定扩展字段
m_busCore->storeRawData(deviceId, data, "张三", 5, "modbus");
// 复杂查询
QList<QByteArray> results = m_busCore->queryRawDataAdvanced(
deviceId,
0, // startTime (0 表示不限制)
0, // endTime (0 表示不限制)
"张三", // creator
3, // minLevel
7, // maxLevel
"modbus" // protocolType
);
4. CMakeLists.txt 修改
# 保持 RocksDB 依赖
SET(ROCKSDB_DIR "${PROJECT_SOURCE_DIR}/rocksdb")
include_directories(${ROCKSDB_DIR}/include)
target_link_libraries(soft_bus ${ROCKSDB_DIR}/librocksdb.a)
# 添加 Qt Sql 模块(SQLite 支持)
find_package(Qt6 REQUIRED COMPONENTS Core Sql)
target_link_libraries(soft_bus
Qt6::Core
Qt6::Sql # 新增
# ... 其他库
)
优势总结
- ✅ 保持高性能写入:RocksDB 继续负责高频写入
- ✅ 支持复杂查询:SQLite 索引层支持多字段查询
- ✅ 向后兼容:如果 SQLite 初始化失败,自动降级到原有 RocksDB 查询
- ✅ 渐进式迁移:可以先启用索引,再逐步迁移查询逻辑
- ✅ 数据安全:即使索引写入失败,实际数据已安全存储在 RocksDB