混合架构实现示例.md 23 KB

RocksDB + SQLite 混合架构实现示例

1. 头文件修改(soft_bus_core.h)

#ifndef SOFT_BUS_CORE_H
#define SOFT_BUS_CORE_H

#include <QObject>
#include <QMap>
#include <QString>
#include <QJsonObject>
#include <QList>
#include <QSqlDatabase>
#include <QSqlQuery>
#include <rocksdb/db.h>

// 设备信息结构(保持不变)
struct DeviceInfo {
    int id;
    QString portname;
    QString name;
    QString type;
    int address;
    QString protocol;
    QString protocol_detail;
    QJsonObject properties;

    DeviceInfo() : id(0) {}
    // ... 构造函数
};

// 软总线消息结构(保持不变)
struct BusMessage {
    QString id;
    QString source;
    QString destination;
    QJsonObject payload;
    qint64 timestamp;

    BusMessage() : timestamp(0) {}
    // ... 构造函数
};

class SoftBusCore : public QObject {
    Q_OBJECT
public:
    explicit SoftBusCore(QObject *parent = nullptr);
    ~SoftBusCore();

    // 初始化数据库(新增 SQLite 参数)
    bool initDB(const QString &rawDbPath, const QString &busDbPath, 
                const QString &indexDbPath = "");

    // 设备管理(保持不变)
    void registerDevice(const DeviceInfo &device);
    void unregisterDevice(int deviceId);
    void updateDevice(const DeviceInfo &device);
    DeviceInfo getDeviceInfo(int deviceId) const;
    DeviceInfo getDeviceInfoByPortname(const QString &portname) const;
    QList<DeviceInfo> getAllDevices() const;

    // 消息路由(保持不变)
    void routeMessage(const BusMessage &message);

    // 数据存储(新增扩展字段参数)
    void storeRawData(int deviceId, const QByteArray &data,
                     const QString &creator = "",
                     int dataLevel = 0,
                     const QString &protocolType = "");
    void storeBusMessage(const BusMessage &message,
                        const QString &creator = "",
                        int dataLevel = 0);

    // 数据查询(新增复杂查询接口)
    QList<QByteArray> queryRawData(int deviceId, qint64 startTime, qint64 endTime);
    
    // 新增:支持多字段查询
    QList<QByteArray> queryRawDataAdvanced(int deviceId,
                                           qint64 startTime = 0,
                                           qint64 endTime = 0,
                                           const QString &creator = "",
                                           int minLevel = -1,
                                           int maxLevel = -1,
                                           const QString &protocolType = "");
    
    QList<BusMessage> queryBusMessages(const QString &source, 
                                       const QString &destination,
                                       qint64 startTime, qint64 endTime);
    
    // 新增:支持多字段查询
    QList<BusMessage> queryBusMessagesAdvanced(const QString &source = "",
                                               const QString &destination = "",
                                               qint64 startTime = 0,
                                               qint64 endTime = 0,
                                               const QString &creator = "",
                                               int minLevel = -1,
                                               const QString &protocolType = "");

    // 数据库操作
    void closeDB();

signals:
    void deviceRegistered(const DeviceInfo &device);
    void deviceUpdate(const DeviceInfo &device);
    void deviceUnregistered(int deviceId);
    void messageReceived(const BusMessage &message);
    void rawDataStored(int deviceId, const QByteArray &data);
    void busMessageStored(const BusMessage &message);
    void databaseError(const QString &error);

private:
    // RocksDB 数据库(保持原有)
    rocksdb::DB* m_rawDb;
    rocksdb::DB* m_busDb;
    
    // SQLite 索引数据库(新增)
    QSqlDatabase m_indexDb;
    bool m_indexDbInitialized;
    
    QMap<int, DeviceInfo> m_devices;
    QMap<QString, int> m_portnameToId;
    bool m_dbInitialized;

    // KV键生成(保持原有)
    std::string generateRawKey(int deviceId, qint64 timestamp);
    std::string generateBusKey(const BusMessage &message);

    // SQLite 初始化(新增)
    bool initIndexDatabase(const QString &dbPath);
    bool createIndexTables();
    
    // 索引写入(新增)
    bool storeRawDataIndex(int deviceId, const QString &rocksdbKey, 
                          qint64 timestamp, int dataSize,
                          const QString &creator, int dataLevel,
                          const QString &protocolType);
    bool storeBusMessageIndex(const BusMessage &message, const QString &rocksdbKey,
                             int payloadSize, const QString &creator, int dataLevel);
    
    // 工具函数
    bool openDatabase(rocksdb::DB** db, const std::string& path);
    bool putData(rocksdb::DB* db, const std::string& key, const std::string& value);
    std::string getData(rocksdb::DB* db, const std::string& key);
};

#endif // SOFT_BUS_CORE_H

2. 实现文件修改(soft_bus_core.cpp)

2.1 初始化数据库

bool SoftBusCore::initDB(const QString &rawDbPath, const QString &busDbPath,
                         const QString &indexDbPath) {
    // 关闭现有数据库
    closeDB();

    // 1. 初始化 RocksDB(保持原有逻辑)
    rocksdb::Options options;
    options.create_if_missing = true;
    options.write_buffer_size = 64 * 1024 * 1024;  // 64MB 写缓冲区
    options.max_write_buffer_number = 3;
    options.compression = rocksdb::kSnappyCompression;  // 压缩存储
    
    rocksdb::Status status = rocksdb::DB::Open(options, rawDbPath.toStdString(), &m_rawDb);
    if (!status.ok()) {
        qCritical() << "Failed to open raw database:" << status.ToString().c_str();
        return false;
    }

    status = rocksdb::DB::Open(options, busDbPath.toStdString(), &m_busDb);
    if (!status.ok()) {
        qCritical() << "Failed to open bus database:" << status.ToString().c_str();
        delete m_rawDb;
        m_rawDb = nullptr;
        return false;
    }

    m_dbInitialized = true;

    // 2. 初始化 SQLite 索引数据库(新增)
    if (!indexDbPath.isEmpty()) {
        if (!initIndexDatabase(indexDbPath)) {
            qWarning() << "Failed to initialize index database, continuing without index";
            m_indexDbInitialized = false;
        } else {
            m_indexDbInitialized = true;
        }
    } else {
        m_indexDbInitialized = false;
    }

    return true;
}

bool SoftBusCore::initIndexDatabase(const QString &dbPath) {
    // 关闭现有连接
    if (m_indexDb.isOpen()) {
        m_indexDb.close();
    }

    // 创建 SQLite 数据库连接
    m_indexDb = QSqlDatabase::addDatabase("QSQLITE", "soft_bus_index");
    m_indexDb.setDatabaseName(dbPath);

    if (!m_indexDb.open()) {
        qCritical() << "Failed to open index database:" << m_indexDb.lastError().text();
        return false;
    }

    // 优化 SQLite 性能
    QSqlQuery query(m_indexDb);
    query.exec("PRAGMA journal_mode=WAL");
    query.exec("PRAGMA synchronous=NORMAL");
    query.exec("PRAGMA cache_size=10000");
    query.exec("PRAGMA temp_store=MEMORY");

    // 创建表结构
    return createIndexTables();
}

bool SoftBusCore::createIndexTables() {
    QSqlQuery query(m_indexDb);

    // 创建原始数据索引表
    query.exec(R"(
        CREATE TABLE IF NOT EXISTS raw_data_index (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            device_id INTEGER NOT NULL,
            rocksdb_key TEXT NOT NULL,
            data_size INTEGER,
            create_time INTEGER NOT NULL,
            update_time INTEGER,
            creator TEXT,
            data_level INTEGER DEFAULT 0,
            protocol_type TEXT,
            is_valid BOOLEAN DEFAULT 1,
            metadata TEXT
        )
    )");

    if (query.lastError().isValid()) {
        qCritical() << "Failed to create raw_data_index table:" << query.lastError().text();
        return false;
    }

    // 创建索引
    query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_device_time ON raw_data_index(device_id, create_time)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_creator ON raw_data_index(creator)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_level ON raw_data_index(data_level)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_protocol ON raw_data_index(protocol_type)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_time ON raw_data_index(create_time)");

    // 创建总线消息索引表
    query.exec(R"(
        CREATE TABLE IF NOT EXISTS bus_messages_index (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_id TEXT UNIQUE NOT NULL,
            source TEXT NOT NULL,
            destination TEXT NOT NULL,
            rocksdb_key TEXT NOT NULL,
            payload_size INTEGER,
            create_time INTEGER NOT NULL,
            update_time INTEGER,
            creator TEXT,
            data_level INTEGER DEFAULT 0,
            protocol_type TEXT,
            is_valid BOOLEAN DEFAULT 1,
            metadata TEXT
        )
    )");

    if (query.lastError().isValid()) {
        qCritical() << "Failed to create bus_messages_index table:" << query.lastError().text();
        return false;
    }

    // 创建索引
    query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_source_dest ON bus_messages_index(source, destination)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_time ON bus_messages_index(create_time)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_creator ON bus_messages_index(creator)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_level ON bus_messages_index(data_level)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_protocol ON bus_messages_index(protocol_type)");

    return true;
}

2.2 存储原始数据(双写机制)

void SoftBusCore::storeRawData(int deviceId, const QByteArray &data,
                               const QString &creator,
                               int dataLevel,
                               const QString &protocolType) {
    if (!m_dbInitialized || !m_rawDb) {
        qWarning() << "Database not initialized for storing raw data";
        return;
    }

    // 1. 生成 RocksDB Key
    qint64 timestamp = QDateTime::currentMSecsSinceEpoch();
    std::string rocksdbKey = generateRawKey(deviceId, timestamp);
    std::string value(data.constData(), data.size());

    // 2. 写入 RocksDB(实际数据)
    rocksdb::Status status = m_rawDb->Put(rocksdb::WriteOptions(), rocksdbKey, value);

    if (!status.ok()) {
        qCritical() << "Failed to store raw data to RocksDB:" << status.ToString().c_str();
        emit databaseError(QString::fromStdString(status.ToString()));
        return;
    }

    // 3. 写入 SQLite 索引(如果已初始化)
    if (m_indexDbInitialized && m_indexDb.isOpen()) {
        bool indexSuccess = storeRawDataIndex(deviceId, 
                                            QString::fromStdString(rocksdbKey),
                                            timestamp, 
                                            data.size(),
                                            creator.isEmpty() ? "system" : creator,
                                            dataLevel,
                                            protocolType);
        
        if (!indexSuccess) {
            qWarning() << "Failed to store raw data index, but data is stored in RocksDB";
            // 数据已写入 RocksDB,即使索引失败也不影响主流程
        }
    }

    emit rawDataStored(deviceId, data);
}

bool SoftBusCore::storeRawDataIndex(int deviceId, const QString &rocksdbKey,
                                   qint64 timestamp, int dataSize,
                                   const QString &creator, int dataLevel,
                                   const QString &protocolType) {
    QSqlQuery query(m_indexDb);
    query.prepare(R"(
        INSERT INTO raw_data_index 
        (device_id, rocksdb_key, data_size, create_time, update_time, creator, data_level, protocol_type)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    )");

    query.addBindValue(deviceId);
    query.addBindValue(rocksdbKey);
    query.addBindValue(dataSize);
    query.addBindValue(timestamp);
    query.addBindValue(timestamp);
    query.addBindValue(creator);
    query.addBindValue(dataLevel);
    query.addBindValue(protocolType);

    if (!query.exec()) {
        qWarning() << "Failed to store raw data index:" << query.lastError().text();
        return false;
    }

    return true;
}

2.3 存储总线消息(双写机制)

void SoftBusCore::storeBusMessage(const BusMessage &message,
                                  const QString &creator,
                                  int dataLevel) {
    if (!m_dbInitialized || !m_busDb) {
        qWarning() << "Database not initialized for storing bus message";
        return;
    }

    // 1. 序列化消息
    QJsonObject json;
    json["id"] = message.id;
    json["source"] = message.source;
    json["destination"] = message.destination;
    json["payload"] = message.payload;
    json["timestamp"] = message.timestamp;

    QJsonDocument doc(json);
    QByteArray data = doc.toJson(QJsonDocument::Compact);

    // 2. 生成 RocksDB Key
    std::string rocksdbKey = generateBusKey(message);
    std::string value(data.constData(), data.size());

    // 3. 写入 RocksDB(实际数据)
    rocksdb::Status status = m_busDb->Put(rocksdb::WriteOptions(), rocksdbKey, value);

    if (!status.ok()) {
        qCritical() << "Failed to store bus message to RocksDB:" << status.ToString().c_str();
        emit databaseError(QString::fromStdString(status.ToString()));
        return;
    }

    // 4. 写入 SQLite 索引(如果已初始化)
    if (m_indexDbInitialized && m_indexDb.isOpen()) {
        QString protocolType = message.payload["protocol"].toString();
        bool indexSuccess = storeBusMessageIndex(message,
                                                QString::fromStdString(rocksdbKey),
                                                data.size(),
                                                creator.isEmpty() ? "system" : creator,
                                                dataLevel);
        
        if (!indexSuccess) {
            qWarning() << "Failed to store bus message index, but data is stored in RocksDB";
        }
    }

    emit busMessageStored(message);
}

bool SoftBusCore::storeBusMessageIndex(const BusMessage &message, 
                                      const QString &rocksdbKey,
                                      int payloadSize,
                                      const QString &creator,
                                      int dataLevel) {
    QSqlQuery query(m_indexDb);
    query.prepare(R"(
        INSERT OR REPLACE INTO bus_messages_index 
        (message_id, source, destination, rocksdb_key, payload_size, create_time, update_time, creator, data_level, protocol_type)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    )");

    qint64 now = QDateTime::currentMSecsSinceEpoch();
    query.addBindValue(message.id);
    query.addBindValue(message.source);
    query.addBindValue(message.destination);
    query.addBindValue(rocksdbKey);
    query.addBindValue(payloadSize);
    query.addBindValue(message.timestamp ? message.timestamp : now);
    query.addBindValue(now);
    query.addBindValue(creator);
    query.addBindValue(dataLevel);
    query.addBindValue(message.payload["protocol"].toString());

    if (!query.exec()) {
        qWarning() << "Failed to store bus message index:" << query.lastError().text();
        return false;
    }

    return true;
}

2.4 复杂查询实现(使用 SQLite 索引)

QList<QByteArray> SoftBusCore::queryRawDataAdvanced(int deviceId,
                                                    qint64 startTime,
                                                    qint64 endTime,
                                                    const QString &creator,
                                                    int minLevel,
                                                    int maxLevel,
                                                    const QString &protocolType) {
    QList<QByteArray> results;

    // 如果没有索引数据库,降级到简单查询
    if (!m_indexDbInitialized || !m_indexDb.isOpen()) {
        if (startTime > 0 && endTime > 0) {
            return queryRawData(deviceId, startTime, endTime);
        }
        return results;
    }

    // 1. 查询 SQLite 索引,获取匹配的 rocksdb_key 列表
    QSqlQuery query(m_indexDb);
    QString sql = "SELECT rocksdb_key FROM raw_data_index WHERE device_id = ? AND is_valid = 1";
    QVariantList bindValues;
    bindValues << deviceId;

    if (startTime > 0) {
        sql += " AND create_time >= ?";
        bindValues << startTime;
    }

    if (endTime > 0) {
        sql += " AND create_time <= ?";
        bindValues << endTime;
    }

    if (!creator.isEmpty()) {
        sql += " AND creator = ?";
        bindValues << creator;
    }

    if (minLevel >= 0) {
        sql += " AND data_level >= ?";
        bindValues << minLevel;
    }

    if (maxLevel >= 0) {
        sql += " AND data_level <= ?";
        bindValues << maxLevel;
    }

    if (!protocolType.isEmpty()) {
        sql += " AND protocol_type = ?";
        bindValues << protocolType;
    }

    sql += " ORDER BY create_time ASC";

    query.prepare(sql);
    for (const QVariant &value : bindValues) {
        query.addBindValue(value);
    }

    if (!query.exec()) {
        qWarning() << "Index query failed:" << query.lastError().text();
        return results;
    }

    // 2. 批量从 RocksDB 读取实际数据
    QStringList rocksdbKeys;
    while (query.next()) {
        rocksdbKeys.append(query.value("rocksdb_key").toString());
    }

    // 3. 从 RocksDB 批量读取数据
    for (const QString &key : rocksdbKeys) {
        std::string value = getData(m_rawDb, key.toStdString());
        if (!value.empty()) {
            results.append(QByteArray(value.data(), value.size()));
        }
    }

    return results;
}

QList<BusMessage> SoftBusCore::queryBusMessagesAdvanced(const QString &source,
                                                        const QString &destination,
                                                        qint64 startTime,
                                                        qint64 endTime,
                                                        const QString &creator,
                                                        int minLevel,
                                                        const QString &protocolType) {
    QList<BusMessage> results;

    // 如果没有索引数据库,降级到简单查询
    if (!m_indexDbInitialized || !m_indexDb.isOpen()) {
        if (!source.isEmpty() && !destination.isEmpty() && startTime > 0 && endTime > 0) {
            return queryBusMessages(source, destination, startTime, endTime);
        }
        return results;
    }

    // 1. 查询 SQLite 索引
    QSqlQuery query(m_indexDb);
    QString sql = "SELECT rocksdb_key, message_id, source, destination, create_time FROM bus_messages_index WHERE is_valid = 1";
    QVariantList bindValues;

    if (!source.isEmpty()) {
        sql += " AND source = ?";
        bindValues << source;
    }

    if (!destination.isEmpty()) {
        sql += " AND destination = ?";
        bindValues << destination;
    }

    if (startTime > 0) {
        sql += " AND create_time >= ?";
        bindValues << startTime;
    }

    if (endTime > 0) {
        sql += " AND create_time <= ?";
        bindValues << endTime;
    }

    if (!creator.isEmpty()) {
        sql += " AND creator = ?";
        bindValues << creator;
    }

    if (minLevel >= 0) {
        sql += " AND data_level >= ?";
        bindValues << minLevel;
    }

    if (!protocolType.isEmpty()) {
        sql += " AND protocol_type = ?";
        bindValues << protocolType;
    }

    sql += " ORDER BY create_time DESC";

    query.prepare(sql);
    for (const QVariant &value : bindValues) {
        query.addBindValue(value);
    }

    if (!query.exec()) {
        qWarning() << "Index query failed:" << query.lastError().text();
        return results;
    }

    // 2. 从 RocksDB 读取实际数据
    while (query.next()) {
        QString rocksdbKey = query.value("rocksdb_key").toString();
        std::string value = getData(m_busDb, rocksdbKey.toStdString());
        
        if (!value.empty()) {
            QByteArray data(value.data(), value.size());
            QJsonDocument doc = QJsonDocument::fromJson(data);
            if (doc.isObject()) {
                QJsonObject json = doc.object();

                BusMessage message;
                message.id = json["id"].toString();
                message.source = json["source"].toString();
                message.destination = json["destination"].toString();
                message.payload = json["payload"].toObject();
                message.timestamp = static_cast<qint64>(json["timestamp"].toDouble());

                results.append(message);
            }
        }
    }

    return results;
}

2.5 关闭数据库

void SoftBusCore::closeDB() {
    // 关闭 RocksDB
    if (m_rawDb) {
        delete m_rawDb;
        m_rawDb = nullptr;
    }

    if (m_busDb) {
        delete m_busDb;
        m_busDb = nullptr;
    }

    // 关闭 SQLite
    if (m_indexDb.isOpen()) {
        m_indexDb.close();
    }

    m_dbInitialized = false;
    m_indexDbInitialized = false;
}

3. 使用示例

// 初始化时指定索引数据库路径
QString rawDbPath = "data/raw_data.db";
QString busDbPath = "data/bus_messages.db";
QString indexDbPath = "data/index.db";  // SQLite 索引数据库

m_busCore->initDB(rawDbPath, busDbPath, indexDbPath);

// 存储数据时指定扩展字段
m_busCore->storeRawData(deviceId, data, "张三", 5, "modbus");

// 复杂查询
QList<QByteArray> results = m_busCore->queryRawDataAdvanced(
    deviceId,
    0,  // startTime (0 表示不限制)
    0,  // endTime (0 表示不限制)
    "张三",  // creator
    3,  // minLevel
    7,  // maxLevel
    "modbus"  // protocolType
);

4. CMakeLists.txt 修改

# 保持 RocksDB 依赖
SET(ROCKSDB_DIR "${PROJECT_SOURCE_DIR}/rocksdb")
include_directories(${ROCKSDB_DIR}/include)
target_link_libraries(soft_bus ${ROCKSDB_DIR}/librocksdb.a)

# 添加 Qt Sql 模块(SQLite 支持)
find_package(Qt6 REQUIRED COMPONENTS Core Sql)
target_link_libraries(soft_bus
    Qt6::Core
    Qt6::Sql  # 新增
    # ... 其他库
)

优势总结

  1. 保持高性能写入:RocksDB 继续负责高频写入
  2. 支持复杂查询:SQLite 索引层支持多字段查询
  3. 向后兼容:如果 SQLite 初始化失败,自动降级到原有 RocksDB 查询
  4. 渐进式迁移:可以先启用索引,再逐步迁移查询逻辑
  5. 数据安全:即使索引写入失败,实际数据已安全存储在 RocksDB