# 大数据量时间序列存储方案 ## 场景分析 ### 需求特点 1. **高频写入**:传感器数据持续采集,每次串口读取都会写入 2. **数据量大**:长期运行,数据量可能达到 TB 级别 3. **协议转化**:原始数据需要转化为标准协议后存储 4. **复杂查询**:需要按创建者、数据等级、协议类型等多字段查询 5. **时间序列**:数据按时间顺序写入和查询 ### 性能要求 - **写入性能**:必须支持高频写入(每秒可能数百到数千条) - **查询性能**:支持复杂查询但可能不是实时(可接受秒级延迟) - **存储容量**:支持 TB 级别数据存储 - **查询灵活性**:支持多字段过滤、排序、统计 --- ## 推荐方案:RocksDB + SQLite 混合架构 ### 架构设计 ``` ┌─────────────────────────────────────────────────────────┐ │ 应用层 (SoftBusCore) │ └─────────────────────────────────────────────────────────┘ │ ├─────────────────┐ ▼ ▼ ┌──────────────────────────┐ ┌──────────────────────┐ │ RocksDB (热数据) │ │ SQLite (索引层) │ │ │ │ │ │ • 存储实际数据(二进制) │ │ • 存储元数据索引 │ │ • 高频写入 │ │ • 支持复杂查询 │ │ • 高性能 │ │ • 字段扩展方便 │ │ • 支持大数据量 │ │ • 查询性能优化 │ └──────────────────────────┘ └──────────────────────┘ │ │ └─────────────────┘ │ ▼ ┌─────────────────┐ │ 归档机制 │ │ (定期数据迁移) │ └─────────────────┘ ``` ### 数据存储策略 #### 1. RocksDB:实际数据存储(热数据) - **存储内容**:原始二进制数据、协议转化后的消息体 - **Key 设计**:`deviceId_timestamp` 或 `messageId` - **Value**:二进制数据(压缩存储) - **优势**:写入性能极高,支持大数据量 #### 2. SQLite:元数据索引层(查询层) - **存储内容**:元数据索引(deviceId, timestamp, creator, level, protocol_type 等) - **存储实际数据路径/引用**:指向 RocksDB 的 key - **优势**:支持复杂 SQL 查询,字段扩展方便 #### 3. 数据同步机制 - **写入时**:同时写入 RocksDB 和 SQLite - **查询时**:先查 SQLite 获取索引,再查 RocksDB 获取实际数据 - **归档时**:定期将旧数据从 RocksDB 归档,SQLite 保留索引 --- ## 详细设计 ### 1. SQLite 表结构设计 #### 原始数据索引表(raw_data_index) ```sql CREATE TABLE raw_data_index ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id INTEGER NOT NULL, rocksdb_key TEXT NOT NULL, -- RocksDB 的 key,用于查询实际数据 data_size INTEGER, -- 数据大小 create_time INTEGER NOT NULL, -- 创建时间(时间戳) update_time INTEGER, -- 更新时间(时间戳) creator TEXT, -- 创建者 data_level INTEGER DEFAULT 0, -- 数据等级 protocol_type TEXT, -- 协议类型 is_valid BOOLEAN DEFAULT 1, -- 是否有效 metadata TEXT, -- 元数据(JSON格式) FOREIGN KEY (device_id) REFERENCES devices(id) ); CREATE INDEX idx_raw_data_device_time ON raw_data_index(device_id, create_time); CREATE INDEX idx_raw_data_creator ON raw_data_index(creator); CREATE INDEX idx_raw_data_level ON raw_data_index(data_level); CREATE INDEX idx_raw_data_protocol ON raw_data_index(protocol_type); CREATE INDEX idx_raw_data_time ON raw_data_index(create_time); ``` #### 总线消息索引表(bus_messages_index) ```sql CREATE TABLE bus_messages_index ( id INTEGER PRIMARY KEY AUTOINCREMENT, message_id TEXT UNIQUE NOT NULL, -- 消息ID source TEXT NOT NULL, destination TEXT NOT NULL, rocksdb_key TEXT NOT NULL, -- RocksDB 的 key payload_size INTEGER, -- payload 大小 create_time INTEGER NOT NULL, update_time INTEGER, creator TEXT, data_level INTEGER DEFAULT 0, protocol_type TEXT, is_valid BOOLEAN DEFAULT 1, metadata TEXT ); CREATE INDEX idx_bus_msg_source_dest ON bus_messages_index(source, destination); CREATE INDEX idx_bus_msg_time ON bus_messages_index(create_time); CREATE INDEX idx_bus_msg_creator ON bus_messages_index(creator); CREATE INDEX idx_bus_msg_level ON bus_messages_index(data_level); CREATE INDEX idx_bus_msg_protocol ON bus_messages_index(protocol_type); ``` #### 设备表(devices) ```sql CREATE TABLE devices ( id INTEGER PRIMARY KEY AUTOINCREMENT, portname TEXT UNIQUE, name TEXT, type TEXT, address INTEGER, protocol TEXT, protocol_detail TEXT, properties TEXT, -- JSON格式 create_time INTEGER, update_time INTEGER, creator TEXT ); ``` ### 2. 写入流程 ``` ┌─────────────────────────────────────────────────────────┐ │ 1. 接收数据 (storeRawData/storeBusMessage) │ └─────────────────────────────────────────────────────────┘ │ ▼ ┌───────────────────────────────────────┐ │ 2. 生成 RocksDB Key │ │ deviceId_timestamp 或 messageId │ └───────────────────────────────────────┘ │ ┌───────────────────┴───────────────────┐ ▼ ▼ ┌──────────────────────┐ ┌──────────────────────┐ │ 3a. 写入 RocksDB │ │ 3b. 写入 SQLite │ │ (实际数据) │ │ (索引元数据) │ │ - 二进制数据 │ │ - 元数据字段 │ │ - 高性能写入 │ │ - rocksdb_key │ └──────────────────────┘ └──────────────────────┘ │ │ └───────────────────┬───────────────────┘ ▼ ┌───────────────────────┐ │ 4. 事务提交/确认 │ └───────────────────────┘ ``` ### 3. 查询流程 ``` ┌─────────────────────────────────────────────────────────┐ │ 1. 复杂查询请求 (按 creator, level, time 等) │ └─────────────────────────────────────────────────────────┘ │ ▼ ┌───────────────────────────────────────┐ │ 2. 查询 SQLite 索引 │ │ - 获取匹配的 rocksdb_key 列表 │ │ - 快速过滤和排序 │ └───────────────────────────────────────┘ │ ▼ ┌───────────────────────────────────────┐ │ 3. 批量查询 RocksDB │ │ - 根据 rocksdb_key 获取实际数据 │ │ - 可以并行查询提升性能 │ └───────────────────────────────────────┘ │ ▼ ┌───────────────────────────────────────┐ │ 4. 返回结果 │ └───────────────────────────────────────┘ ``` ### 4. 数据归档机制 ```sql -- 归档策略:将超过 30 天的数据标记为归档 -- 可以定期将数据从 RocksDB 导出到归档文件,SQLite 保留索引 -- 标记归档数据 UPDATE raw_data_index SET is_valid = 0 WHERE create_time < (strftime('%s', 'now') - 30 * 24 * 3600); -- 查询时可以只查询有效数据 SELECT * FROM raw_data_index WHERE is_valid = 1 AND device_id = ? AND create_time >= ?; ``` --- ## 性能优化策略 ### 1. RocksDB 优化 ```cpp rocksdb::Options options; options.create_if_missing = true; options.write_buffer_size = 64 * 1024 * 1024; // 64MB 写缓冲区 options.max_write_buffer_number = 3; options.min_write_buffer_number_to_merge = 2; options.compression = rocksdb::kSnappyCompression; // 压缩存储 options.max_open_files = 10000; options.target_file_size_base = 64 * 1024 * 1024; // 64MB ``` ### 2. SQLite 优化 ```cpp // 使用 WAL 模式提升并发性能 QSqlQuery query; query.exec("PRAGMA journal_mode=WAL"); query.exec("PRAGMA synchronous=NORMAL"); // 平衡性能和安全性 query.exec("PRAGMA cache_size=10000"); // 10MB 缓存 query.exec("PRAGMA temp_store=MEMORY"); ``` ### 3. 批量写入优化 ```cpp // 使用 WriteBatch 批量写入 RocksDB rocksdb::WriteBatch batch; for (const auto& data : dataList) { batch.Put(key, value); } db->Write(rocksdb::WriteOptions(), &batch); // 使用事务批量写入 SQLite db.transaction(); for (const auto& index : indexList) { // 插入索引 } db.commit(); ``` --- ## 容量估算 ### 假设场景 - **设备数量**:100 个传感器 - **采集频率**:每设备每秒 10 条数据 - **数据大小**:每条原始数据 100 字节,协议转化后 200 字节 - **运行时间**:1 年 ### 数据量计算 - **每秒写入**:100 设备 × 10 条/秒 = 1000 条/秒 - **每天数据量**: - 原始数据:1000 条/秒 × 86400 秒 × 100 字节 = 8.64 GB - 协议数据:1000 条/秒 × 86400 秒 × 200 字节 = 17.28 GB - 索引数据:1000 条/秒 × 86400 秒 × 200 字节(索引) = 17.28 GB - **总计**:约 **43 GB/天** - **1年数据量**:约 **15.7 TB** ### 存储分配建议 - **RocksDB**:存储 30 天热数据(约 1.3 TB) - **SQLite**:存储全部索引(约 6 TB,但索引文件通常较小,实际可能几百 GB) - **归档存储**:超过 30 天的数据归档到外部存储 --- ## 实施建议 ### 阶段1:保持 RocksDB,添加 SQLite 索引层 1. ✅ 保留现有 RocksDB 存储(保持写入性能) 2. ✅ 添加 SQLite 索引层(支持复杂查询) 3. ✅ 实现双写机制(同时写入 RocksDB 和 SQLite) ### 阶段2:优化查询性能 1. ✅ 实现批量查询 RocksDB 2. ✅ 添加查询缓存 3. ✅ 优化索引策略 ### 阶段3:实现归档机制 1. ✅ 定期归档旧数据 2. ✅ 数据压缩和清理 3. ✅ 支持数据恢复 --- ## 总结 ### ✅ 方案优势 1. **高性能写入**:RocksDB 保证写入性能 2. **灵活查询**:SQLite 支持复杂查询 3. **大数据量**:RocksDB 支持 TB 级别存储 4. **字段扩展**:SQLite 支持轻松添加字段 5. **成本可控**:无需额外数据库服务器 ### ⚠️ 注意事项 1. **数据一致性**:需要确保 RocksDB 和 SQLite 的写入一致性 2. **性能平衡**:写入时需要同时写入两个数据库,略有性能损失 3. **存储空间**:SQLite 索引会占用额外空间(但相对较小) 4. **维护复杂度**:需要维护两套存储系统 ### 🎯 适用场景 - ✅ 高频写入 + 大数据量 - ✅ 需要复杂查询 - ✅ 单机部署 - ✅ 需要字段扩展能力