大数据量时间序列存储方案
场景分析
需求特点
- 高频写入:传感器数据持续采集,每次串口读取都会写入
- 数据量大:长期运行,数据量可能达到 TB 级别
- 协议转化:原始数据需要转化为标准协议后存储
- 复杂查询:需要按创建者、数据等级、协议类型等多字段查询
- 时间序列:数据按时间顺序写入和查询
性能要求
- 写入性能:必须支持高频写入(每秒可能数百到数千条)
- 查询性能:支持复杂查询但可能不是实时(可接受秒级延迟)
- 存储容量:支持 TB 级别数据存储
- 查询灵活性:支持多字段过滤、排序、统计
推荐方案:RocksDB + SQLite 混合架构
架构设计
┌─────────────────────────────────────────────────────────┐
│ 应用层 (SoftBusCore) │
└─────────────────────────────────────────────────────────┘
│
├─────────────────┐
▼ ▼
┌──────────────────────────┐ ┌──────────────────────┐
│ RocksDB (热数据) │ │ SQLite (索引层) │
│ │ │ │
│ • 存储实际数据(二进制) │ │ • 存储元数据索引 │
│ • 高频写入 │ │ • 支持复杂查询 │
│ • 高性能 │ │ • 字段扩展方便 │
│ • 支持大数据量 │ │ • 查询性能优化 │
└──────────────────────────┘ └──────────────────────┘
│ │
└─────────────────┘
│
▼
┌─────────────────┐
│ 归档机制 │
│ (定期数据迁移) │
└─────────────────┘
数据存储策略
1. RocksDB:实际数据存储(热数据)
- 存储内容:原始二进制数据、协议转化后的消息体
- Key 设计:
deviceId_timestamp 或 messageId
- Value:二进制数据(压缩存储)
- 优势:写入性能极高,支持大数据量
2. SQLite:元数据索引层(查询层)
- 存储内容:元数据索引(deviceId, timestamp, creator, level, protocol_type 等)
- 存储实际数据路径/引用:指向 RocksDB 的 key
- 优势:支持复杂 SQL 查询,字段扩展方便
3. 数据同步机制
- 写入时:同时写入 RocksDB 和 SQLite
- 查询时:先查 SQLite 获取索引,再查 RocksDB 获取实际数据
- 归档时:定期将旧数据从 RocksDB 归档,SQLite 保留索引
详细设计
1. SQLite 表结构设计
原始数据索引表(raw_data_index)
CREATE TABLE raw_data_index (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id INTEGER NOT NULL,
rocksdb_key TEXT NOT NULL, -- RocksDB 的 key,用于查询实际数据
data_size INTEGER, -- 数据大小
create_time INTEGER NOT NULL, -- 创建时间(时间戳)
update_time INTEGER, -- 更新时间(时间戳)
creator TEXT, -- 创建者
data_level INTEGER DEFAULT 0, -- 数据等级
protocol_type TEXT, -- 协议类型
is_valid BOOLEAN DEFAULT 1, -- 是否有效
metadata TEXT, -- 元数据(JSON格式)
FOREIGN KEY (device_id) REFERENCES devices(id)
);
CREATE INDEX idx_raw_data_device_time ON raw_data_index(device_id, create_time);
CREATE INDEX idx_raw_data_creator ON raw_data_index(creator);
CREATE INDEX idx_raw_data_level ON raw_data_index(data_level);
CREATE INDEX idx_raw_data_protocol ON raw_data_index(protocol_type);
CREATE INDEX idx_raw_data_time ON raw_data_index(create_time);
总线消息索引表(bus_messages_index)
CREATE TABLE bus_messages_index (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id TEXT UNIQUE NOT NULL, -- 消息ID
source TEXT NOT NULL,
destination TEXT NOT NULL,
rocksdb_key TEXT NOT NULL, -- RocksDB 的 key
payload_size INTEGER, -- payload 大小
create_time INTEGER NOT NULL,
update_time INTEGER,
creator TEXT,
data_level INTEGER DEFAULT 0,
protocol_type TEXT,
is_valid BOOLEAN DEFAULT 1,
metadata TEXT
);
CREATE INDEX idx_bus_msg_source_dest ON bus_messages_index(source, destination);
CREATE INDEX idx_bus_msg_time ON bus_messages_index(create_time);
CREATE INDEX idx_bus_msg_creator ON bus_messages_index(creator);
CREATE INDEX idx_bus_msg_level ON bus_messages_index(data_level);
CREATE INDEX idx_bus_msg_protocol ON bus_messages_index(protocol_type);
设备表(devices)
CREATE TABLE devices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
portname TEXT UNIQUE,
name TEXT,
type TEXT,
address INTEGER,
protocol TEXT,
protocol_detail TEXT,
properties TEXT, -- JSON格式
create_time INTEGER,
update_time INTEGER,
creator TEXT
);
2. 写入流程
┌─────────────────────────────────────────────────────────┐
│ 1. 接收数据 (storeRawData/storeBusMessage) │
└─────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ 2. 生成 RocksDB Key │
│ deviceId_timestamp 或 messageId │
└───────────────────────────────────────┘
│
┌───────────────────┴───────────────────┐
▼ ▼
┌──────────────────────┐ ┌──────────────────────┐
│ 3a. 写入 RocksDB │ │ 3b. 写入 SQLite │
│ (实际数据) │ │ (索引元数据) │
│ - 二进制数据 │ │ - 元数据字段 │
│ - 高性能写入 │ │ - rocksdb_key │
└──────────────────────┘ └──────────────────────┘
│ │
└───────────────────┬───────────────────┘
▼
┌───────────────────────┐
│ 4. 事务提交/确认 │
└───────────────────────┘
3. 查询流程
┌─────────────────────────────────────────────────────────┐
│ 1. 复杂查询请求 (按 creator, level, time 等) │
└─────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ 2. 查询 SQLite 索引 │
│ - 获取匹配的 rocksdb_key 列表 │
│ - 快速过滤和排序 │
└───────────────────────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ 3. 批量查询 RocksDB │
│ - 根据 rocksdb_key 获取实际数据 │
│ - 可以并行查询提升性能 │
└───────────────────────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ 4. 返回结果 │
└───────────────────────────────────────┘
4. 数据归档机制
-- 归档策略:将超过 30 天的数据标记为归档
-- 可以定期将数据从 RocksDB 导出到归档文件,SQLite 保留索引
-- 标记归档数据
UPDATE raw_data_index
SET is_valid = 0
WHERE create_time < (strftime('%s', 'now') - 30 * 24 * 3600);
-- 查询时可以只查询有效数据
SELECT * FROM raw_data_index
WHERE is_valid = 1 AND device_id = ? AND create_time >= ?;
性能优化策略
1. RocksDB 优化
rocksdb::Options options;
options.create_if_missing = true;
options.write_buffer_size = 64 * 1024 * 1024; // 64MB 写缓冲区
options.max_write_buffer_number = 3;
options.min_write_buffer_number_to_merge = 2;
options.compression = rocksdb::kSnappyCompression; // 压缩存储
options.max_open_files = 10000;
options.target_file_size_base = 64 * 1024 * 1024; // 64MB
2. SQLite 优化
// 使用 WAL 模式提升并发性能
QSqlQuery query;
query.exec("PRAGMA journal_mode=WAL");
query.exec("PRAGMA synchronous=NORMAL"); // 平衡性能和安全性
query.exec("PRAGMA cache_size=10000"); // 10MB 缓存
query.exec("PRAGMA temp_store=MEMORY");
3. 批量写入优化
// 使用 WriteBatch 批量写入 RocksDB
rocksdb::WriteBatch batch;
for (const auto& data : dataList) {
batch.Put(key, value);
}
db->Write(rocksdb::WriteOptions(), &batch);
// 使用事务批量写入 SQLite
db.transaction();
for (const auto& index : indexList) {
// 插入索引
}
db.commit();
容量估算
假设场景
- 设备数量:100 个传感器
- 采集频率:每设备每秒 10 条数据
- 数据大小:每条原始数据 100 字节,协议转化后 200 字节
- 运行时间:1 年
数据量计算
- 每秒写入:100 设备 × 10 条/秒 = 1000 条/秒
- 每天数据量:
- 原始数据:1000 条/秒 × 86400 秒 × 100 字节 = 8.64 GB
- 协议数据:1000 条/秒 × 86400 秒 × 200 字节 = 17.28 GB
- 索引数据:1000 条/秒 × 86400 秒 × 200 字节(索引) = 17.28 GB
- 总计:约 43 GB/天
- 1年数据量:约 15.7 TB
存储分配建议
- RocksDB:存储 30 天热数据(约 1.3 TB)
- SQLite:存储全部索引(约 6 TB,但索引文件通常较小,实际可能几百 GB)
- 归档存储:超过 30 天的数据归档到外部存储
实施建议
阶段1:保持 RocksDB,添加 SQLite 索引层
- ✅ 保留现有 RocksDB 存储(保持写入性能)
- ✅ 添加 SQLite 索引层(支持复杂查询)
- ✅ 实现双写机制(同时写入 RocksDB 和 SQLite)
阶段2:优化查询性能
- ✅ 实现批量查询 RocksDB
- ✅ 添加查询缓存
- ✅ 优化索引策略
阶段3:实现归档机制
- ✅ 定期归档旧数据
- ✅ 数据压缩和清理
- ✅ 支持数据恢复
总结
✅ 方案优势
- 高性能写入:RocksDB 保证写入性能
- 灵活查询:SQLite 支持复杂查询
- 大数据量:RocksDB 支持 TB 级别存储
- 字段扩展:SQLite 支持轻松添加字段
- 成本可控:无需额外数据库服务器
⚠️ 注意事项
- 数据一致性:需要确保 RocksDB 和 SQLite 的写入一致性
- 性能平衡:写入时需要同时写入两个数据库,略有性能损失
- 存储空间:SQLite 索引会占用额外空间(但相对较小)
- 维护复杂度:需要维护两套存储系统
🎯 适用场景
- ✅ 高频写入 + 大数据量
- ✅ 需要复杂查询
- ✅ 单机部署
- ✅ 需要字段扩展能力