大数据量存储方案.md 13 KB

大数据量时间序列存储方案

场景分析

需求特点

  1. 高频写入:传感器数据持续采集,每次串口读取都会写入
  2. 数据量大:长期运行,数据量可能达到 TB 级别
  3. 协议转化:原始数据需要转化为标准协议后存储
  4. 复杂查询:需要按创建者、数据等级、协议类型等多字段查询
  5. 时间序列:数据按时间顺序写入和查询

性能要求

  • 写入性能:必须支持高频写入(每秒可能数百到数千条)
  • 查询性能:支持复杂查询但可能不是实时(可接受秒级延迟)
  • 存储容量:支持 TB 级别数据存储
  • 查询灵活性:支持多字段过滤、排序、统计

推荐方案:RocksDB + SQLite 混合架构

架构设计

┌─────────────────────────────────────────────────────────┐
│                    应用层 (SoftBusCore)                  │
└─────────────────────────────────────────────────────────┘
                            │
                            ├─────────────────┐
                            ▼                 ▼
        ┌──────────────────────────┐  ┌──────────────────────┐
        │      RocksDB (热数据)     │  │  SQLite (索引层)      │
        │                          │  │                      │
        │  • 存储实际数据(二进制)  │  │  • 存储元数据索引    │
        │  • 高频写入               │  │  • 支持复杂查询      │
        │  • 高性能                │  │  • 字段扩展方便      │
        │  • 支持大数据量           │  │  • 查询性能优化      │
        └──────────────────────────┘  └──────────────────────┘
                            │                 │
                            └─────────────────┘
                            │
                            ▼
                    ┌─────────────────┐
                    │   归档机制       │
                    │  (定期数据迁移)  │
                    └─────────────────┘

数据存储策略

1. RocksDB:实际数据存储(热数据)

  • 存储内容:原始二进制数据、协议转化后的消息体
  • Key 设计deviceId_timestampmessageId
  • Value:二进制数据(压缩存储)
  • 优势:写入性能极高,支持大数据量

2. SQLite:元数据索引层(查询层)

  • 存储内容:元数据索引(deviceId, timestamp, creator, level, protocol_type 等)
  • 存储实际数据路径/引用:指向 RocksDB 的 key
  • 优势:支持复杂 SQL 查询,字段扩展方便

3. 数据同步机制

  • 写入时:同时写入 RocksDB 和 SQLite
  • 查询时:先查 SQLite 获取索引,再查 RocksDB 获取实际数据
  • 归档时:定期将旧数据从 RocksDB 归档,SQLite 保留索引

详细设计

1. SQLite 表结构设计

原始数据索引表(raw_data_index)

CREATE TABLE raw_data_index (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    device_id INTEGER NOT NULL,
    rocksdb_key TEXT NOT NULL,          -- RocksDB 的 key,用于查询实际数据
    data_size INTEGER,                   -- 数据大小
    create_time INTEGER NOT NULL,        -- 创建时间(时间戳)
    update_time INTEGER,                 -- 更新时间(时间戳)
    creator TEXT,                        -- 创建者
    data_level INTEGER DEFAULT 0,        -- 数据等级
    protocol_type TEXT,                  -- 协议类型
    is_valid BOOLEAN DEFAULT 1,          -- 是否有效
    metadata TEXT,                       -- 元数据(JSON格式)
    FOREIGN KEY (device_id) REFERENCES devices(id)
);

CREATE INDEX idx_raw_data_device_time ON raw_data_index(device_id, create_time);
CREATE INDEX idx_raw_data_creator ON raw_data_index(creator);
CREATE INDEX idx_raw_data_level ON raw_data_index(data_level);
CREATE INDEX idx_raw_data_protocol ON raw_data_index(protocol_type);
CREATE INDEX idx_raw_data_time ON raw_data_index(create_time);

总线消息索引表(bus_messages_index)

CREATE TABLE bus_messages_index (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT UNIQUE NOT NULL,     -- 消息ID
    source TEXT NOT NULL,
    destination TEXT NOT NULL,
    rocksdb_key TEXT NOT NULL,           -- RocksDB 的 key
    payload_size INTEGER,                -- payload 大小
    create_time INTEGER NOT NULL,
    update_time INTEGER,
    creator TEXT,
    data_level INTEGER DEFAULT 0,
    protocol_type TEXT,
    is_valid BOOLEAN DEFAULT 1,
    metadata TEXT
);

CREATE INDEX idx_bus_msg_source_dest ON bus_messages_index(source, destination);
CREATE INDEX idx_bus_msg_time ON bus_messages_index(create_time);
CREATE INDEX idx_bus_msg_creator ON bus_messages_index(creator);
CREATE INDEX idx_bus_msg_level ON bus_messages_index(data_level);
CREATE INDEX idx_bus_msg_protocol ON bus_messages_index(protocol_type);

设备表(devices)

CREATE TABLE devices (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    portname TEXT UNIQUE,
    name TEXT,
    type TEXT,
    address INTEGER,
    protocol TEXT,
    protocol_detail TEXT,
    properties TEXT,                     -- JSON格式
    create_time INTEGER,
    update_time INTEGER,
    creator TEXT
);

2. 写入流程

┌─────────────────────────────────────────────────────────┐
│  1. 接收数据 (storeRawData/storeBusMessage)            │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
        ┌───────────────────────────────────────┐
        │  2. 生成 RocksDB Key                  │
        │     deviceId_timestamp 或 messageId   │
        └───────────────────────────────────────┘
                            │
        ┌───────────────────┴───────────────────┐
        ▼                                       ▼
┌──────────────────────┐              ┌──────────────────────┐
│ 3a. 写入 RocksDB     │              │ 3b. 写入 SQLite      │
│     (实际数据)        │              │     (索引元数据)      │
│     - 二进制数据      │              │     - 元数据字段      │
│     - 高性能写入      │              │     - rocksdb_key    │
└──────────────────────┘              └──────────────────────┘
        │                                       │
        └───────────────────┬───────────────────┘
                            ▼
                ┌───────────────────────┐
                │  4. 事务提交/确认      │
                └───────────────────────┘

3. 查询流程

┌─────────────────────────────────────────────────────────┐
│  1. 复杂查询请求 (按 creator, level, time 等)           │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
        ┌───────────────────────────────────────┐
        │  2. 查询 SQLite 索引                   │
        │     - 获取匹配的 rocksdb_key 列表      │
        │     - 快速过滤和排序                   │
        └───────────────────────────────────────┘
                            │
                            ▼
        ┌───────────────────────────────────────┐
        │  3. 批量查询 RocksDB                   │
        │     - 根据 rocksdb_key 获取实际数据     │
        │     - 可以并行查询提升性能              │
        └───────────────────────────────────────┘
                            │
                            ▼
        ┌───────────────────────────────────────┐
        │  4. 返回结果                           │
        └───────────────────────────────────────┘

4. 数据归档机制

-- 归档策略:将超过 30 天的数据标记为归档
-- 可以定期将数据从 RocksDB 导出到归档文件,SQLite 保留索引

-- 标记归档数据
UPDATE raw_data_index 
SET is_valid = 0 
WHERE create_time < (strftime('%s', 'now') - 30 * 24 * 3600);

-- 查询时可以只查询有效数据
SELECT * FROM raw_data_index 
WHERE is_valid = 1 AND device_id = ? AND create_time >= ?;

性能优化策略

1. RocksDB 优化

rocksdb::Options options;
options.create_if_missing = true;
options.write_buffer_size = 64 * 1024 * 1024;  // 64MB 写缓冲区
options.max_write_buffer_number = 3;
options.min_write_buffer_number_to_merge = 2;
options.compression = rocksdb::kSnappyCompression;  // 压缩存储
options.max_open_files = 10000;
options.target_file_size_base = 64 * 1024 * 1024;  // 64MB

2. SQLite 优化

// 使用 WAL 模式提升并发性能
QSqlQuery query;
query.exec("PRAGMA journal_mode=WAL");
query.exec("PRAGMA synchronous=NORMAL");  // 平衡性能和安全性
query.exec("PRAGMA cache_size=10000");     // 10MB 缓存
query.exec("PRAGMA temp_store=MEMORY");

3. 批量写入优化

// 使用 WriteBatch 批量写入 RocksDB
rocksdb::WriteBatch batch;
for (const auto& data : dataList) {
    batch.Put(key, value);
}
db->Write(rocksdb::WriteOptions(), &batch);

// 使用事务批量写入 SQLite
db.transaction();
for (const auto& index : indexList) {
    // 插入索引
}
db.commit();

容量估算

假设场景

  • 设备数量:100 个传感器
  • 采集频率:每设备每秒 10 条数据
  • 数据大小:每条原始数据 100 字节,协议转化后 200 字节
  • 运行时间:1 年

数据量计算

  • 每秒写入:100 设备 × 10 条/秒 = 1000 条/秒
  • 每天数据量
    • 原始数据:1000 条/秒 × 86400 秒 × 100 字节 = 8.64 GB
    • 协议数据:1000 条/秒 × 86400 秒 × 200 字节 = 17.28 GB
    • 索引数据:1000 条/秒 × 86400 秒 × 200 字节(索引) = 17.28 GB
    • 总计:约 43 GB/天
  • 1年数据量:约 15.7 TB

存储分配建议

  • RocksDB:存储 30 天热数据(约 1.3 TB)
  • SQLite:存储全部索引(约 6 TB,但索引文件通常较小,实际可能几百 GB)
  • 归档存储:超过 30 天的数据归档到外部存储

实施建议

阶段1:保持 RocksDB,添加 SQLite 索引层

  1. ✅ 保留现有 RocksDB 存储(保持写入性能)
  2. ✅ 添加 SQLite 索引层(支持复杂查询)
  3. ✅ 实现双写机制(同时写入 RocksDB 和 SQLite)

阶段2:优化查询性能

  1. ✅ 实现批量查询 RocksDB
  2. ✅ 添加查询缓存
  3. ✅ 优化索引策略

阶段3:实现归档机制

  1. ✅ 定期归档旧数据
  2. ✅ 数据压缩和清理
  3. ✅ 支持数据恢复

总结

✅ 方案优势

  1. 高性能写入:RocksDB 保证写入性能
  2. 灵活查询:SQLite 支持复杂查询
  3. 大数据量:RocksDB 支持 TB 级别存储
  4. 字段扩展:SQLite 支持轻松添加字段
  5. 成本可控:无需额外数据库服务器

⚠️ 注意事项

  1. 数据一致性:需要确保 RocksDB 和 SQLite 的写入一致性
  2. 性能平衡:写入时需要同时写入两个数据库,略有性能损失
  3. 存储空间:SQLite 索引会占用额外空间(但相对较小)
  4. 维护复杂度:需要维护两套存储系统

🎯 适用场景

  • ✅ 高频写入 + 大数据量
  • ✅ 需要复杂查询
  • ✅ 单机部署
  • ✅ 需要字段扩展能力