SQLite实现示例.md 12 KB

SQLite 实现示例

1. 数据库初始化

// soft_bus_core.h
#include <QSqlDatabase>
#include <QSqlQuery>
#include <QSqlError>

/// Core soft-bus object. This excerpt shows only the members involved in the
/// SQLite-backed storage layer.
class SoftBusCore : public QObject {
    Q_OBJECT
private:
    QSqlDatabase m_db;  // SQLite connection handle (replaces rocksdb::DB*)

    // Initialised in-class: the storage/query guards read this flag, and they
    // may run before initDB() has ever been called.
    bool m_dbInitialized = false;

    bool initDatabaseSchema();
    /// Creates the tables and indexes; returns false on failure.
    bool createTables();
};
// soft_bus_core.cpp
bool SoftBusCore::initDB(const QString &dbPath) {
    // 关闭现有连接
    if (m_db.isOpen()) {
        m_db.close();
    }
    
    // 创建 SQLite 数据库连接
    m_db = QSqlDatabase::addDatabase("QSQLITE", "soft_bus_db");
    m_db.setDatabaseName(dbPath);
    
    if (!m_db.open()) {
        qCritical() << "Failed to open database:" << m_db.lastError().text();
        return false;
    }
    
    // 创建表结构
    if (!createTables()) {
        qCritical() << "Failed to create tables";
        return false;
    }
    
    m_dbInitialized = true;
    return true;
}

bool SoftBusCore::createTables() {
    QSqlQuery query(m_db);
    
    // 创建原始数据表
    query.exec(R"(
        CREATE TABLE IF NOT EXISTS raw_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            device_id INTEGER NOT NULL,
            data BLOB NOT NULL,
            create_time INTEGER NOT NULL,
            update_time INTEGER,
            creator TEXT,
            data_level INTEGER DEFAULT 0,
            protocol_type TEXT,
            is_valid BOOLEAN DEFAULT 1,
            metadata TEXT
        )
    )");
    
    if (query.lastError().isValid()) {
        qCritical() << "Failed to create raw_data table:" << query.lastError().text();
        return false;
    }
    
    // 创建索引
    query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_device_time ON raw_data(device_id, create_time)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_creator ON raw_data(creator)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_level ON raw_data(data_level)");
    
    // 创建总线消息表
    query.exec(R"(
        CREATE TABLE IF NOT EXISTS bus_messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_id TEXT UNIQUE NOT NULL,
            source TEXT NOT NULL,
            destination TEXT NOT NULL,
            payload TEXT NOT NULL,
            create_time INTEGER NOT NULL,
            update_time INTEGER,
            creator TEXT,
            data_level INTEGER DEFAULT 0,
            protocol_type TEXT,
            is_valid BOOLEAN DEFAULT 1,
            metadata TEXT
        )
    )");
    
    if (query.lastError().isValid()) {
        qCritical() << "Failed to create bus_messages table:" << query.lastError().text();
        return false;
    }
    
    // 创建索引
    query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_source_dest ON bus_messages(source, destination)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_time ON bus_messages(create_time)");
    query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_creator ON bus_messages(creator)");
    
    return true;
}

2. 存储原始数据(带扩展字段)

void SoftBusCore::storeRawData(int deviceId, const QByteArray &data, 
                               const QString &creator = "", 
                               int dataLevel = 0,
                               const QString &protocolType = "") {
    if (!m_dbInitialized || !m_db.isOpen()) {
        qWarning() << "Database not initialized";
        return;
    }
    
    QSqlQuery query(m_db);
    query.prepare(R"(
        INSERT INTO raw_data 
        (device_id, data, create_time, update_time, creator, data_level, protocol_type)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    )");
    
    qint64 now = QDateTime::currentMSecsSinceEpoch();
    query.addBindValue(deviceId);
    query.addBindValue(data);
    query.addBindValue(now);
    query.addBindValue(now);
    query.addBindValue(creator.isEmpty() ? "system" : creator);
    query.addBindValue(dataLevel);
    query.addBindValue(protocolType);
    
    if (!query.exec()) {
        qCritical() << "Failed to store raw data:" << query.lastError().text();
        emit databaseError(query.lastError().text());
    } else {
        emit rawDataStored(deviceId, data);
    }
}

3. 存储总线消息(带扩展字段)

void SoftBusCore::storeBusMessage(const BusMessage &message,
                                  const QString &creator = "",
                                  int dataLevel = 0) {
    if (!m_dbInitialized || !m_db.isOpen()) {
        qWarning() << "Database not initialized";
        return;
    }
    
    QSqlQuery query(m_db);
    query.prepare(R"(
        INSERT OR REPLACE INTO bus_messages 
        (message_id, source, destination, payload, create_time, update_time, creator, data_level, protocol_type)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    )");
    
    qint64 now = QDateTime::currentMSecsSinceEpoch();
    QJsonDocument doc;
    doc.setObject(message.payload);
    
    query.addBindValue(message.id);
    query.addBindValue(message.source);
    query.addBindValue(message.destination);
    query.addBindValue(doc.toJson(QJsonDocument::Compact));
    query.addBindValue(message.timestamp ? message.timestamp : now);
    query.addBindValue(now);
    query.addBindValue(creator.isEmpty() ? "system" : creator);
    query.addBindValue(dataLevel);
    query.addBindValue(message.payload["protocol"].toString());
    
    if (!query.exec()) {
        qCritical() << "Failed to store bus message:" << query.lastError().text();
        emit databaseError(query.lastError().text());
    } else {
        emit busMessageStored(message);
    }
}

4. 查询原始数据(支持多字段过滤)

/// Returns the data blobs of one device within a time window, oldest first.
///
/// @param deviceId   Matched against device_id.
/// @param startTime  Inclusive lower bound on create_time.
/// @param endTime    Inclusive upper bound on create_time.
/// @param creator    Optional exact match on creator; ignored when empty.
/// @param minLevel   Optional lower bound on data_level; ignored when negative.
/// @param maxLevel   Optional upper bound on data_level; ignored when negative.
/// @return The matching data values ordered by create_time ascending; an
///         empty list when the database is unavailable or the query fails.
QList<QByteArray> SoftBusCore::queryRawData(int deviceId, 
                                            qint64 startTime, 
                                            qint64 endTime,
                                            const QString &creator = "",
                                            int minLevel = -1,
                                            int maxLevel = -1) {
    QList<QByteArray> blobs;

    const bool databaseReady = m_dbInitialized && m_db.isOpen();
    if (!databaseReady) {
        return blobs;
    }

    QString statement = "SELECT data FROM raw_data WHERE device_id = ? AND create_time >= ? AND create_time <= ?";
    QVariantList params;
    params.append(deviceId);
    params.append(startTime);
    params.append(endTime);

    // Appends an optional filter and its bound value in one step so the
    // placeholder order always matches the parameter order.
    const auto addFilter = [&statement, &params](bool enabled, const char *clause,
                                                 const QVariant &value) {
        if (!enabled) {
            return;
        }
        statement += clause;
        params.append(value);
    };

    addFilter(!creator.isEmpty(), " AND creator = ?", creator);
    addFilter(minLevel >= 0, " AND data_level >= ?", minLevel);
    addFilter(maxLevel >= 0, " AND data_level <= ?", maxLevel);

    statement += " ORDER BY create_time ASC";

    QSqlQuery select(m_db);
    select.prepare(statement);
    for (const QVariant &param : params) {
        select.addBindValue(param);
    }

    if (!select.exec()) {
        qWarning() << "Query failed:" << select.lastError().text();
        return blobs;
    }

    // "data" is the only selected column, i.e. column 0.
    while (select.next()) {
        blobs.append(select.value(0).toByteArray());
    }

    return blobs;
}

5. 查询总线消息(支持复杂查询)

/// Returns bus messages matching the given optional filters, newest first.
///
/// Each filter is applied only when it is set: strings when non-empty,
/// startTime/endTime when greater than zero, minLevel when non-negative.
///
/// @param source        Exact match on source.
/// @param destination   Exact match on destination.
/// @param startTime     Inclusive lower bound on create_time.
/// @param endTime       Inclusive upper bound on create_time.
/// @param creator       Exact match on creator.
/// @param minLevel      Lower bound on data_level.
/// @param protocolType  Exact match on protocol_type.
/// @return Matching messages ordered by create_time descending; an empty
///         list when the database is unavailable or the query fails.
QList<BusMessage> SoftBusCore::queryBusMessages(
    const QString &source = "",
    const QString &destination = "",
    qint64 startTime = 0,
    qint64 endTime = 0,
    const QString &creator = "",
    int minLevel = -1,
    const QString &protocolType = "") {

    QList<BusMessage> messages;

    const bool databaseReady = m_dbInitialized && m_db.isOpen();
    if (!databaseReady) {
        return messages;
    }

    // "WHERE 1=1" lets every optional filter be appended as " AND ...".
    QString statement = "SELECT message_id, source, destination, payload, create_time FROM bus_messages WHERE 1=1";
    QVariantList params;

    // Appends an optional filter together with its bound value.
    const auto addFilter = [&statement, &params](bool enabled, const char *clause,
                                                 const QVariant &value) {
        if (!enabled) {
            return;
        }
        statement += clause;
        params.append(value);
    };

    addFilter(!source.isEmpty(), " AND source = ?", source);
    addFilter(!destination.isEmpty(), " AND destination = ?", destination);
    addFilter(startTime > 0, " AND create_time >= ?", startTime);
    addFilter(endTime > 0, " AND create_time <= ?", endTime);
    addFilter(!creator.isEmpty(), " AND creator = ?", creator);
    addFilter(minLevel >= 0, " AND data_level >= ?", minLevel);
    addFilter(!protocolType.isEmpty(), " AND protocol_type = ?", protocolType);

    statement += " ORDER BY create_time DESC";

    QSqlQuery select(m_db);
    select.prepare(statement);
    for (const QVariant &param : params) {
        select.addBindValue(param);
    }

    if (!select.exec()) {
        qWarning() << "Query failed:" << select.lastError().text();
        return messages;
    }

    // Column positions follow the SELECT list above.
    enum Column { MessageId = 0, Source, Destination, Payload, CreateTime };

    while (select.next()) {
        BusMessage row;
        row.id = select.value(MessageId).toString();
        row.source = select.value(Source).toString();
        row.destination = select.value(Destination).toString();
        row.timestamp = select.value(CreateTime).toLongLong();

        // The payload is left at its default when the stored text is not a
        // JSON object.
        const QJsonDocument payloadDoc =
            QJsonDocument::fromJson(select.value(Payload).toByteArray());
        if (payloadDoc.isObject()) {
            row.payload = payloadDoc.object();
        }

        messages.append(row);
    }

    return messages;
}

6. 更新数据(支持更新时间和创建者)

bool SoftBusCore::updateRawDataMetadata(int recordId, 
                                        const QString &creator = "",
                                        int dataLevel = -1,
                                        const QJsonObject &metadata = QJsonObject()) {
    if (!m_dbInitialized || !m_db.isOpen()) {
        return false;
    }
    
    QSqlQuery query(m_db);
    QString sql = "UPDATE raw_data SET update_time = ?";
    QVariantList bindValues;
    bindValues << QDateTime::currentMSecsSinceEpoch();
    
    if (!creator.isEmpty()) {
        sql += ", creator = ?";
        bindValues << creator;
    }
    
    if (dataLevel >= 0) {
        sql += ", data_level = ?";
        bindValues << dataLevel;
    }
    
    if (!metadata.isEmpty()) {
        QJsonDocument doc(metadata);
        sql += ", metadata = ?";
        bindValues << doc.toJson(QJsonDocument::Compact);
    }
    
    sql += " WHERE id = ?";
    bindValues << recordId;
    
    query.prepare(sql);
    for (const QVariant &value : bindValues) {
        query.addBindValue(value);
    }
    
    if (!query.exec()) {
        qWarning() << "Update failed:" << query.lastError().text();
        return false;
    }
    
    return true;
}

7. 统计查询示例

/// Counts the raw_data records of one device, grouped by creator.
///
/// @param deviceId  Matched against device_id.
/// @return Map from creator to record count; empty when the database is
///         unavailable or the query fails (the failure is logged).
QMap<QString, int> SoftBusCore::getDataCountByCreator(int deviceId) {
    QMap<QString, int> results;
    
    if (!m_dbInitialized || !m_db.isOpen()) {
        return results;
    }
    
    QSqlQuery query(m_db);
    query.prepare(R"(
        SELECT creator, COUNT(*) as count 
        FROM raw_data 
        WHERE device_id = ? 
        GROUP BY creator
    )");
    query.addBindValue(deviceId);
    
    // Log the failure like the other query functions do, so an empty result
    // caused by an error can be told apart from "no data".
    if (!query.exec()) {
        qWarning() << "Query failed:" << query.lastError().text();
        return results;
    }

    while (query.next()) {
        results[query.value("creator").toString()] = query.value("count").toInt();
    }
    
    return results;
}

/// Counts the raw_data records of one device, grouped by data level.
///
/// @param deviceId  Matched against device_id.
/// @return Map from data_level to record count; empty when the database is
///         unavailable or the query fails (the failure is logged).
QMap<int, int> SoftBusCore::getDataCountByLevel(int deviceId) {
    QMap<int, int> results;
    
    if (!m_dbInitialized || !m_db.isOpen()) {
        return results;
    }
    
    QSqlQuery query(m_db);
    query.prepare(R"(
        SELECT data_level, COUNT(*) as count 
        FROM raw_data 
        WHERE device_id = ? 
        GROUP BY data_level
    )");
    query.addBindValue(deviceId);
    
    // Log the failure like the other query functions do, so an empty result
    // caused by an error can be told apart from "no data".
    if (!query.exec()) {
        qWarning() << "Query failed:" << query.lastError().text();
        return results;
    }

    while (query.next()) {
        results[query.value("data_level").toInt()] = query.value("count").toInt();
    }
    
    return results;
}

8. CMakeLists.txt 修改

# Remove the RocksDB-related configuration
# SET(ROCKSDB_DIR "${PROJECT_SOURCE_DIR}/rocksdb")
# include_directories(${ROCKSDB_DIR}/include)

# Qt ships with built-in SQLite support, so no extra configuration is needed
# Make sure the Qt Sql module is found and linked
find_package(Qt6 REQUIRED COMPONENTS Core Sql)

target_link_libraries(soft_bus
    Qt6::Core
    Qt6::Sql
    # ... other libraries
)

优势总结

  1. 查询灵活:支持复杂的 WHERE、ORDER BY、GROUP BY
  2. 字段扩展:使用 ALTER TABLE 轻松添加字段
  3. 索引优化:自动使用索引提升查询性能
  4. 数据管理:可以用 SQLite 工具直接查看和管理数据
  5. Qt 集成:Qt Sql 模块自带 QSQLITE 驱动,只需链接 Qt6::Sql,无需引入额外的第三方依赖