数据流架构 (Data Flow Architecture)
概述
AXBlade 后端使用三层数据存储架构:
| 层级 | 技术 | 用途 | TTL |
|---|---|---|---|
| Hot | Redis | 实时缓存、发布/订阅 | 5s-24h |
| Warm | PostgreSQL | OLTP、事务数据 | 永久 |
| Cold | TimescaleDB | 时序数据分析 | 30-90 天 |
┌─────────────────────────────────────────────────────────────────────────┐
│ Client Layer │
│ (REST API / WebSocket / gRPC) │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│ │ Auth/JWT │ │ Matching │ │ Position │ │ Price Feed │ │
│ │ Handler │ │ Engine │ │ Service │ │ Service │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └───────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│ │ │
┌ ─────────┴─────────┬─────────────┴─────────────┬─────┘
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────────┐
│ Redis │ │ PostgreSQL │ │ TimescaleDB │
│ (Hot Cache) │ │ (OLTP) │ │ (Time-Series) │
├─────────────────┤ ├─────────────────┤ ├─────────────────────────────┤
│ • Price Cache │ │ • Users │ │ • Trades (Hypertable) │
│ • Orderbook │ │ • Orders │ │ • K-lines (Continuous Agg) │
│ • User Balance │ │ • Positions │ │ - 1m, 5m, 15m, 1h, 4h │
│ • Rate Limiting │ │ • Balances │ │ - 1d, 1w │
│ • Sessions │ │ • Deposits │ │ • Compression (7d+) │
│ • Pub/Sub │ │ • Withdrawals │ │ • Retention Policies │
└─────────────────┘ │ • Referrals │ └─────────────────────────────┘
└─────────────────┘
1. 订单处理流程
┌──────────────────────────────────────────────────────────────────────────┐
│ Order Processing Flow │
└──────────────────────────────────────────────────────────────────────────┘
1. API Request
│
▼
┌─────────────────┐
│ OrderHandler │ ◄─── POST /api/v1/orders
└────────┬────────┘
│
▼
2. ┌─────────────────┐
│ MatchingEngine │ ◄─── In-memory orderbook matching
│ (DashMap) │ - Price-time priority
└────────┬────────┘ - Concurrent access via DashMap
│
│ Trades generated via broadcast::channel
▼
3. ┌─────────────────┐ ┌─────────────────┐
│ OrderFlow │────▶│ PostgreSQL │
│ Orchestrator │ │ (orders table) │
└────────┬────────┘ └─────────────────┘
│
│ tokio::spawn (async persistence)
▼
4. ┌─────────────────┐ ┌─────────────────┐
│ Persistence │────▶│ PostgreSQL │
│ Worker │ │ (trades table) │
└────────┬────────┘ └─────────────────┘
│
│ broadcast::Receiver<TradeEvent>
▼
5. ┌─────────────────┐ ┌─────────────────┐
│ KlineService │────▶│ TimescaleDB │
│ │ │ (klines_*) │
└────────┬────────┘ └─────────────────┘
│
│ broadcast::Sender<KlineUpdate>
▼
6. ┌─────────────────┐ ┌─────────────────┐
│ WebSocket │────▶│ Redis Pub/Sub │
│ Handler │ │ (channel:*) │
└ ─────────────────┘ └─────────────────┘
关键代码文件
| 文件 | 功能 |
|---|---|
src/services/matching/engine.rs:135-319 | 订单撮合逻辑 |
src/services/matching/orchestrator.rs:57-86 | 持久化工作线程 |
src/services/kline/mod.rs:268-328 | K线聚合 |
2. 交易事件流
┌──────────────────────────────────────────────────────────────────────────┐
│ Trade Event Flow │
└──────────────────────────────────────────────────────────────────────────┘
MatchingEngine
│
│ broadcast::channel(10000)
▼
┌────────────────┴────────────────┐
│ │
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Persistence │ │ KlineService │
│ Worker │ │ │
└───────┬───────┘ └───────┬───────┘
│ │
▼ ▼
┌───────────────┐ ┌───────────────┐
│ trades │ │ Memory Cache │
│ (PG table) │ │ (DashMap) │
└───────┬───────┘ └───────┬───────┘
│ │
│ TimescaleDB │ On candle close
│ Continuous Aggregate ▼
▼ ┌───────────────┐
┌───────────────┐ │ klines_ │
│ klines_1m │ │ historical │
│ klines_5m │ │ (PG table) │
│ klines_15m │ └───────────────┘
│ klines_1h │
│ klines_4h │
│ klines_1d │
│ klines_1w │
└───────────────┘
交易持久化逻辑
// src/services/matching/orchestrator.rs:204-431
async fn persist_trade(pool: &PgPool, trade: &TradeEvent) {
// 1. Insert trade record
// 2. Query maker/taker leverage from orders
// 3. Update maker position (opposite side)
// 4. Update taker position (same side)
// 5. Record referral commissions
}
3. Redis 缓存架构
┌──────────────────────────────────────────────────────────────────────────┐
│ Redis Key Structure │
└──────────────────────────────────────────────────────────────────────────┘
Price Data (TTL: 5s)
├── price:mark:BTCUSDT → "97000.50"
├── price:index:BTCUSDT → "96999.00"
└── price:last:BTCUSDT → "97001.25"
Ticker Data (TTL: 5s)
└── ticker:BTCUSDT → JSON { last_price, high_24h, volume_24h, ... }
Orderbook (No TTL - managed by engine)
├── orderbook:BTCUSDT:bids → Sorted Set { price → amount }
├── orderbook:BTCUSDT:asks → Sorted Set { price → amount }
└── orderbook:BTCUSDT:snapshot → JSON { bids: [], asks: [] }
User Data (TTL: 30s)
├── user:balance:0x1234abcd → Hash { USDT → "1000.00" }
└── user:positions:0x1234abcd → JSON [{ symbol, side, size, pnl }]
Position Data (TTL: 10s)
├── position:{id} → JSON { full position data }
└── position:user:0x123:BTCUSDT:long → JSON { position data }
Session Data (TTL: 86400s / 24h)
├── session:0x1234abcd → JWT token data
└── nonce:0x1234abcd → "12345"
Rate Limiting (TTL: 60s)
├── rate:ip:192.168.1.1 → counter
├── rate:user:0x1234 → counter
└── rate:endpoint:POST:/orders:0x1234 → counter
K-line Cache (TTL: 60s)
├── kline:BTCUSDT:1m → JSON [candles]
└── kline:BTCUSDT:1m:latest → JSON { current candle }
Pub/Sub Channels
├── channel:trades:BTCUSDT → Trade events
├── channel:orderbook:BTCUSDT → Orderbook updates
├── channel:ticker:BTCUSDT → Ticker updates
├── channel:kline:BTCUSDT:1m → K-line updates
├── channel:orders:0x1234 → User order updates
└── channel:positions:0x1234 → User position updates
关键代码文件
| 文件 | 功能 |
|---|---|
src/cache/keys.rs:1-272 | Key 命名规范 |
src/cache/mod.rs:1-370 | CacheManager 实现 |
src/cache/price_cache.rs | 价格缓存逻辑 |
4. TimescaleDB 架构
┌──────────────────────────────────────────────────────────────────────────┐
│ TimescaleDB Setup │
└──────────────────────────────────────────────────────────────────────────┘
trades (Hypertable)
─────────────────────
│ Chunk interval: 1 day
│ Compression: 7+ days old
│ Retention: 90 days
│
│ Indexes:
│ - symbol
│ - created_at (time dimension)
│
┌───────────┴───────────┐
│ Continuous Aggregates │
└───────────┬───────────┘
│
┌───────────────┼───────────────────────────────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│klines_1m│ │klines_5m│ │klines_1h│ ... │klines_1w│
├────────┤ ├────────┤ ├────────┤ ├────────┤
│Refresh: │ │Refresh: │ │Refresh: │ │Refresh: │
│ 1 min │ │ 5 min │ │ 1 hour │ │ 1 week │
│ │ │ │ │ │ │ │
│Retention│ │Retention│ │ │ │ │
│ 30 days │ │ 60 days │ │ ∞ │ │ ∞ │
└────────┘ └────────┘ └────────┘ └────────┘
K线聚合查询
-- Materialized view for 1-minute K-lines
SELECT
symbol,
time_bucket('1 minute', created_at) AS bucket,
FIRST(price, created_at) AS open,
MAX(price) AS high,
MIN(price) AS low,
LAST(price, created_at) AS close,
SUM(amount) AS volume,
SUM(price * amount) AS quote_volume,
COUNT(*) AS trade_count
FROM trades
GROUP BY symbol, time_bucket('1 minute', created_at)
关键代码文件
| 文件 | 功能 |
|---|---|
migrations/0008_timescaledb_setup.sql | TimescaleDB 配置 |
src/db/timescale.rs | TimescaleDB 操作 |
5. 数据读取模式
价格查询流程
API Request → Redis Cache (5s TTL)
│
├─ HIT → Return cached price
│
└─ MISS → Query PostgreSQL/External Feed
│
└─ Update Redis Cache
K线查询流程
API Request → Memory Cache (KlineService)
│
├─ HIT → Return memory data
│
└─ MISS → Query TimescaleDB (klines_historical)
│
└─ Fallback to continuous aggregates
订单历史流程
API Request → In-memory History (1000 orders/user)
│
├─ Recent → Return from DashMap
│
└─ Older → Query PostgreSQL orders table
6. 数据写入模式
同步操作 (阻塞)
- 订单验证
- 余额检查
- 保证金计算
异步操作 (通过 tokio::spawn 非阻塞)
- 交易持久化
- 仓位更新
- 推荐佣金记录
- K线聚合
- WebSocket 广播
事件驱动 (broadcast::channel)
// Trade events flow
MatchingEngine → broadcast::Sender<TradeEvent>
│
┌───────────┴───────────┐
▼ ▼
PersistenceWorker KlineService
│ │
▼ ▼
PostgreSQL Memory + DB