SQLite 实现示例
1. 数据库初始化
// soft_bus_core.h
#include <QSqlDatabase>
#include <QSqlQuery>
#include <QSqlError>
class SoftBusCore : public QObject {
Q_OBJECT
private:
QSqlDatabase m_db; // 替换 rocksdb::DB*
bool m_dbInitialized;
bool initDatabaseSchema();
bool createTables();
};
// soft_bus_core.cpp
bool SoftBusCore::initDB(const QString &dbPath) {
// 关闭现有连接
if (m_db.isOpen()) {
m_db.close();
}
// 创建 SQLite 数据库连接
m_db = QSqlDatabase::addDatabase("QSQLITE", "soft_bus_db");
m_db.setDatabaseName(dbPath);
if (!m_db.open()) {
qCritical() << "Failed to open database:" << m_db.lastError().text();
return false;
}
// 创建表结构
if (!createTables()) {
qCritical() << "Failed to create tables";
return false;
}
m_dbInitialized = true;
return true;
}
bool SoftBusCore::createTables() {
QSqlQuery query(m_db);
// 创建原始数据表
query.exec(R"(
CREATE TABLE IF NOT EXISTS raw_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id INTEGER NOT NULL,
data BLOB NOT NULL,
create_time INTEGER NOT NULL,
update_time INTEGER,
creator TEXT,
data_level INTEGER DEFAULT 0,
protocol_type TEXT,
is_valid BOOLEAN DEFAULT 1,
metadata TEXT
)
)");
if (query.lastError().isValid()) {
qCritical() << "Failed to create raw_data table:" << query.lastError().text();
return false;
}
// 创建索引
query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_device_time ON raw_data(device_id, create_time)");
query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_creator ON raw_data(creator)");
query.exec("CREATE INDEX IF NOT EXISTS idx_raw_data_level ON raw_data(data_level)");
// 创建总线消息表
query.exec(R"(
CREATE TABLE IF NOT EXISTS bus_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id TEXT UNIQUE NOT NULL,
source TEXT NOT NULL,
destination TEXT NOT NULL,
payload TEXT NOT NULL,
create_time INTEGER NOT NULL,
update_time INTEGER,
creator TEXT,
data_level INTEGER DEFAULT 0,
protocol_type TEXT,
is_valid BOOLEAN DEFAULT 1,
metadata TEXT
)
)");
if (query.lastError().isValid()) {
qCritical() << "Failed to create bus_messages table:" << query.lastError().text();
return false;
}
// 创建索引
query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_source_dest ON bus_messages(source, destination)");
query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_time ON bus_messages(create_time)");
query.exec("CREATE INDEX IF NOT EXISTS idx_bus_msg_creator ON bus_messages(creator)");
return true;
}
2. 存储原始数据(带扩展字段)
void SoftBusCore::storeRawData(int deviceId, const QByteArray &data,
const QString &creator = "",
int dataLevel = 0,
const QString &protocolType = "") {
if (!m_dbInitialized || !m_db.isOpen()) {
qWarning() << "Database not initialized";
return;
}
QSqlQuery query(m_db);
query.prepare(R"(
INSERT INTO raw_data
(device_id, data, create_time, update_time, creator, data_level, protocol_type)
VALUES (?, ?, ?, ?, ?, ?, ?)
)");
qint64 now = QDateTime::currentMSecsSinceEpoch();
query.addBindValue(deviceId);
query.addBindValue(data);
query.addBindValue(now);
query.addBindValue(now);
query.addBindValue(creator.isEmpty() ? "system" : creator);
query.addBindValue(dataLevel);
query.addBindValue(protocolType);
if (!query.exec()) {
qCritical() << "Failed to store raw data:" << query.lastError().text();
emit databaseError(query.lastError().text());
} else {
emit rawDataStored(deviceId, data);
}
}
3. 存储总线消息(带扩展字段)
void SoftBusCore::storeBusMessage(const BusMessage &message,
const QString &creator = "",
int dataLevel = 0) {
if (!m_dbInitialized || !m_db.isOpen()) {
qWarning() << "Database not initialized";
return;
}
QSqlQuery query(m_db);
query.prepare(R"(
INSERT OR REPLACE INTO bus_messages
(message_id, source, destination, payload, create_time, update_time, creator, data_level, protocol_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
)");
qint64 now = QDateTime::currentMSecsSinceEpoch();
QJsonDocument doc;
doc.setObject(message.payload);
query.addBindValue(message.id);
query.addBindValue(message.source);
query.addBindValue(message.destination);
query.addBindValue(doc.toJson(QJsonDocument::Compact));
query.addBindValue(message.timestamp ? message.timestamp : now);
query.addBindValue(now);
query.addBindValue(creator.isEmpty() ? "system" : creator);
query.addBindValue(dataLevel);
query.addBindValue(message.payload["protocol"].toString());
if (!query.exec()) {
qCritical() << "Failed to store bus message:" << query.lastError().text();
emit databaseError(query.lastError().text());
} else {
emit busMessageStored(message);
}
}
4. 查询原始数据(支持多字段过滤)
QList<QByteArray> SoftBusCore::queryRawData(int deviceId,
qint64 startTime,
qint64 endTime,
const QString &creator = "",
int minLevel = -1,
int maxLevel = -1) {
QList<QByteArray> results;
if (!m_dbInitialized || !m_db.isOpen()) {
return results;
}
QSqlQuery query(m_db);
QString sql = "SELECT data FROM raw_data WHERE device_id = ? AND create_time >= ? AND create_time <= ?";
QVariantList bindValues;
bindValues << deviceId << startTime << endTime;
// 动态添加过滤条件
if (!creator.isEmpty()) {
sql += " AND creator = ?";
bindValues << creator;
}
if (minLevel >= 0) {
sql += " AND data_level >= ?";
bindValues << minLevel;
}
if (maxLevel >= 0) {
sql += " AND data_level <= ?";
bindValues << maxLevel;
}
sql += " ORDER BY create_time ASC";
query.prepare(sql);
for (int i = 0; i < bindValues.size(); ++i) {
query.addBindValue(bindValues[i]);
}
if (!query.exec()) {
qWarning() << "Query failed:" << query.lastError().text();
return results;
}
while (query.next()) {
results.append(query.value("data").toByteArray());
}
return results;
}
5. 查询总线消息(支持复杂查询)
QList<BusMessage> SoftBusCore::queryBusMessages(
const QString &source = "",
const QString &destination = "",
qint64 startTime = 0,
qint64 endTime = 0,
const QString &creator = "",
int minLevel = -1,
const QString &protocolType = "") {
QList<BusMessage> results;
if (!m_dbInitialized || !m_db.isOpen()) {
return results;
}
QSqlQuery query(m_db);
QString sql = "SELECT message_id, source, destination, payload, create_time FROM bus_messages WHERE 1=1";
QVariantList bindValues;
// 动态构建查询条件
if (!source.isEmpty()) {
sql += " AND source = ?";
bindValues << source;
}
if (!destination.isEmpty()) {
sql += " AND destination = ?";
bindValues << destination;
}
if (startTime > 0) {
sql += " AND create_time >= ?";
bindValues << startTime;
}
if (endTime > 0) {
sql += " AND create_time <= ?";
bindValues << endTime;
}
if (!creator.isEmpty()) {
sql += " AND creator = ?";
bindValues << creator;
}
if (minLevel >= 0) {
sql += " AND data_level >= ?";
bindValues << minLevel;
}
if (!protocolType.isEmpty()) {
sql += " AND protocol_type = ?";
bindValues << protocolType;
}
sql += " ORDER BY create_time DESC";
query.prepare(sql);
for (const QVariant &value : bindValues) {
query.addBindValue(value);
}
if (!query.exec()) {
qWarning() << "Query failed:" << query.lastError().text();
return results;
}
while (query.next()) {
BusMessage message;
message.id = query.value("message_id").toString();
message.source = query.value("source").toString();
message.destination = query.value("destination").toString();
message.timestamp = query.value("create_time").toLongLong();
QJsonDocument doc = QJsonDocument::fromJson(query.value("payload").toByteArray());
if (doc.isObject()) {
message.payload = doc.object();
}
results.append(message);
}
return results;
}
6. 更新数据(支持更新时间和创建者)
bool SoftBusCore::updateRawDataMetadata(int recordId,
const QString &creator = "",
int dataLevel = -1,
const QJsonObject &metadata = QJsonObject()) {
if (!m_dbInitialized || !m_db.isOpen()) {
return false;
}
QSqlQuery query(m_db);
QString sql = "UPDATE raw_data SET update_time = ?";
QVariantList bindValues;
bindValues << QDateTime::currentMSecsSinceEpoch();
if (!creator.isEmpty()) {
sql += ", creator = ?";
bindValues << creator;
}
if (dataLevel >= 0) {
sql += ", data_level = ?";
bindValues << dataLevel;
}
if (!metadata.isEmpty()) {
QJsonDocument doc(metadata);
sql += ", metadata = ?";
bindValues << doc.toJson(QJsonDocument::Compact);
}
sql += " WHERE id = ?";
bindValues << recordId;
query.prepare(sql);
for (const QVariant &value : bindValues) {
query.addBindValue(value);
}
if (!query.exec()) {
qWarning() << "Update failed:" << query.lastError().text();
return false;
}
return true;
}
7. 统计查询示例
// 统计每个创建者的数据量
QMap<QString, int> SoftBusCore::getDataCountByCreator(int deviceId) {
QMap<QString, int> results;
if (!m_dbInitialized || !m_db.isOpen()) {
return results;
}
QSqlQuery query(m_db);
query.prepare(R"(
SELECT creator, COUNT(*) as count
FROM raw_data
WHERE device_id = ?
GROUP BY creator
)");
query.addBindValue(deviceId);
if (query.exec()) {
while (query.next()) {
results[query.value("creator").toString()] = query.value("count").toInt();
}
}
return results;
}
// 按数据等级统计
QMap<int, int> SoftBusCore::getDataCountByLevel(int deviceId) {
QMap<int, int> results;
if (!m_dbInitialized || !m_db.isOpen()) {
return results;
}
QSqlQuery query(m_db);
query.prepare(R"(
SELECT data_level, COUNT(*) as count
FROM raw_data
WHERE device_id = ?
GROUP BY data_level
)");
query.addBindValue(deviceId);
if (query.exec()) {
while (query.next()) {
results[query.value("data_level").toInt()] = query.value("count").toInt();
}
}
return results;
}
8. CMakeLists.txt 修改
# 移除 RocksDB 相关配置
# SET(ROCKSDB_DIR "${PROJECT_SOURCE_DIR}/rocksdb")
# include_directories(${ROCKSDB_DIR}/include)
# Qt 已内置 SQLite 支持,无需额外配置
# 确保链接 Qt Sql 模块
find_package(Qt6 REQUIRED COMPONENTS Core Sql)
target_link_libraries(soft_bus
Qt6::Core
Qt6::Sql
# ... 其他库
)
优势总结
- ✅ 查询灵活:支持复杂的 WHERE、ORDER BY、GROUP BY
- ✅ 字段扩展:使用 ALTER TABLE 轻松添加字段
- ✅ 索引优化:自动使用索引提升查询性能
- ✅ 数据管理:可以用 SQLite 工具直接查看和管理数据
- ✅ Qt 集成:原生支持,无需额外依赖