# RocksDB + SQLite 混合架构实现示例

## 1. 头文件修改(soft_bus_core.h)

```cpp
#ifndef SOFT_BUS_CORE_H
#define SOFT_BUS_CORE_H

// NOTE(review): the original include targets were lost from this document;
// the list below is reconstructed from the types this header uses.
// Confirm against the real soft_bus_core.h.
#include <QByteArray>
#include <QJsonObject>
#include <QList>
#include <QMap>
#include <QObject>
#include <QSqlDatabase>
#include <QString>
#include <rocksdb/db.h>

// Device description (unchanged)
struct DeviceInfo {
    int id;
    QString portname;
    QString name;
    QString type;
    int address;
    QString protocol;
    QString protocol_detail;
    QJsonObject properties;

    // address is zeroed too, so a default-constructed DeviceInfo has no
    // indeterminate field.
    DeviceInfo() : id(0), address(0) {}
    // ... other constructors
};

// Soft-bus message (unchanged)
struct BusMessage {
    QString id;
    QString source;
    QString destination;
    QJsonObject payload;
    qint64 timestamp;

    BusMessage() : timestamp(0) {}
    // ... other constructors
};

class SoftBusCore : public QObject
{
    Q_OBJECT

public:
    explicit SoftBusCore(QObject *parent = nullptr);
    ~SoftBusCore();

    // Open the databases (new: optional SQLite index path).
    // An empty indexDbPath disables the SQLite index.
    bool initDB(const QString &rawDbPath,
                const QString &busDbPath,
                const QString &indexDbPath = "");

    // Device management (unchanged)
    void registerDevice(const DeviceInfo &device);
    void unregisterDevice(int deviceId);
    void updateDevice(const DeviceInfo &device);
    DeviceInfo getDeviceInfo(int deviceId) const;
    DeviceInfo getDeviceInfoByPortname(const QString &portname) const;
    QList<DeviceInfo> getAllDevices() const;

    // Message routing (unchanged)
    void routeMessage(const BusMessage &message);

    // Data storage (new: extended metadata parameters)
    void storeRawData(int deviceId,
                      const QByteArray &data,
                      const QString &creator = "",
                      int dataLevel = 0,
                      const QString &protocolType = "");
    void storeBusMessage(const BusMessage &message,
                         const QString &creator = "",
                         int dataLevel = 0);

    // Data queries (new: multi-field query interfaces)
    QList<QByteArray> queryRawData(int deviceId, qint64 startTime, qint64 endTime);

    // New: multi-field query. startTime/endTime <= 0, an empty creator or
    // protocolType, and negative levels mean "no filter on that field".
    QList<QByteArray> queryRawDataAdvanced(int deviceId,
                                           qint64 startTime = 0,
                                           qint64 endTime = 0,
                                           const QString &creator = "",
                                           int minLevel = -1,
                                           int maxLevel = -1,
                                           const QString &protocolType = "");

    QList<BusMessage> queryBusMessages(const QString &source,
                                       const QString &destination,
                                       qint64 startTime,
                                       qint64 endTime);

    // New: multi-field query. Empty strings, times <= 0 and a negative
    // minLevel mean "no filter on that field".
    QList<BusMessage> queryBusMessagesAdvanced(const QString &source = "",
                                               const QString &destination = "",
                                               qint64 startTime = 0,
                                               qint64 endTime = 0,
                                               const QString &creator = "",
                                               int minLevel = -1,
                                               const QString &protocolType = "");

    // Database teardown
    void closeDB();

signals:
    void deviceRegistered(const DeviceInfo &device);
    void deviceUpdate(const DeviceInfo &device);
    void deviceUnregistered(int deviceId);
    void messageReceived(const BusMessage &message);
    void rawDataStored(int deviceId, const QByteArray &data);
    void busMessageStored(const BusMessage &message);
    void databaseError(const QString &error);

private:
    // RocksDB databases (as before). Default-initialized to nullptr because
    // initDB() calls closeDB() first, and closeDB() tests these pointers.
    rocksdb::DB* m_rawDb = nullptr;
    rocksdb::DB* m_busDb = nullptr;

    // SQLite index database (new)
    QSqlDatabase m_indexDb;
    bool m_indexDbInitialized = false;

    QMap<int, DeviceInfo> m_devices;
    QMap<QString, int> m_portnameToId;
    bool m_dbInitialized = false;

    // KV key generation (as before)
    std::string generateRawKey(int deviceId, qint64 timestamp);
    std::string generateBusKey(const BusMessage &message);

    // SQLite initialization (new)
    bool initIndexDatabase(const QString &dbPath);
    bool createIndexTables();

    // Index writes (new)
    bool storeRawDataIndex(int deviceId,
                           const QString &rocksdbKey,
                           qint64 timestamp,
                           int dataSize,
                           const QString &creator,
                           int dataLevel,
                           const QString &protocolType);
    bool storeBusMessageIndex(const BusMessage &message,
                              const QString &rocksdbKey,
                              int payloadSize,
                              const QString &creator,
                              int dataLevel);

    // Helpers
    bool openDatabase(rocksdb::DB** db, const std::string& path);
    bool putData(rocksdb::DB* db, const std::string& key, const std::string& value);
    std::string getData(rocksdb::DB* db, const std::string& key);
};

#endif // SOFT_BUS_CORE_H
```

## 2. 实现文件修改(soft_bus_core.cpp)

### 2.1 初始化数据库

```cpp
// Opens both RocksDB stores and, when indexDbPath is non-empty, the SQLite
// index. Returns false if either RocksDB store fails to open. A failure to
// open the index is logged and is not fatal: the function still returns true
// with m_indexDbInitialized == false.
bool SoftBusCore::initDB(const QString &rawDbPath,
                         const QString &busDbPath,
                         const QString &indexDbPath)
{
    // Close anything that is already open
    closeDB();

    // 1. Open RocksDB (original logic kept)
    rocksdb::Options options;
    options.create_if_missing = true;
    options.write_buffer_size = 64 * 1024 * 1024;       // 64MB write buffer
    options.max_write_buffer_number = 3;
    options.compression = rocksdb::kSnappyCompression;  // compressed storage

    rocksdb::Status status = rocksdb::DB::Open(options, rawDbPath.toStdString(), &m_rawDb);
    if (!status.ok()) {
        qCritical() << "Failed to open raw database:" << status.ToString().c_str();
        return false;
    }

    status = rocksdb::DB::Open(options, busDbPath.toStdString(), &m_busDb);
    if (!status.ok()) {
        qCritical() << "Failed to open bus database:" << status.ToString().c_str();
        // Do not leave a half-initialized state behind
        delete m_rawDb;
        m_rawDb = nullptr;
        return false;
    }

    m_dbInitialized = true;

    // 2. Open the SQLite index database (new)
    if (!indexDbPath.isEmpty()) {
        if (!initIndexDatabase(indexDbPath)) {
            qWarning() << "Failed to initialize index database, continuing without index";
            m_indexDbInitialized = false;
        } else {
            m_indexDbInitialized = true;
        }
    } else {
        m_indexDbInitialized = false;
    }

    return true;
}

// Opens (or re-opens) the SQLite index at dbPath, applies the performance
// PRAGMAs and creates the index tables. Returns false if the database cannot
// be opened or a table cannot be created.
bool SoftBusCore::initIndexDatabase(const QString &dbPath)
{
    const QString connectionName = QStringLiteral("soft_bus_index");

    // Close the current connection, if any
    if (m_indexDb.isOpen()) {
        m_indexDb.close();
    }

    // Reuse the named connection when an earlier initDB() call already
    // registered it. Calling addDatabase() again with the same name makes Qt
    // remove the old connection and log a duplicate-connection warning.
    if (QSqlDatabase::contains(connectionName)) {
        m_indexDb = QSqlDatabase::database(connectionName, false);
    } else {
        m_indexDb = QSqlDatabase::addDatabase("QSQLITE", connectionName);
    }
    m_indexDb.setDatabaseName(dbPath);

    if (!m_indexDb.open()) {
        qCritical() << "Failed to open index database:" << m_indexDb.lastError().text();
        return false;
    }

    // Tune SQLite for performance. These are best-effort: a failure is
    // logged but does not abort initialization.
    QSqlQuery query(m_indexDb);
    const QStringList pragmas = {
        QStringLiteral("PRAGMA journal_mode=WAL"),
        QStringLiteral("PRAGMA synchronous=NORMAL"),
        QStringLiteral("PRAGMA cache_size=10000"),
        QStringLiteral("PRAGMA temp_store=MEMORY"),
    };
    for (const QString &pragma : pragmas) {
        if (!query.exec(pragma)) {
            qWarning() << "Failed to apply" << pragma << ":" << query.lastError().text();
        }
    }

    // Create the schema
    return createIndexTables();
}

// Creates the two index tables and their secondary indexes if they do not
// exist yet. Returns false when a CREATE TABLE fails; a failed CREATE INDEX
// is only logged, so initialization still succeeds as it did before.
bool SoftBusCore::createIndexTables()
{
    QSqlQuery query(m_indexDb);

    // Runs each CREATE INDEX statement and logs the ones that fail.
    const auto createIndexes = [&query](const QStringList &statements) {
        for (const QString &sql : statements) {
            if (!query.exec(sql)) {
                qWarning() << "Failed to create index:" << query.lastError().text();
            }
        }
    };

    // Raw-data index table
    query.exec(R"(
        CREATE TABLE IF NOT EXISTS raw_data_index (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            device_id INTEGER NOT NULL,
            rocksdb_key TEXT NOT NULL,
            data_size INTEGER,
            create_time INTEGER NOT NULL,
            update_time INTEGER,
            creator TEXT,
            data_level INTEGER DEFAULT 0,
            protocol_type TEXT,
            is_valid BOOLEAN DEFAULT 1,
            metadata TEXT
        )
    )");

    if (query.lastError().isValid()) {
        qCritical() << "Failed to create raw_data_index table:" << query.lastError().text();
        return false;
    }

    // Secondary indexes on the raw-data table
    createIndexes({
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_raw_data_device_time ON raw_data_index(device_id, create_time)"),
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_raw_data_creator ON raw_data_index(creator)"),
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_raw_data_level ON raw_data_index(data_level)"),
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_raw_data_protocol ON raw_data_index(protocol_type)"),
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_raw_data_time ON raw_data_index(create_time)"),
    });

    // Bus-message index table
    query.exec(R"(
        CREATE TABLE IF NOT EXISTS bus_messages_index (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_id TEXT UNIQUE NOT NULL,
            source TEXT NOT NULL,
            destination TEXT NOT NULL,
            rocksdb_key TEXT NOT NULL,
            payload_size INTEGER,
            create_time INTEGER NOT NULL,
            update_time INTEGER,
            creator TEXT,
            data_level INTEGER DEFAULT 0,
            protocol_type TEXT,
            is_valid BOOLEAN DEFAULT 1,
            metadata TEXT
        )
    )");

    if (query.lastError().isValid()) {
        qCritical() << "Failed to create bus_messages_index table:" << query.lastError().text();
        return false;
    }

    // Secondary indexes on the bus-message table
    createIndexes({
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_bus_msg_source_dest ON bus_messages_index(source, destination)"),
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_bus_msg_time ON bus_messages_index(create_time)"),
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_bus_msg_creator ON bus_messages_index(creator)"),
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_bus_msg_level ON bus_messages_index(data_level)"),
        QStringLiteral("CREATE INDEX IF NOT EXISTS idx_bus_msg_protocol ON bus_messages_index(protocol_type)"),
    });

    return true;
}
```

### 2.2 存储原始数据(双写机制)

```cpp
// Writes the payload to RocksDB and then, if the index is available, a
// metadata row to SQLite. Emits databaseError() and returns early when the
// RocksDB write fails; emits rawDataStored() once the RocksDB write succeeded,
// whether or not the index write did.
void SoftBusCore::storeRawData(int deviceId,
                               const QByteArray &data,
                               const QString &creator,
                               int dataLevel,
                               const QString &protocolType)
{
    if (!m_dbInitialized || !m_rawDb) {
        qWarning() << "Database not initialized for storing raw data";
        return;
    }

    // 1. Build the RocksDB key
    qint64 timestamp = QDateTime::currentMSecsSinceEpoch();
    std::string rocksdbKey = generateRawKey(deviceId, timestamp);
    std::string value(data.constData(), data.size());

    // 2. Write the actual payload to RocksDB
    rocksdb::Status status = m_rawDb->Put(rocksdb::WriteOptions(), rocksdbKey, value);
    if (!status.ok()) {
        qCritical() << "Failed to store raw data to RocksDB:" << status.ToString().c_str();
        emit databaseError(QString::fromStdString(status.ToString()));
        return;
    }

    // 3. Write the SQLite index row (only when the index is initialized)
    if (m_indexDbInitialized && m_indexDb.isOpen()) {
        bool indexSuccess = storeRawDataIndex(deviceId,
                                              QString::fromStdString(rocksdbKey),
                                              timestamp,
                                              static_cast<int>(data.size()),
                                              creator.isEmpty() ? "system" : creator,
                                              dataLevel,
                                              protocolType);
        if (!indexSuccess) {
            qWarning() << "Failed to store raw data index, but data is stored in RocksDB";
            // The payload is already in RocksDB, so an index failure does
            // not interrupt the main flow.
        }
    }

    emit rawDataStored(deviceId, data);
}

// Inserts one row into raw_data_index. create_time and update_time are both
// set to timestamp. Returns false (after logging) if the INSERT fails.
bool SoftBusCore::storeRawDataIndex(int deviceId,
                                    const QString &rocksdbKey,
                                    qint64 timestamp,
                                    int dataSize,
                                    const QString &creator,
                                    int dataLevel,
                                    const QString &protocolType)
{
    QSqlQuery query(m_indexDb);
    query.prepare(R"(
        INSERT INTO raw_data_index
        (device_id, rocksdb_key, data_size, create_time, update_time,
         creator, data_level, protocol_type)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    )");

    // Bind order must match the column list above
    query.addBindValue(deviceId);
    query.addBindValue(rocksdbKey);
    query.addBindValue(dataSize);
    query.addBindValue(timestamp);
    query.addBindValue(timestamp);
    query.addBindValue(creator);
    query.addBindValue(dataLevel);
    query.addBindValue(protocolType);

    if (!query.exec()) {
        qWarning() << "Failed to store raw data index:" << query.lastError().text();
        return false;
    }

    return true;
}
```

### 2.3 存储总线消息(双写机制)

```cpp
// Serializes the message to compact JSON, writes it to RocksDB and then, if
// the index is available, a metadata row to SQLite. Emits databaseError() and
// returns early when the RocksDB write fails; emits busMessageStored() once
// the RocksDB write succeeded, whether or not the index write did.
void SoftBusCore::storeBusMessage(const BusMessage &message,
                                  const QString &creator,
                                  int dataLevel)
{
    if (!m_dbInitialized || !m_busDb) {
        qWarning() << "Database not initialized for storing bus message";
        return;
    }

    // 1. Serialize the message
    QJsonObject json;
    json["id"] = message.id;
    json["source"] = message.source;
    json["destination"] = message.destination;
    json["payload"] = message.payload;
    json["timestamp"] = message.timestamp;

    QJsonDocument doc(json);
    QByteArray data = doc.toJson(QJsonDocument::Compact);

    // 2. Build the RocksDB key
    std::string rocksdbKey = generateBusKey(message);
    std::string value(data.constData(), data.size());

    // 3. Write the actual payload to RocksDB
    rocksdb::Status status = m_busDb->Put(rocksdb::WriteOptions(), rocksdbKey, value);
    if (!status.ok()) {
        qCritical() << "Failed to store bus message to RocksDB:" << status.ToString().c_str();
        emit databaseError(QString::fromStdString(status.ToString()));
        return;
    }

    // 4. Write the SQLite index row (only when the index is initialized).
    //    The protocol type is read from the payload by storeBusMessageIndex().
    if (m_indexDbInitialized && m_indexDb.isOpen()) {
        bool indexSuccess = storeBusMessageIndex(message,
                                                 QString::fromStdString(rocksdbKey),
                                                 static_cast<int>(data.size()),
                                                 creator.isEmpty() ? "system" : creator,
                                                 dataLevel);
        if (!indexSuccess) {
            qWarning() << "Failed to store bus message index, but data is stored in RocksDB";
        }
    }

    emit busMessageStored(message);
}

// Inserts or replaces the row for message.id in bus_messages_index.
// create_time is message.timestamp, or the current time when the timestamp
// is 0; update_time is always the current time; protocol_type comes from
// payload["protocol"]. Returns false (after logging) if the statement fails.
bool SoftBusCore::storeBusMessageIndex(const BusMessage &message,
                                       const QString &rocksdbKey,
                                       int payloadSize,
                                       const QString &creator,
                                       int dataLevel)
{
    QSqlQuery query(m_indexDb);
    query.prepare(R"(
        INSERT OR REPLACE INTO bus_messages_index
        (message_id, source, destination, rocksdb_key, payload_size,
         create_time, update_time, creator, data_level, protocol_type)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    )");

    qint64 now = QDateTime::currentMSecsSinceEpoch();

    // Bind order must match the column list above
    query.addBindValue(message.id);
    query.addBindValue(message.source);
    query.addBindValue(message.destination);
    query.addBindValue(rocksdbKey);
    query.addBindValue(payloadSize);
    query.addBindValue(message.timestamp ? message.timestamp : now);
    query.addBindValue(now);
    query.addBindValue(creator);
    query.addBindValue(dataLevel);
    query.addBindValue(message.payload["protocol"].toString());

    if (!query.exec()) {
        qWarning() << "Failed to store bus message index:" << query.lastError().text();
        return false;
    }

    return true;
}
```

### 2.4 复杂查询实现(使用 SQLite 索引)

```cpp
// Returns the raw payloads of deviceId that match every supplied filter,
// ordered by create_time ascending. Filters left at their "unset" value
// (times <= 0, empty strings, negative levels) are not applied.
// Without an index database this falls back to queryRawData(), but only when
// both startTime and endTime are > 0; otherwise the result is empty.
QList<QByteArray> SoftBusCore::queryRawDataAdvanced(int deviceId,
                                                    qint64 startTime,
                                                    qint64 endTime,
                                                    const QString &creator,
                                                    int minLevel,
                                                    int maxLevel,
                                                    const QString &protocolType)
{
    QList<QByteArray> results;

    // No index database: degrade to the simple query
    if (!m_indexDbInitialized || !m_indexDb.isOpen()) {
        if (startTime > 0 && endTime > 0) {
            return queryRawData(deviceId, startTime, endTime);
        }
        return results;
    }

    // 1. Query the SQLite index for the matching rocksdb_key values.
    //    The WHERE clause and the bind list are built in lockstep.
    QSqlQuery query(m_indexDb);
    QString sql = "SELECT rocksdb_key FROM raw_data_index WHERE device_id = ? AND is_valid = 1";
    QVariantList bindValues;
    bindValues << deviceId;

    if (startTime > 0) {
        sql += " AND create_time >= ?";
        bindValues << startTime;
    }
    if (endTime > 0) {
        sql += " AND create_time <= ?";
        bindValues << endTime;
    }
    if (!creator.isEmpty()) {
        sql += " AND creator = ?";
        bindValues << creator;
    }
    if (minLevel >= 0) {
        sql += " AND data_level >= ?";
        bindValues << minLevel;
    }
    if (maxLevel >= 0) {
        sql += " AND data_level <= ?";
        bindValues << maxLevel;
    }
    if (!protocolType.isEmpty()) {
        sql += " AND protocol_type = ?";
        bindValues << protocolType;
    }

    sql += " ORDER BY create_time ASC";

    query.prepare(sql);
    for (const QVariant &value : bindValues) {
        query.addBindValue(value);
    }

    if (!query.exec()) {
        qWarning() << "Index query failed:" << query.lastError().text();
        return results;
    }

    // 2. Collect the keys first, then read the payloads
    QStringList rocksdbKeys;
    while (query.next()) {
        rocksdbKeys.append(query.value("rocksdb_key").toString());
    }

    // 3. Read the payloads from RocksDB. Empty values are skipped.
    //    NOTE(review): presumably getData() returns an empty string for a
    //    missing key — confirm against its implementation.
    for (const QString &key : rocksdbKeys) {
        std::string value = getData(m_rawDb, key.toStdString());
        if (!value.empty()) {
            results.append(QByteArray(value.data(), value.size()));
        }
    }

    return results;
}

// Returns the bus messages that match every supplied filter, ordered by
// create_time descending. Filters left at their "unset" value (empty strings,
// times <= 0, negative minLevel) are not applied.
// Without an index database this falls back to queryBusMessages(), but only
// when source, destination, startTime and endTime are all supplied; otherwise
// the result is empty.
QList<BusMessage> SoftBusCore::queryBusMessagesAdvanced(const QString &source,
                                                        const QString &destination,
                                                        qint64 startTime,
                                                        qint64 endTime,
                                                        const QString &creator,
                                                        int minLevel,
                                                        const QString &protocolType)
{
    QList<BusMessage> results;

    // No index database: degrade to the simple query
    if (!m_indexDbInitialized || !m_indexDb.isOpen()) {
        if (!source.isEmpty() && !destination.isEmpty() && startTime > 0 && endTime > 0) {
            return queryBusMessages(source, destination, startTime, endTime);
        }
        return results;
    }

    // 1. Query the SQLite index.
    //    The WHERE clause and the bind list are built in lockstep.
    QSqlQuery query(m_indexDb);
    QString sql = "SELECT rocksdb_key, message_id, source, destination, create_time FROM bus_messages_index WHERE is_valid = 1";
    QVariantList bindValues;

    if (!source.isEmpty()) {
        sql += " AND source = ?";
        bindValues << source;
    }
    if (!destination.isEmpty()) {
        sql += " AND destination = ?";
        bindValues << destination;
    }
    if (startTime > 0) {
        sql += " AND create_time >= ?";
        bindValues << startTime;
    }
    if (endTime > 0) {
        sql += " AND create_time <= ?";
        bindValues << endTime;
    }
    if (!creator.isEmpty()) {
        sql += " AND creator = ?";
        bindValues << creator;
    }
    if (minLevel >= 0) {
        sql += " AND data_level >= ?";
        bindValues << minLevel;
    }
    if (!protocolType.isEmpty()) {
        sql += " AND protocol_type = ?";
        bindValues << protocolType;
    }

    sql += " ORDER BY create_time DESC";

    query.prepare(sql);
    for (const QVariant &value : bindValues) {
        query.addBindValue(value);
    }

    if (!query.exec()) {
        qWarning() << "Index query failed:" << query.lastError().text();
        return results;
    }

    // 2. Read the payloads from RocksDB and rebuild the messages.
    //    Rows whose value is empty or is not a JSON object are skipped.
    while (query.next()) {
        QString rocksdbKey = query.value("rocksdb_key").toString();
        std::string value = getData(m_busDb, rocksdbKey.toStdString());

        if (!value.empty()) {
            QByteArray data(value.data(), value.size());
            QJsonDocument doc = QJsonDocument::fromJson(data);
            if (doc.isObject()) {
                QJsonObject json = doc.object();
                BusMessage message;
                message.id = json["id"].toString();
                message.source = json["source"].toString();
                message.destination = json["destination"].toString();
                message.payload = json["payload"].toObject();
                message.timestamp = static_cast<qint64>(json["timestamp"].toDouble());
                results.append(message);
            }
        }
    }

    return results;
}
```

### 2.5 关闭数据库

```cpp
// Releases both RocksDB handles, closes the SQLite index connection and
// resets both "initialized" flags. Safe to call when nothing is open.
void SoftBusCore::closeDB()
{
    // Close RocksDB
    if (m_rawDb) {
        delete m_rawDb;
        m_rawDb = nullptr;
    }

    if (m_busDb) {
        delete m_busDb;
        m_busDb = nullptr;
    }

    // Close SQLite
    if (m_indexDb.isOpen()) {
        m_indexDb.close();
    }

    m_dbInitialized = false;
    m_indexDbInitialized = false;
}
```

## 3. 使用示例

```cpp
// Pass the index database path at initialization time
QString rawDbPath = "data/raw_data.db";
QString busDbPath = "data/bus_messages.db";
QString indexDbPath = "data/index.db";  // SQLite index database

m_busCore->initDB(rawDbPath, busDbPath, indexDbPath);

// Supply the extended fields when storing data
m_busCore->storeRawData(deviceId, data, "张三", 5, "modbus");

// Multi-field query
QList<QByteArray> results = m_busCore->queryRawDataAdvanced(
    deviceId,
    0,         // startTime (0 = no limit)
    0,         // endTime (0 = no limit)
    "张三",    // creator
    3,         // minLevel
    7,         // maxLevel
    "modbus"   // protocolType
);
```

## 4. CMakeLists.txt 修改

```cmake
# Keep the RocksDB dependency
SET(ROCKSDB_DIR "${PROJECT_SOURCE_DIR}/rocksdb")
include_directories(${ROCKSDB_DIR}/include)
target_link_libraries(soft_bus ${ROCKSDB_DIR}/librocksdb.a)

# Add the Qt Sql module (SQLite support)
find_package(Qt6 REQUIRED COMPONENTS Core Sql)
target_link_libraries(soft_bus
    Qt6::Core
    Qt6::Sql  # newly added
    # ... other libraries
)
```

## 优势总结

1. ✅ **保持高性能写入**:RocksDB 继续负责高频写入
2. ✅ **支持复杂查询**:SQLite 索引层支持多字段查询
3. ✅ **向后兼容**:如果 SQLite 初始化失败,自动降级到原有 RocksDB 查询。注意:高级查询接口仅在调用方提供了原有查询所需的全部参数(时间范围;总线消息还需 source 和 destination)时才会降级,否则返回空结果
4. ✅ **渐进式迁移**:可以先启用索引,再逐步迁移查询逻辑
5. ✅ **数据安全**:即使索引写入失败,实际数据已安全存储在 RocksDB