跳到主要内容

数据流架构 (Data Flow Architecture)

概述

AXBlade 后端使用三层数据存储架构:

层级技术用途TTL
HotRedis实时缓存、发布/订阅5s-24h
WarmPostgreSQLOLTP、事务数据永久
ColdTimescaleDB时序数据分析30-90 天
┌─────────────────────────────────────────────────────────────────────────┐
│ Client Layer │
│ (REST API / WebSocket / gRPC) │
└─────────────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│ │ Auth/JWT │ │ Matching │ │ Position │ │ Price Feed │ │
│ │ Handler │ │ Engine │ │ Service │ │ Service │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └───────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│ │ │
┌─────────┴─────────┬─────────────┴─────────────┬─────┘
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────────┐
│ Redis │ │ PostgreSQL │ │ TimescaleDB │
│ (Hot Cache) │ │ (OLTP) │ │ (Time-Series) │
├─────────────────┤ ├─────────────────┤ ├─────────────────────────────┤
│ • Price Cache │ │ • Users │ │ • Trades (Hypertable) │
│ • Orderbook │ │ • Orders │ │ • K-lines (Continuous Agg) │
│ • User Balance │ │ • Positions │ │ - 1m, 5m, 15m, 1h, 4h │
│ • Rate Limiting │ │ • Balances │ │ - 1d, 1w │
│ • Sessions │ │ • Deposits │ │ • Compression (7d+) │
│ • Pub/Sub │ │ • Withdrawals │ │ • Retention Policies │
└─────────────────┘ │ • Referrals │ └─────────────────────────────┘
└─────────────────┘

1. 订单处理流程

┌──────────────────────────────────────────────────────────────────────────┐
│ Order Processing Flow │
└──────────────────────────────────────────────────────────────────────────┘

1. API Request


┌─────────────────┐
│ OrderHandler │ ◄─── POST /api/v1/orders
└────────┬────────┘


2. ┌─────────────────┐
│ MatchingEngine │ ◄─── In-memory orderbook matching
│ (DashMap) │ - Price-time priority
└────────┬────────┘ - Concurrent access via DashMap

│ Trades generated via broadcast::channel

3. ┌─────────────────┐ ┌─────────────────┐
│ OrderFlow │────▶│ PostgreSQL │
│ Orchestrator │ │ (orders table) │
└────────┬────────┘ └─────────────────┘

│ tokio::spawn (async persistence)

4. ┌─────────────────┐ ┌─────────────────┐
│ Persistence │────▶│ PostgreSQL │
│ Worker │ │ (trades table) │
└────────┬────────┘ └─────────────────┘

│ broadcast::Receiver<TradeEvent>

5. ┌─────────────────┐ ┌─────────────────┐
│ KlineService │────▶│ TimescaleDB │
│ │ │ (klines_*) │
└────────┬────────┘ └─────────────────┘

│ broadcast::Sender<KlineUpdate>

6. ┌─────────────────┐ ┌─────────────────┐
│ WebSocket │────▶│ Redis Pub/Sub │
│ Handler │ │ (channel:*) │
└─────────────────┘ └─────────────────┘

关键代码文件

文件功能
src/services/matching/engine.rs:135-319订单撮合逻辑
src/services/matching/orchestrator.rs:57-86持久化工作线程
src/services/kline/mod.rs:268-328K线聚合

2. 交易事件流

┌──────────────────────────────────────────────────────────────────────────┐
│ Trade Event Flow │
└──────────────────────────────────────────────────────────────────────────┘

MatchingEngine

│ broadcast::channel(10000)

┌────────────────┴────────────────┐
│ │
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Persistence │ │ KlineService │
│ Worker │ │ │
└───────┬───────┘ └───────┬───────┘
│ │
▼ ▼
┌───────────────┐ ┌───────────────┐
│ trades │ │ Memory Cache │
│ (PG table) │ │ (DashMap) │
└───────┬───────┘ └───────┬───────┘
│ │
│ TimescaleDB │ On candle close
│ Continuous Aggregate ▼
▼ ┌───────────────┐
┌───────────────┐ │ klines_ │
│ klines_1m │ │ historical │
│ klines_5m │ │ (PG table) │
│ klines_15m │ └───────────────┘
│ klines_1h │
│ klines_4h │
│ klines_1d │
│ klines_1w │
└───────────────┘

交易持久化逻辑

// src/services/matching/orchestrator.rs:204-431
async fn persist_trade(pool: &PgPool, trade: &TradeEvent) {
// 1. Insert trade record
// 2. Query maker/taker leverage from orders
// 3. Update maker position (opposite side)
// 4. Update taker position (same side)
// 5. Record referral commissions
}

3. Redis 缓存架构

┌──────────────────────────────────────────────────────────────────────────┐
│ Redis Key Structure │
└──────────────────────────────────────────────────────────────────────────┘

Price Data (TTL: 5s)
├── price:mark:BTCUSDT → "97000.50"
├── price:index:BTCUSDT → "96999.00"
└── price:last:BTCUSDT → "97001.25"

Ticker Data (TTL: 5s)
└── ticker:BTCUSDT → JSON { last_price, high_24h, volume_24h, ... }

Orderbook (No TTL - managed by engine)
├── orderbook:BTCUSDT:bids → Sorted Set { price → amount }
├── orderbook:BTCUSDT:asks → Sorted Set { price → amount }
└── orderbook:BTCUSDT:snapshot → JSON { bids: [], asks: [] }

User Data (TTL: 30s)
├── user:balance:0x1234abcd → Hash { USDT → "1000.00" }
└── user:positions:0x1234abcd → JSON [{ symbol, side, size, pnl }]

Position Data (TTL: 10s)
├── position:{id} → JSON { full position data }
└── position:user:0x123:BTCUSDT:long → JSON { position data }

Session Data (TTL: 86400s / 24h)
├── session:0x1234abcd → JWT token data
└── nonce:0x1234abcd → "12345"

Rate Limiting (TTL: 60s)
├── rate:ip:192.168.1.1 → counter
├── rate:user:0x1234 → counter
└── rate:endpoint:POST:/orders:0x1234 → counter

K-line Cache (TTL: 60s)
├── kline:BTCUSDT:1m → JSON [candles]
└── kline:BTCUSDT:1m:latest → JSON { current candle }

Pub/Sub Channels
├── channel:trades:BTCUSDT → Trade events
├── channel:orderbook:BTCUSDT → Orderbook updates
├── channel:ticker:BTCUSDT → Ticker updates
├── channel:kline:BTCUSDT:1m → K-line updates
├── channel:orders:0x1234 → User order updates
└── channel:positions:0x1234 → User position updates

关键代码文件

文件功能
src/cache/keys.rs:1-272Key 命名规范
src/cache/mod.rs:1-370CacheManager 实现
src/cache/price_cache.rs价格缓存逻辑

4. TimescaleDB 架构

┌──────────────────────────────────────────────────────────────────────────┐
│ TimescaleDB Setup │
└──────────────────────────────────────────────────────────────────────────┘

trades (Hypertable)
─────────────────────
│ Chunk interval: 1 day
│ Compression: 7+ days old
│ Retention: 90 days

│ Indexes:
│ - symbol
│ - created_at (time dimension)

┌───────────┴───────────┐
│ Continuous Aggregates │
└───────────┬───────────┘

┌───────────────┼───────────────────────────────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│klines_1m│ │klines_5m│ │klines_1h│ ... │klines_1w│
├────────┤ ├────────┤ ├────────┤ ├────────┤
│Refresh: │ │Refresh: │ │Refresh: │ │Refresh: │
│ 1 min │ │ 5 min │ │ 1 hour │ │ 1 week │
│ │ │ │ │ │ │ │
│Retention│ │Retention│ │ │ │ │
│ 30 days │ │ 60 days │ │ ∞ │ │ ∞ │
└────────┘ └────────┘ └────────┘ └────────┘

K线聚合查询

-- Materialized view for 1-minute K-lines
SELECT
symbol,
time_bucket('1 minute', created_at) AS bucket,
FIRST(price, created_at) AS open,
MAX(price) AS high,
MIN(price) AS low,
LAST(price, created_at) AS close,
SUM(amount) AS volume,
SUM(price * amount) AS quote_volume,
COUNT(*) AS trade_count
FROM trades
GROUP BY symbol, time_bucket('1 minute', created_at)

关键代码文件

文件功能
migrations/0008_timescaledb_setup.sqlTimescaleDB 配置
src/db/timescale.rsTimescaleDB 操作

5. 数据读取模式

价格查询流程

API Request → Redis Cache (5s TTL)

├─ HIT → Return cached price

└─ MISS → Query PostgreSQL/External Feed

└─ Update Redis Cache

K线查询流程

API Request → Memory Cache (KlineService)

├─ HIT → Return memory data

└─ MISS → Query TimescaleDB (klines_historical)

└─ Fallback to continuous aggregates

订单历史流程

API Request → In-memory History (1000 orders/user)

├─ Recent → Return from DashMap

└─ Older → Query PostgreSQL orders table

6. 数据写入模式

同步操作 (阻塞)

  • 订单验证
  • 余额检查
  • 保证金计算

异步操作 (通过 tokio::spawn 非阻塞)

  • 交易持久化
  • 仓位更新
  • 推荐佣金记录
  • K线聚合
  • WebSocket 广播

事件驱动 (broadcast::channel)

// Trade events flow
MatchingEnginebroadcast::Sender<TradeEvent>

┌───────────┴───────────┐
▼ ▼
PersistenceWorker KlineService
│ │
▼ ▼
PostgreSQL Memory + DB

7. 数据一致性模型

操作一致性备注
订单撮合强一致性内存中,每个订单簿单线程
交易持久化最终一致性通过 broadcast channel 异步
仓位更新最终一致性交易持久化后更新
余额更新强一致性PostgreSQL 事务
K线更新最终一致性从交易聚合
缓存更新最终一致性基于 TTL 失效

已知问题

  1. 无事务 - 交易、仓位、余额更新是独立操作
  2. 无重试机制 - 失败的持久化仅记录日志不重试
  3. N+1 查询 - 仓位服务逐个获取价格
  4. 广播延迟 - 消费者慢时可能丢失交易

8. 数据保留策略

数据类型保留时间压缩
原始交易90 天7天后
klines_1m30 天N/A
klines_5m60 天N/A
klines_15m90 天N/A
klines_1h+永久N/A
订单永久N/A
仓位永久N/A

9. 监控要点

┌──────────────────────────────────────────────────────────────────────────┐
│ Key Metrics to Monitor │
└──────────────────────────────────────────────────────────────────────────┘

Redis:
- Cache hit/miss ratio
- Pub/Sub lag
- Memory usage
- Connection pool status

PostgreSQL:
- Query latency (p50, p95, p99)
- Connection pool exhaustion
- Transaction rate
- Lock contention

TimescaleDB:
- Compression ratio
- Chunk size
- Continuous aggregate refresh time
- Retention policy execution

Application:
- Broadcast channel lag (trades.lagged count)
- Order processing latency
- Matching engine queue depth
- WebSocket connection count

10. 灾难恢复

Redis 故障

  • 系统以降级性能继续运行
  • 优雅降级: CacheManager.is_available() → false
  • 读取回退到 PostgreSQL
  • 价格源继续从外部获取

PostgreSQL 故障

  • 交易暂停(订单无法持久化)
  • 仓位更新失败
  • 余额操作失败
  • K线生成在内存中继续

TimescaleDB 扩展故障

  • K线查询回退到原始交易表
  • 聚合显著变慢
  • 历史数据仍可访问

相关文档